Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2019/10/28 05:44:12 UTC

[hadoop] branch branch-3.2 updated (54d3e21 -> a6cdcf6)

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 54d3e21  HDFS-14923. Remove dead code from HealthMonitor. Contributed by Fei Hui.
     new 3f6f095  HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.
     new 0c68241  HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R
     new 6bc73a9  HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
     new f3571c1  HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.
     new 0fe720d  HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.
     new 6eb5fb5  HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
     new 75c66bf  HDFS-14458. Report pmem stats to namenode. Contributed by Feilong He.
     new c6d59e6  HDFS-14357. Update documentation for HDFS cache on SCM support. Contributed by Feilong He.
     new dc2fad4  HDFS-14700. Clean up pmem cache before setting pmem cache capacity. Contributed by Feilong He.
     new a6cdcf6  HDFS-14818. Check native pmdk lib by 'hadoop checknative' command. Contributed by Feilong He.

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 BUILDING.txt                                       |  28 ++
 dev-support/bin/dist-copynativelibs                |   8 +
 hadoop-common-project/hadoop-common/pom.xml        |   2 +
 .../hadoop-common/src/CMakeLists.txt               |  21 ++
 .../hadoop-common/src/config.h.cmake               |   1 +
 .../org/apache/hadoop/io/nativeio/NativeIO.java    | 151 +++++++-
 .../apache/hadoop/util/NativeLibraryChecker.java   |  10 +
 .../src/org/apache/hadoop/io/nativeio/NativeIO.c   | 264 ++++++++++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.c  | 106 ++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.h  |  90 +++++
 .../apache/hadoop/io/nativeio/TestNativeIO.java    | 153 ++++++++
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   5 +
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |   9 +
 .../server/datanode/fsdataset/impl/CacheStats.java | 212 +++++++++++
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 231 ++++++------
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  34 +-
 .../datanode/fsdataset/impl/FsDatasetUtil.java     |  49 +++
 .../datanode/fsdataset/impl/MappableBlock.java     | 155 +-------
 ...MappableBlock.java => MappableBlockLoader.java} | 162 +++++----
 .../fsdataset/impl/MappableBlockLoaderFactory.java |  51 +++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  | 125 +++++++
 .../datanode/fsdataset/impl/MemoryMappedBlock.java |  59 +++
 .../impl/NativePmemMappableBlockLoader.java        | 192 ++++++++++
 .../fsdataset/impl/NativePmemMappedBlock.java      |  85 +++++
 .../fsdataset/impl/PmemMappableBlockLoader.java    | 149 ++++++++
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  69 ++++
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 398 +++++++++++++++++++++
 .../src/main/resources/hdfs-default.xml            |  10 +
 .../site/markdown/CentralizedCacheManagement.md    |  18 +-
 .../datanode/TestFsDatasetCacheRevocation.java     |  31 +-
 .../impl/TestCacheByPmemMappableBlockLoader.java   | 325 +++++++++++++++++
 .../{ => fsdataset/impl}/TestFsDatasetCache.java   |  15 +-
 32 files changed, 2845 insertions(+), 373 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
 create mode 100644 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/CacheStats.java
 copy hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/{MappableBlock.java => MappableBlockLoader.java} (56%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
 rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/{ => fsdataset/impl}/TestFsDatasetCache.java (97%)
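
At a high level, this series replaces the single in-memory mmap path with a pluggable loader abstraction so that replicas can also be cached on storage class memory (pmem), either through a pure-Java file copy or through the native PMDK libraries. A simplified, illustrative rendering of that abstraction is sketched below; MappableBlockLoaderSketch and its parameter types are stand-ins rather than the actual Hadoop classes, whose real signatures appear in the diffs that follow.

// A simplified, illustrative rendering (not the actual Hadoop classes) of the
// loader abstraction this series introduces: FsDatasetCache delegates to one
// MappableBlockLoader per cache backend (DRAM mmap, pmem file copy, native
// PMDK), which caches a replica and accounts for the bytes it reserves.
import java.io.FileInputStream;
import java.io.IOException;

abstract class MappableBlockLoaderSketch {

  // Cache one replica; blockIn is the block file, metaIn its checksum meta
  // file. Returns a handle that is later closed to uncache the replica.
  abstract AutoCloseable load(long length, FileInputStream blockIn,
      FileInputStream metaIn, String blockFileName, Object key)
      throws IOException;

  // Byte accounting for the backend's cache space.
  abstract long reserve(long bytesCount);

  abstract long release(long bytesCount);

  abstract long getCacheUsed();
}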


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 05/10: HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 0fe720dc485908b58b1b4d1fc7edf2f65ab446af
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Sun May 26 14:30:11 2019 +0530

    HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.
    
    (cherry picked from commit 37900c5639f8ba8d41b9fedc3d41ee0fbda7d5db)
---
 .../fsdataset/impl/MappableBlockLoader.java        |  59 +++++++++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  59 -----------
 .../fsdataset/impl/PmemMappableBlockLoader.java    | 110 +++------------------
 3 files changed, 75 insertions(+), 153 deletions(-)
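
The core of this change drops the mmap-plus-copy path in PmemMappableBlockLoader and instead streams the block file into the pmem cache file with FileChannel.transferTo(), then repositions the destination channel so the checksum can be verified against the cached copy. A minimal standalone sketch of that pattern (hypothetical file names, not the actual HDFS code) could look like the following; the loop around transferTo() is only a defensive touch, since a single call may transfer fewer bytes than requested.

// A minimal standalone sketch (hypothetical file names, not the HDFS code)
// of the copy pattern this commit switches to: FileChannel.transferTo()
// streams the source file into the cache file, and the destination channel
// is repositioned to 0 so the cached copy can be re-read for verification.
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class TransferToCopySketch {
  public static void main(String[] args) throws IOException {
    try (FileInputStream blockIn = new FileInputStream("block.data");          // assumed source
         RandomAccessFile cacheFile = new RandomAccessFile("cache.data", "rw")) { // assumed target
      FileChannel blockChannel = blockIn.getChannel();
      long length = blockChannel.size();
      long copied = 0;
      // transferTo() may copy fewer bytes than requested, so loop until done.
      while (copied < length) {
        copied += blockChannel.transferTo(copied, length - copied,
            cacheFile.getChannel());
      }
      // Rewind the cache file channel before re-reading it for verification.
      cacheFile.getChannel().position(0);
    }
  }
}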

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 044e5c5..3ec8416 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.util.DataChecksum;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -108,6 +114,59 @@ public abstract class MappableBlockLoader {
   }
 
   /**
+   * Verifies the block's checksum. This is an I/O intensive operation.
+   */
+  protected void verifyChecksum(long length, FileInputStream metaIn,
+                                FileChannel blockChannel, String blockFileName)
+      throws IOException {
+    // Verify the checksum from the block's meta file
+    // Get the DataChecksum from the meta file header
+    BlockMetadataHeader header =
+        BlockMetadataHeader.readHeader(new DataInputStream(
+            new BufferedInputStream(metaIn, BlockMetadataHeader
+                .getHeaderSize())));
+    FileChannel metaChannel = null;
+    try {
+      metaChannel = metaIn.getChannel();
+      if (metaChannel == null) {
+        throw new IOException(
+            "Block InputStream meta file has no FileChannel.");
+      }
+      DataChecksum checksum = header.getChecksum();
+      final int bytesPerChecksum = checksum.getBytesPerChecksum();
+      final int checksumSize = checksum.getChecksumSize();
+      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+      // Verify the checksum
+      int bytesVerified = 0;
+      while (bytesVerified < length) {
+        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+            "Unexpected partial chunk before EOF");
+        assert bytesVerified % bytesPerChecksum == 0;
+        int bytesRead = fillBuffer(blockChannel, blockBuf);
+        if (bytesRead == -1) {
+          throw new IOException("checksum verification failed: premature EOF");
+        }
+        blockBuf.flip();
+        // Number of read chunks, including partial chunk at end
+        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.limit(chunks * checksumSize);
+        fillBuffer(metaChannel, checksumBuf);
+        checksumBuf.flip();
+        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+            bytesVerified);
+        // Success
+        bytesVerified += bytesRead;
+        blockBuf.clear();
+        checksumBuf.clear();
+      }
+    } finally {
+      IOUtils.closeQuietly(metaChannel);
+    }
+  }
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 919835a..52d8d93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,22 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -98,59 +92,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
-   */
-  private void verifyChecksum(long length, FileInputStream metaIn,
-                             FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-        // Success
-        bytesVerified += bytesRead;
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 05a9ba7..239fff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,25 +18,17 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
@@ -79,112 +71,42 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                            FileInputStream metaIn, String blockFileName,
-                            ExtendedBlockId key)
+                     FileInputStream metaIn, String blockFileName,
+                     ExtendedBlockId key)
       throws IOException {
     PmemMappedBlock mappableBlock = null;
-    String filePath = null;
+    String cachePath = null;
 
     FileChannel blockChannel = null;
-    RandomAccessFile file = null;
-    MappedByteBuffer out = null;
+    RandomAccessFile cacheFile = null;
     try {
       blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
+      cachePath = pmemVolumeManager.getCachePath(key);
+      cacheFile = new RandomAccessFile(cachePath, "rw");
+      blockChannel.transferTo(0, length, cacheFile.getChannel());
+
+      // Verify checksum for the cached data instead of block file.
+      // The file channel should be repositioned.
+      cacheFile.getChannel().position(0);
+      verifyChecksum(length, metaIn, cacheFile.getChannel(), blockFileName);
 
-      filePath = pmemVolumeManager.getCachePath(key);
-      file = new RandomAccessFile(filePath, "rw");
-      out = file.getChannel().
-          map(FileChannel.MapMode.READ_WRITE, 0, length);
-      if (out == null) {
-        throw new IOException("Failed to map the block " + blockFileName +
-            " to persistent storage.");
-      }
-      verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
-          blockFileName);
       mappableBlock = new PmemMappedBlock(length, key);
       LOG.info("Successfully cached one replica:{} into persistent memory"
-          + ", [cached path={}, length={}]", key, filePath, length);
+          + ", [cached path={}, length={}]", key, cachePath, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
-      if (out != null) {
-        NativeIO.POSIX.munmap(out);
-      }
-      IOUtils.closeQuietly(file);
+      IOUtils.closeQuietly(cacheFile);
       if (mappableBlock == null) {
-        LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
-        FsDatasetUtil.deleteMappedFile(filePath);
+        LOG.debug("Delete {} due to unsuccessful mapping.", cachePath);
+        FsDatasetUtil.deleteMappedFile(cachePath);
       }
     }
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum meanwhile maps block to persistent memory.
-   * This is an I/O intensive operation.
-   */
-  private void verifyChecksumAndMapBlock(
-      MappedByteBuffer out, long length, FileInputStream metaIn,
-      FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException("Cannot get FileChannel from " +
-            "Block InputStream meta file.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException(
-              "Checksum verification failed for the block " + blockFileName +
-                  ": premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-
-        // / Copy data to persistent file
-        out.put(blockBuf);
-        // positioning the
-        bytesVerified += bytesRead;
-
-        // Clear buffer
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-      // Forces to write data to storage device containing the mapped file
-      out.force();
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return pmemVolumeManager.getCacheUsed();




[hadoop] 10/10: HDFS-14818. Check native pmdk lib by 'hadoop checknative' command. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a6cdcf676e781f229654474927aa16583d34884e
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Sun Sep 22 22:02:54 2019 +0530

    HDFS-14818. Check native pmdk lib by 'hadoop checknative' command. Contributed by Feilong He.
    
    (cherry picked from commit 659c88801d008bb352d10a1cb3bd0e401486cc9b)
---
 .../hadoop-common/src/CMakeLists.txt               |  2 +-
 .../org/apache/hadoop/io/nativeio/NativeIO.java    | 28 +++++++++++++++++-----
 .../apache/hadoop/util/NativeLibraryChecker.java   | 10 ++++++++
 .../src/org/apache/hadoop/io/nativeio/NativeIO.c   | 14 ++++++++++-
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.c  | 28 +++++++++++-----------
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.h  |  5 ----
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 15 +++++++++---
 7 files changed, 72 insertions(+), 30 deletions(-)
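
After this change, 'hadoop checknative' prints a "PMDK:" line alongside the other native-library checks, backed by the new state accessors on NativeIO.POSIX. A minimal sketch of querying that state directly (assuming hadoop-common and its native library are available at runtime; not part of this commit) might be:

// A minimal sketch (not part of this commit) of how a caller can query the
// PMDK support state that this change surfaces through NativeIO and the
// 'hadoop checknative' report.
import org.apache.hadoop.io.nativeio.NativeIO;

public class PmdkStateSketch {
  public static void main(String[] args) {
    // True only when the native code was built with PMDK support and the
    // PMDK libs were loaded successfully.
    boolean available = NativeIO.POSIX.isPmdkAvailable();
    System.out.println("PMDK available: " + available);
    // Human-readable state message; includes the resolved lib path if known.
    System.out.println(NativeIO.POSIX.getPmdkSupportStateMessage());
  }
}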

diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index 771c685..10591f6 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -170,7 +170,7 @@ if(REQUIRE_PMDK)
     set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
 
     if(PMDK_LIBRARY)
-        GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
+        GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} REALPATH)
         set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
     else(PMDK_LIBRARY)
         MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 1d0eab7..973afa3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -120,16 +120,19 @@ public class NativeIO {
       public String getMessage() {
         String msg;
         switch (stateCode) {
+        // -1 represents UNSUPPORTED.
         case -1:
-          msg = "The native code is built without PMDK support.";
+          msg = "The native code was built without PMDK support.";
           break;
+        // 1 represents PMDK_LIB_NOT_FOUND.
         case 1:
-          msg = "The native code is built with PMDK support, but PMDK libs " +
-              "are NOT found in execution environment or failed to be loaded.";
+          msg = "The native code was built with PMDK support, but PMDK libs " +
+              "were NOT found in execution environment or failed to be loaded.";
           break;
+        // 0 represents SUPPORTED.
         case 0:
-          msg = "The native code is built with PMDK support, and PMDK libs " +
-              "are loaded successfully.";
+          msg = "The native code was built with PMDK support, and PMDK libs " +
+              "were loaded successfully.";
           break;
         default:
           msg = "The state code: " + stateCode + " is unrecognized!";
@@ -140,7 +143,7 @@ public class NativeIO {
 
     // Denotes the state of supporting PMDK. The value is set by JNI.
     private static SupportState pmdkSupportState =
-        SupportState.PMDK_LIB_NOT_FOUND;
+        SupportState.UNSUPPORTED;
 
     private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
 
@@ -177,6 +180,14 @@ public class NativeIO {
       LOG.error("The state code: " + stateCode + " is unrecognized!");
     }
 
+    public static String getPmdkSupportStateMessage() {
+      if (getPmdkLibPath() != null) {
+        return pmdkSupportState.getMessage() +
+            " The pmdk lib path: " + getPmdkLibPath();
+      }
+      return pmdkSupportState.getMessage();
+    }
+
     public static boolean isPmdkAvailable() {
       LOG.info(pmdkSupportState.getMessage());
       return pmdkSupportState == SupportState.SUPPORTED;
@@ -242,8 +253,13 @@ public class NativeIO {
           NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
         }
       }
+
+      public static String getPmdkLibPath() {
+        return POSIX.getPmdkLibPath();
+      }
     }
 
+    private static native String getPmdkLibPath();
     private static native boolean isPmemCheck(long address, long length);
     private static native PmemMappedRegion pmemCreateMapFile(String path,
         long length);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
index 776839c..2338824 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ public class NativeLibraryChecker {
     boolean snappyLoaded = false;
     boolean isalLoaded = false;
     boolean zStdLoaded = false;
+    boolean pmdkLoaded = false;
     // lz4 is linked within libhadoop
     boolean lz4Loaded = nativeHadoopLoaded;
     boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
@@ -80,6 +82,7 @@ public class NativeLibraryChecker {
     String zlibLibraryName = "";
     String snappyLibraryName = "";
     String isalDetail = "";
+    String pmdkDetail = "";
     String zstdLibraryName = "";
     String lz4LibraryName = "";
     String bzip2LibraryName = "";
@@ -110,6 +113,12 @@ public class NativeLibraryChecker {
         isalLoaded = true;
       }
 
+      pmdkDetail = NativeIO.POSIX.getPmdkSupportStateMessage();
+      pmdkLoaded = NativeIO.POSIX.isPmdkAvailable();
+      if (pmdkLoaded) {
+        pmdkDetail = NativeIO.POSIX.Pmem.getPmdkLibPath();
+      }
+
       openSslDetail = OpensslCipher.getLoadingFailureReason();
       if (openSslDetail != null) {
         openSslLoaded = false;
@@ -148,6 +157,7 @@ public class NativeLibraryChecker {
     System.out.printf("bzip2:   %b %s%n", bzip2Loaded, bzip2LibraryName);
     System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
     System.out.printf("ISA-L:   %b %s%n", isalLoaded, isalDetail);
+    System.out.printf("PMDK:    %b %s%n", pmdkLoaded, pmdkDetail);
 
     if (Shell.WINDOWS) {
       System.out.printf("winutils: %b %s%n", winutilsExists, winutilsPath);
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 3a0641b..b0b5151 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -292,10 +292,13 @@ static int loadPmdkLib(JNIEnv *env) {
   if (mid == 0) {
     return 0;
   }
+
   if (strlen(errMsg) > 0) {
+    // Set PMDK support state to 1 which represents PMDK_LIB_NOT_FOUND.
     (*env)->CallStaticVoidMethod(env, clazz, mid, 1);
     return 0;
   }
+  // Set PMDK support state to 0 which represents SUPPORTED.
   (*env)->CallStaticVoidMethod(env, clazz, mid, 0);
   return 1;
 }
@@ -1620,7 +1623,7 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pm
     char msg[1000];
     succeed = pmdkLoader->pmem_msync(address, length);
     // succeed = -1 failure
-    if (succeed = -1) {
+    if (succeed == -1) {
       snprintf(msg, sizeof(msg), "Failed to msync region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
       THROW(env, "java/io/IOException", msg);
       return;
@@ -1631,6 +1634,15 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pm
   #endif
   }
 
+JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_getPmdkLibPath
+  (JNIEnv * env, jclass thisClass) {
+    jstring libpath = NULL;
+
+    #ifdef HADOOP_PMDK_LIBRARY
+      libpath = (*env)->NewStringUTF(env, HADOOP_PMDK_LIBRARY);
+    #endif
+    return libpath;
+  }
 
 #ifdef __cplusplus
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
index f7d6cfb..502508c 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
@@ -59,11 +59,11 @@ static const char* load_functions() {
 void load_pmdk_lib(char* err, size_t err_len) {
   const char* errMsg;
   const char* library = NULL;
-#ifdef UNIX
-  Dl_info dl_info;
-#else
-  LPTSTR filename = NULL;
-#endif
+  #ifdef UNIX
+    Dl_info dl_info;
+  #else
+    LPTSTR filename = NULL;
+  #endif
 
   err[0] = '\0';
 
@@ -88,15 +88,15 @@ void load_pmdk_lib(char* err, size_t err_len) {
     snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
   }
 
-#ifdef UNIX
-  if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
-    library = dl_info.dli_fname;
-  }
-#else
-  if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
-    library = filename;
-  }
-#endif
+  #ifdef UNIX
+    if (dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
+      library = dl_info.dli_fname;
+    }
+  #else
+    if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
+      library = filename;
+    }
+  #endif
 
   if (library == NULL) {
     library = HADOOP_PMDK_LIBRARY;
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
index c93a076..a668377 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
@@ -80,11 +80,6 @@ void *myDlsym(void *handle, const char *symbol) {
 #endif
 
 /**
- * Return 0 if not support, 1 otherwise.
- */
-int build_support_pmdk();
-
-/**
  * Initialize and load PMDK library, returning error message if any.
  *
  * @param err     The err message buffer.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 1514927..283f13b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -267,8 +267,8 @@ public class FsDatasetCache {
     Value prevValue = mappableBlockMap.get(key);
     boolean deferred = false;
 
-    if (!dataset.datanode.getShortCircuitRegistry().
-            processBlockMunlockRequest(key)) {
+    if (cacheLoader.isTransientCache() && !dataset.datanode.
+        getShortCircuitRegistry().processBlockMunlockRequest(key)) {
       deferred = true;
     }
     if (prevValue == null) {
@@ -438,7 +438,11 @@ public class FsDatasetCache {
         }
         LOG.debug("Successfully cached {}.  We are now caching {} bytes in"
             + " total.", key, newUsedBytes);
-        dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
+        // Only applicable to DRAM cache.
+        if (cacheLoader.isTransientCache()) {
+          dataset.datanode.
+              getShortCircuitRegistry().processBlockMlockEvent(key);
+        }
         numBlocksCached.addAndGet(1);
         dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
@@ -476,6 +480,11 @@ public class FsDatasetCache {
     }
 
     private boolean shouldDefer() {
+      // Currently, defer condition is just checked for DRAM cache case.
+      if (!cacheLoader.isTransientCache()) {
+        return false;
+      }
+
       /* If revocationTimeMs == 0, this is an immediate uncache request.
        * No clients were anchored at the time we made the request. */
       if (revocationTimeMs == 0) {




[hadoop] 02/10: HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 0c6824120af4c95d912949f99bc5df57983ec9b3
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Fri Mar 29 00:18:15 2019 +0530

    HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R
    
    (cherry picked from commit f3f51284d57ef2e0c7e968b6eea56eab578f7e93)
---
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 132 ++-----------
 .../fsdataset/impl/MappableBlockLoader.java        |  20 ++
 .../datanode/fsdataset/impl/MemoryCacheStats.java  | 212 +++++++++++++++++++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  24 ++-
 .../datanode/TestFsDatasetCacheRevocation.java     |  31 +--
 .../{ => fsdataset/impl}/TestFsDatasetCache.java   |  10 +-
 6 files changed, 298 insertions(+), 131 deletions(-)
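
This refactor moves the byte accounting out of FsDatasetCache into a new MemoryCacheStats class, whose PageRounder rounds every reservation to the operating system page size before updating the used-bytes counter. A small self-contained demo of that rounding arithmetic (assuming a 4 KiB page size; the real code obtains the page size from NativeIO) is:

// A self-contained illustration (assumed 4 KiB page size) of the rounding
// arithmetic used by PageRounder in the new MemoryCacheStats class; page
// sizes are powers of two, so masking with ~(pageSize - 1) clears the
// sub-page remainder.
public class PageRoundingDemo {
  private static final long OS_PAGE_SIZE = 4096; // demo assumption

  static long roundUp(long count) {
    return (count + OS_PAGE_SIZE - 1) & (~(OS_PAGE_SIZE - 1));
  }

  static long roundDown(long count) {
    return count & (~(OS_PAGE_SIZE - 1));
  }

  public static void main(String[] args) {
    System.out.println(roundUp(1));      // 4096
    System.out.println(roundUp(4096));   // 4096
    System.out.println(roundUp(4097));   // 8192
    System.out.println(roundDown(8191)); // 4096
  }
}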

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 9efd11a..f90a4b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,104 +130,10 @@ public class FsDatasetCache {
 
   private final long revocationPollingMs;
 
-  /**
-   * The approximate amount of cache space in use.
-   *
-   * This number is an overestimate, counting bytes that will be used only
-   * if pending caching operations succeed.  It does not take into account
-   * pending uncaching operations.
-   *
-   * This overestimate is more useful to the NameNode than an underestimate,
-   * since we don't want the NameNode to assign us more replicas than
-   * we can cache, because of the current batch of operations.
-   */
-  private final UsedBytesCount usedBytesCount;
-
-  public static class PageRounder {
-    private final long osPageSize =
-        NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
-
-    /**
-     * Round up a number to the operating system page size.
-     */
-    public long roundUp(long count) {
-      return (count + osPageSize - 1) & (~(osPageSize - 1));
-    }
-
-    /**
-     * Round down a number to the operating system page size.
-     */
-    public long roundDown(long count) {
-      return count & (~(osPageSize - 1));
-    }
-  }
-
-  private class UsedBytesCount {
-    private final AtomicLong usedBytes = new AtomicLong(0);
-    
-    private final PageRounder rounder = new PageRounder();
-
-    /**
-     * Try to reserve more bytes.
-     *
-     * @param count    The number of bytes to add.  We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes if we succeeded;
-     *                 -1 if we failed.
-     */
-    long reserve(long count) {
-      count = rounder.roundUp(count);
-      while (true) {
-        long cur = usedBytes.get();
-        long next = cur + count;
-        if (next > maxBytes) {
-          return -1;
-        }
-        if (usedBytes.compareAndSet(cur, next)) {
-          return next;
-        }
-      }
-    }
-    
-    /**
-     * Release some bytes that we're using.
-     *
-     * @param count    The number of bytes to release.  We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long release(long count) {
-      count = rounder.roundUp(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    /**
-     * Release some bytes that we're using rounded down to the page size.
-     *
-     * @param count    The number of bytes to release.  We will round this
-     *                 down to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long releaseRoundDown(long count) {
-      count = rounder.roundDown(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    long get() {
-      return usedBytes.get();
-    }
-  }
-
-  /**
-   * The total cache capacity in bytes.
-   */
-  private final long maxBytes;
-
   private final MappableBlockLoader mappableBlockLoader;
 
+  private final MemoryCacheStats memCacheStats;
+
   /**
    * Number of cache commands that could not be completed successfully
    */
@@ -240,12 +145,10 @@ public class FsDatasetCache {
 
   public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
-    this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
         .setDaemon(true)
         .setNameFormat("FsDatasetCache-%d-" + dataset.toString())
         .build();
-    this.usedBytesCount = new UsedBytesCount();
     this.uncachingExecutor = new ThreadPoolExecutor(
             0, 1,
             60, TimeUnit.SECONDS,
@@ -270,7 +173,11 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-    this.mappableBlockLoader = new MemoryMappableBlockLoader();
+
+    this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
+    // Both lazy writer and read cache are sharing this statistics.
+    this.memCacheStats = new MemoryCacheStats(
+        dataset.datanode.getDnConf().getMaxLockedMemory());
   }
 
   /**
@@ -371,7 +278,7 @@ public class FsDatasetCache {
    *                 -1 if we failed.
    */
   long reserve(long count) {
-    return usedBytesCount.reserve(count);
+    return memCacheStats.reserve(count);
   }
 
   /**
@@ -383,7 +290,7 @@ public class FsDatasetCache {
    * @return         The new number of usedBytes.
    */
   long release(long count) {
-    return usedBytesCount.release(count);
+    return memCacheStats.release(count);
   }
 
   /**
@@ -395,7 +302,7 @@ public class FsDatasetCache {
    * @return         The new number of usedBytes.
    */
   long releaseRoundDown(long count) {
-    return usedBytesCount.releaseRoundDown(count);
+    return memCacheStats.releaseRoundDown(count);
   }
 
   /**
@@ -404,14 +311,14 @@ public class FsDatasetCache {
    * @return the OS page size.
    */
   long getOsPageSize() {
-    return usedBytesCount.rounder.osPageSize;
+    return memCacheStats.getPageSize();
   }
 
   /**
    * Round up to the OS page size.
    */
   long roundUpPageSize(long count) {
-    return usedBytesCount.rounder.roundUp(count);
+    return memCacheStats.roundUpPageSize(count);
   }
 
   /**
@@ -437,14 +344,14 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = reserve(length);
+      long newUsedBytes = mappableBlockLoader.reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
           LOG.warn("Failed to cache " + key + ": could not reserve " + length +
               " more bytes in the cache: " +
               DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-              " of " + maxBytes + " exceeded.");
+              " of " + memCacheStats.getCacheCapacity() + " exceeded.");
           return;
         }
         reservedBytes = true;
@@ -497,10 +404,10 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            release(length);
+            mappableBlockLoader.release(length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
-                  + "bytes in total.", key, usedBytesCount.get());
+                  + "bytes in total.", key, memCacheStats.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
@@ -574,7 +481,8 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = release(value.mappableBlock.getLength());
+      long newUsedBytes = mappableBlockLoader
+          .release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -593,14 +501,14 @@ public class FsDatasetCache {
    * Get the approximate amount of cache space used.
    */
   public long getCacheUsed() {
-    return usedBytesCount.get();
+    return memCacheStats.getCacheUsed();
   }
 
   /**
    * Get the maximum amount of bytes we can cache.  This is a constant.
    */
   public long getCacheCapacity() {
-    return maxBytes;
+    return memCacheStats.getCacheCapacity();
   }
 
   public long getNumBlocksFailedToCache() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index a323f78..0f5ec2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -58,6 +58,26 @@ public abstract class MappableBlockLoader {
       throws IOException;
 
   /**
+   * Try to reserve some given bytes.
+   *
+   * @param bytesCount
+   *          The number of bytes to add.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  abstract long reserve(long bytesCount);
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param bytesCount
+   *          The number of bytes to release.
+   *
+   * @return The new number of usedBytes.
+   */
+  abstract long release(long bytesCount);
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
new file mode 100644
index 0000000..d276c27
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps statistics for the memory cache.
+ */
+class MemoryCacheStats {
+
+  /**
+   * The approximate amount of cache space in use.
+   *
+   * This number is an overestimate, counting bytes that will be used only if
+   * pending caching operations succeed. It does not take into account pending
+   * uncaching operations.
+   *
+   * This overestimate is more useful to the NameNode than an underestimate,
+   * since we don't want the NameNode to assign us more replicas than we can
+   * cache, because of the current batch of operations.
+   */
+  private final UsedBytesCount usedBytesCount;
+
+  /**
+   * The total cache capacity in bytes.
+   */
+  private final long maxBytes;
+
+  MemoryCacheStats(long maxBytes) {
+    this.usedBytesCount = new UsedBytesCount();
+    this.maxBytes = maxBytes;
+  }
+
+  /**
+   * Used to count operating system page size.
+   */
+  @VisibleForTesting
+  static class PageRounder {
+    private final long osPageSize = NativeIO.POSIX.getCacheManipulator()
+        .getOperatingSystemPageSize();
+
+    /**
+     * Round up a number to the operating system page size.
+     */
+    public long roundUp(long count) {
+      return (count + osPageSize - 1) & (~(osPageSize - 1));
+    }
+
+    /**
+     * Round down a number to the operating system page size.
+     */
+    public long roundDown(long count) {
+      return count & (~(osPageSize - 1));
+    }
+  }
+
+  /**
+   * Counts used bytes for memory.
+   */
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+
+    private MemoryCacheStats.PageRounder rounder = new PageRounder();
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param count
+     *          The number of bytes to add. We will round this up to the page
+     *          size.
+     *
+     * @return The new number of usedBytes if we succeeded; -1 if we failed.
+     */
+    long reserve(long count) {
+      count = rounder.roundUp(count);
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + count;
+        if (next > getCacheCapacity()) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this up to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long release(long count) {
+      count = rounder.roundUp(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    /**
+     * Release some bytes that we're using rounded down to the page size.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this down to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long releaseRoundDown(long count) {
+      count = rounder.roundDown(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
+  // Stats related methods for FSDatasetMBean
+
+  /**
+   * Get the approximate amount of cache space used.
+   */
+  public long getCacheUsed() {
+    return usedBytesCount.get();
+  }
+
+  /**
+   * Get the maximum amount of bytes we can cache. This is a constant.
+   */
+  public long getCacheCapacity() {
+    return maxBytes;
+  }
+
+  /**
+   * Try to reserve more bytes.
+   *
+   * @param count
+   *          The number of bytes to add. We will round this up to the page
+   *          size.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  long reserve(long count) {
+    return usedBytesCount.reserve(count);
+  }
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this up to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long release(long count) {
+    return usedBytesCount.release(count);
+  }
+
+  /**
+   * Release some bytes that we're using rounded down to the page size.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this down to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long releaseRoundDown(long count) {
+    return usedBytesCount.releaseRoundDown(count);
+  }
+
+  /**
+   * Get the OS page size.
+   *
+   * @return the OS page size.
+   */
+  long getPageSize() {
+    return usedBytesCount.rounder.osPageSize;
+  }
+
+  /**
+   * Round up to the OS page size.
+   */
+  long roundUpPageSize(long count) {
+    return usedBytesCount.rounder.roundUp(count);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 7a0f7c7..d93193e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -42,6 +42,18 @@ import java.nio.channels.FileChannel;
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
+  private final FsDatasetCache cacheManager;
+
+  /**
+   * Constructs memory mappable loader.
+   *
+   * @param cacheManager
+   *          FsDatasetCache reference.
+   */
+  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
+    this.cacheManager = cacheManager;
+  }
+
   /**
    * Load the block.
    *
@@ -90,7 +102,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   /**
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
-  public void verifyChecksum(long length, FileInputStream metaIn,
+  private void verifyChecksum(long length, FileInputStream metaIn,
                              FileChannel blockChannel, String blockFileName)
       throws IOException {
     // Verify the checksum from the block's meta file
@@ -139,4 +151,14 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
       IOUtils.closeQuietly(metaChannel);
     }
   }
+
+  @Override
+  long reserve(long bytesCount) {
+    return cacheManager.reserve(bytesCount);
+  }
+
+  @Override
+  long release(long bytesCount) {
+    return cacheManager.release(bytesCount);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
index 40de320..fd72804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestFsDatasetCache;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
@@ -50,6 +51,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Tests FsDatasetCache behaviors.
+ */
 public class TestFsDatasetCacheRevocation {
   private static final Logger LOG = LoggerFactory.getLogger(
       TestFsDatasetCacheRevocation.class);
@@ -86,7 +90,7 @@ public class TestFsDatasetCacheRevocation {
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-      new File(sockDir.getDir(), "sock").getAbsolutePath());
+        new File(sockDir.getDir(), "sock").getAbsolutePath());
     return conf;
   }
 
@@ -112,19 +116,18 @@ public class TestFsDatasetCacheRevocation {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
-    long cacheDirectiveId =
-      dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-        setPool("pool").setPath(new Path(TEST_FILE)).
-          setReplication((short) 1).build());
+    long cacheDirectiveId = dfs
+        .addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+            .setPath(new Path(testFile)).setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
@@ -143,8 +146,8 @@ public class TestFsDatasetCacheRevocation {
   }
 
   /**
-   * Test that when we have an uncache request, and the client refuses to release
-   * the replica for a long time, we will un-mlock it.
+   * Test that when we have an uncache request, and the client refuses to
+   * release the replica for a long time, we will un-mlock it.
    */
   @Test(timeout=120000)
   public void testRevocation() throws Exception {
@@ -163,19 +166,19 @@ public class TestFsDatasetCacheRevocation {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file2";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file2";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
     long cacheDirectiveId =
         dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-            setPool("pool").setPath(new Path(TEST_FILE)).
+            setPool("pool").setPath(new Path(testFile)).
             setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
similarity index 98%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index a0c6498..7e97960 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -59,9 +59,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -100,7 +102,7 @@ public class TestFsDatasetCache {
       LoggerFactory.getLogger(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
-  static final long CACHE_CAPACITY = 64 * 1024;
+  public static final long CACHE_CAPACITY = 64 * 1024;
   // mlock always locks the entire page. So we don't need to deal with this
   // rounding, use the OS page size for the block size.
   private static final long PAGE_SIZE =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 08/10: HDFS-14357. Update documentation for HDFS cache on SCM support. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c6d59e6a4796b626fcf0046f9bc435f4f2217259
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Mon Jul 15 13:18:23 2019 +0530

    HDFS-14357. Update documentation for HDFS cache on SCM support. Contributed by Feilong He.
    
    (cherry picked from commit 30a8f840f1572129fe7d02f8a784c47ab57ce89a)
---
 .../src/site/markdown/CentralizedCacheManagement.md    | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
index 7568949..8880ea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
@@ -32,6 +32,8 @@ Centralized cache management in HDFS has many significant advantages.
 
 4.  Centralized caching can improve overall cluster memory utilization. When relying on the OS buffer cache at each DataNode, repeated reads of a block will result in all *n* replicas of the block being pulled into buffer cache. With centralized cache management, a user can explicitly pin only *m* of the *n* replicas, saving *n-m* memory.
 
+5.  HDFS supports non-volatile storage class memory (SCM, also known as persistent memory) cache on the Linux platform. Users can enable either the memory cache or the SCM cache for a DataNode. The memory cache and the SCM cache can coexist among DataNodes. In the current implementation, the cache data in SCM will be cleaned up when the DataNode restarts. Persistent HDFS cache support on SCM will be considered in the future.
+
 Use Cases
 ---------
 
@@ -200,11 +202,21 @@ Configuration
 
 In order to lock block files into memory, the DataNode relies on native JNI code found in `libhadoop.so` or `hadoop.dll` on Windows. Be sure to [enable JNI](../hadoop-common/NativeLibraries.html) if you are using HDFS centralized cache management.
 
+Currently, there are two implementations for the persistent memory cache. The default is a pure Java based implementation, and the other is a native implementation that leverages the PMDK library to improve the performance of cache writes and reads.
+
+To enable the PMDK based implementation, please follow the steps below.
+
+1. Install the PMDK library. Please refer to the official site http://pmem.io/ for detailed information.
+
+2. Build Hadoop with PMDK support. Please refer to the "PMDK library build options" section in `BUILDING.txt` in the source code.
+
+To verify that PMDK is correctly detected by Hadoop, run the `hadoop checknative` command.
+
 ### Configuration Properties
 
 #### Required
 
-Be sure to configure the following:
+Be sure to configure one of the following properties for the DRAM cache or the persistent memory cache. Note that the DRAM cache and the persistent memory cache cannot coexist on a single DataNode.
 
 *   dfs.datanode.max.locked.memory
 
@@ -212,6 +224,10 @@ Be sure to configure the following:
 
     This setting is shared with the [Lazy Persist Writes feature](./MemoryStorage.html). The Data Node will ensure that the combined memory used by Lazy Persist Writes and Centralized Cache Management does not exceed the amount configured in `dfs.datanode.max.locked.memory`.
 
+*   dfs.datanode.cache.pmem.dirs
+
+    This property specifies the persistent memory volumes used for caching. Multiple volumes should be separated by “,”, e.g. “/mnt/pmem0, /mnt/pmem1”. The default value is empty. If this property is configured, the volume capacity will be detected automatically and there is no need to configure `dfs.datanode.max.locked.memory`.
+
 #### Optional
 
 The following properties are not required, but may be specified for tuning:
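
As a quick illustration of the configuration change documented above, the sketch below sets up a persistent memory cache programmatically. This is a hedged, minimal example: the property name dfs.datanode.cache.pmem.dirs comes from the documentation diff itself, while the class name and the example volume paths are assumptions for illustration only.

    // Minimal sketch (assumed class name and paths): enable the SCM cache on a
    // DataNode by pointing it at one or more persistent memory volumes.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PmemCacheConfigSketch {
      public static Configuration newPmemCacheConf() {
        Configuration conf = new HdfsConfiguration();
        // The volume capacity is detected automatically, so there is no need
        // to set dfs.datanode.max.locked.memory in this mode.
        conf.set("dfs.datanode.cache.pmem.dirs", "/mnt/pmem0, /mnt/pmem1");
        return conf;
      }
    }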


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 06/10: HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6eb5fb596ffa44e4c4fe67e960f396cb4b37c80f
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Jun 5 21:33:00 2019 +0800

    HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
    
    (cherry picked from commit d1aad444907e1fc5314e8e64529e57c51ed7561c)
---
 BUILDING.txt                                       |  28 +++
 dev-support/bin/dist-copynativelibs                |   8 +
 hadoop-common-project/hadoop-common/pom.xml        |   2 +
 .../hadoop-common/src/CMakeLists.txt               |  21 ++
 .../hadoop-common/src/config.h.cmake               |   1 +
 .../org/apache/hadoop/io/nativeio/NativeIO.java    | 135 ++++++++++-
 .../src/org/apache/hadoop/io/nativeio/NativeIO.c   | 252 +++++++++++++++++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.c  | 106 +++++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.h  |  95 ++++++++
 .../apache/hadoop/io/nativeio/TestNativeIO.java    | 153 +++++++++++++
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  22 ++
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |   8 +
 .../datanode/fsdataset/impl/FsDatasetUtil.java     |  22 ++
 .../datanode/fsdataset/impl/MappableBlock.java     |   6 +
 .../fsdataset/impl/MappableBlockLoader.java        |  11 +-
 .../fsdataset/impl/MappableBlockLoaderFactory.java |   4 +
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |   8 +-
 .../datanode/fsdataset/impl/MemoryMappedBlock.java |   5 +
 ...der.java => NativePmemMappableBlockLoader.java} | 166 +++++++-------
 ...MappedBlock.java => NativePmemMappedBlock.java} |  49 ++--
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  10 +-
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |   5 +
 22 files changed, 1009 insertions(+), 108 deletions(-)

diff --git a/BUILDING.txt b/BUILDING.txt
index 1b900c3..0698469 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -86,6 +86,8 @@ Optional packages:
   $ sudo apt-get install fuse libfuse-dev
 * ZStandard compression
     $ sudo apt-get install zstd
+* PMDK library for storage class memory (SCM) as HDFS cache backend
+  Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
 
 ----------------------------------------------------------------------------------
 Maven main modules:
@@ -262,6 +264,32 @@ Maven build goals:
    invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
    requires maven 3.1.1 or greater.
 
+ PMDK library build options:
+
+   The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
+   collection of libraries which have been developed for various use cases, tuned,
+   validated to production quality, and thoroughly documented. These libraries are built
+   on the Direct Access (DAX) feature available in both Linux and Windows, which allows
+   applications direct load/store access to persistent memory by memory-mapping files
+   on a persistent memory aware file system.
+
+   It is currently an optional component, meaning that Hadoop can be built without
+   this dependency. Please note that the library is loaded as a dynamic module. For
+   more details please refer to the official sites:
+   http://pmem.io/ and https://github.com/pmem/pmdk.
+
+  * -Drequire.pmdk is used to build the project with the PMDK libraries forcibly. With this
+    option provided, the build will fail if the libpmem library is not found. If this option
+    is not given, the build will still generate a version of Hadoop with libhadoop.so, and
+    storage class memory (SCM) backed HDFS cache is still supported without PMDK involved.
+    Because PMDK can bring better cache write/read performance, it is recommended to build
+    the project with this option if users plan to use the SCM backed HDFS cache.
+  * -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not
+    under /usr/lib or /usr/lib64.
+  * -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar
+    package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
+    the build will fail if -Dpmdk.lib is not specified.
+
 ----------------------------------------------------------------------------------
 Building components separately
 
diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs
index 67d2edf..4a783f0 100755
--- a/dev-support/bin/dist-copynativelibs
+++ b/dev-support/bin/dist-copynativelibs
@@ -96,6 +96,12 @@ for i in "$@"; do
     --isalbundle=*)
       ISALBUNDLE=${i#*=}
     ;;
+    --pmdklib=*)
+      PMDKLIB=${i#*=}
+    ;;
+    --pmdkbundle=*)
+      PMDKBUNDLE=${i#*=}
+    ;;
     --opensslbinbundle=*)
       OPENSSLBINBUNDLE=${i#*=}
     ;;
@@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
   bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
 
   bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
+
+  bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}"
 fi
 
 # Windows
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 23bd270..5f48fb8 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -713,6 +713,8 @@
                     <REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL>
                     <CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX>
                     <CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB>
+                    <REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK>
+                    <CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB>
                     <REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
                     <CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
                     <CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index b9287c0..771c685 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -121,6 +121,7 @@ else ()
     ENDIF(REQUIRE_ZSTD)
 endif ()
 
+#Require ISA-L
 set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
 hadoop_set_find_shared_library_version("2")
 find_library(ISAL_LIBRARY
@@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
     ENDIF(REQUIRE_ISAL)
 endif (ISAL_LIBRARY)
 
+# Build with PMDK library if -Drequire.pmdk option is specified.
+if(REQUIRE_PMDK)
+    set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+    hadoop_set_find_shared_library_version("1")
+    find_library(PMDK_LIBRARY
+        NAMES pmem
+        PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
+    set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+    if(PMDK_LIBRARY)
+        GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
+        set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
+    else(PMDK_LIBRARY)
+        MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
+    endif(PMDK_LIBRARY)
+else(REQUIRE_PMDK)
+    MESSAGE(STATUS "Build without PMDK support.")
+endif(REQUIRE_PMDK)
+
 # Build hardware CRC32 acceleration, if supported on the platform.
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
   set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
     ${SRC}/io/compress/zlib/ZlibDecompressor.c
     ${BZIP2_SOURCE_FILES}
     ${SRC}/io/nativeio/NativeIO.c
+    ${PMDK_SOURCE_FILES}
     ${SRC}/io/nativeio/errno_enum.c
     ${SRC}/io/nativeio/file_descriptor.c
     ${SRC}/io/nativeio/SharedFileDescriptorFactory.c
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 40aa467..7e23a5d 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -24,6 +24,7 @@
 #cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
 #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
 #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
+#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE
 #cmakedefine HAVE_POSIX_FADVISE
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 4e0cd8f..1d0eab7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -100,6 +100,48 @@ public class NativeIO {
        write.  */
     public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
 
+    /**
+     * Keeps the support state of PMDK.
+     */
+    public enum SupportState {
+      UNSUPPORTED(-1),
+      PMDK_LIB_NOT_FOUND(1),
+      SUPPORTED(0);
+
+      private byte stateCode;
+      SupportState(int stateCode) {
+        this.stateCode = (byte) stateCode;
+      }
+
+      public int getStateCode() {
+        return stateCode;
+      }
+
+      public String getMessage() {
+        String msg;
+        switch (stateCode) {
+        case -1:
+          msg = "The native code is built without PMDK support.";
+          break;
+        case 1:
+          msg = "The native code is built with PMDK support, but PMDK libs " +
+              "are NOT found in execution environment or failed to be loaded.";
+          break;
+        case 0:
+          msg = "The native code is built with PMDK support, and PMDK libs " +
+              "are loaded successfully.";
+          break;
+        default:
+          msg = "The state code: " + stateCode + " is unrecognized!";
+        }
+        return msg;
+      }
+    }
+
+    // Denotes the state of supporting PMDK. The value is set by JNI.
+    private static SupportState pmdkSupportState =
+        SupportState.PMDK_LIB_NOT_FOUND;
+
     private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
 
     // Set to true via JNI if possible
@@ -124,6 +166,93 @@ public class NativeIO {
       POSIX.cacheManipulator = cacheManipulator;
     }
 
+    // This method is invoked by JNI.
+    public static void setPmdkSupportState(int stateCode) {
+      for (SupportState state : SupportState.values()) {
+        if (state.getStateCode() == stateCode) {
+          pmdkSupportState = state;
+          return;
+        }
+      }
+      LOG.error("The state code: " + stateCode + " is unrecognized!");
+    }
+
+    public static boolean isPmdkAvailable() {
+      LOG.info(pmdkSupportState.getMessage());
+      return pmdkSupportState == SupportState.SUPPORTED;
+    }
+
+    /**
+     * Denote memory region for a file mapped.
+     */
+    public static class PmemMappedRegion {
+      private long address;
+      private long length;
+      private boolean isPmem;
+
+      public PmemMappedRegion(long address, long length, boolean isPmem) {
+        this.address = address;
+        this.length = length;
+        this.isPmem = isPmem;
+      }
+
+      public boolean isPmem() {
+        return this.isPmem;
+      }
+
+      public long getAddress() {
+        return this.address;
+      }
+
+      public long getLength() {
+        return this.length;
+      }
+    }
+
+    /**
+     * JNI wrapper of persist memory operations.
+     */
+    public static class Pmem {
+      // check whether the address is a Pmem address or DIMM address
+      public static boolean isPmem(long address, long length) {
+        return NativeIO.POSIX.isPmemCheck(address, length);
+      }
+
+      // create a pmem file and memory map it
+      public static PmemMappedRegion mapBlock(String path, long length) {
+        return NativeIO.POSIX.pmemCreateMapFile(path, length);
+      }
+
+      // unmap a pmem file
+      public static boolean unmapBlock(long address, long length) {
+        return NativeIO.POSIX.pmemUnMap(address, length);
+      }
+
+      // copy data from disk file(src) to pmem file(dest), without flush
+      public static void memCopy(byte[] src, long dest, boolean isPmem,
+          long length) {
+        NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
+      }
+
+      // flush the memory content to persistent storage
+      public static void memSync(PmemMappedRegion region) {
+        if (region.isPmem()) {
+          NativeIO.POSIX.pmemDrain();
+        } else {
+          NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
+        }
+      }
+    }
+
+    private static native boolean isPmemCheck(long address, long length);
+    private static native PmemMappedRegion pmemCreateMapFile(String path,
+        long length);
+    private static native boolean pmemUnMap(long address, long length);
+    private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
+        long length);
+    private static native void pmemDrain();
+    private static native void pmemSync(long address, long length);
+
     /**
      * Used to manipulate the operating system cache.
      */
@@ -143,8 +272,8 @@ public class NativeIO {
       }
 
       public void posixFadviseIfPossible(String identifier,
-        FileDescriptor fd, long offset, long len, int flags)
-            throws NativeIOException {
+          FileDescriptor fd, long offset, long len, int flags)
+          throws NativeIOException {
         NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
             len, flags);
       }
@@ -748,7 +877,7 @@ public class NativeIO {
    * user account name, of the format DOMAIN\UserName. This method
    * will remove the domain part of the full logon name.
    *
-   * @param Fthe full principal name containing the domain
+   * @param name the full principal name containing the domain
    * @return name with domain removed
    */
   private static String stripDomain(String name) {
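
To make the NativeIO.POSIX.Pmem wrapper added above easier to follow, here is a hedged usage sketch that strings the new calls together in the order a cache loader would use them (map, copy, sync, unmap). Only the method names and signatures shown in this diff are relied on; the file path, block length and class name are illustrative assumptions.

    // Hedged sketch of the pmem cache lifecycle: map a file on a persistent
    // memory volume, copy block bytes into it, flush them, then unmap.
    import org.apache.hadoop.io.nativeio.NativeIO;
    import org.apache.hadoop.io.nativeio.NativeIO.POSIX.PmemMappedRegion;

    public class PmemUsageSketch {
      public static void main(String[] args) {
        // Guard on native/PMDK availability before touching the Pmem API.
        if (!NativeIO.isAvailable() || !NativeIO.POSIX.isPmdkAvailable()) {
          return;
        }
        long length = 4096;                      // assumed block length
        PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(
            "/mnt/pmem0/example-block", length); // assumed pmem volume path
        byte[] data = new byte[(int) length];    // block content to cache
        // Copy without drain, then make the content durable.
        NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(),
            region.isPmem(), length);
        NativeIO.POSIX.Pmem.memSync(region);
        NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
      }
    }
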
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 2274d57..3a0641b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -36,6 +36,10 @@
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#ifdef HADOOP_PMDK_LIBRARY
+#include <libpmem.h>
+#include "pmdk_load.h"
+#endif
 #if !(defined(__FreeBSD__) || defined(__MACH__))
 #include <sys/sendfile.h>
 #endif
@@ -60,6 +64,7 @@
 
 #define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
 #define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
+#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
 
 #define SET_INT_OR_RETURN(E, C, F) \
   { \
@@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
 // Please see HADOOP-7156 for details.
 jobject pw_lock_object;
 
+#ifdef HADOOP_PMDK_LIBRARY
+// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
+static jclass pmem_region_clazz = NULL;
+static jmethodID pmem_region_ctor = NULL;
+#endif
+
 /*
  * Throw a java.IO.IOException, generating the message from errno.
  * NB. this is also used form windows_secure_container_executor.c
@@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
   nioe_ctor = NULL;
 }
 
+#ifdef HADOOP_PMDK_LIBRARY
+static int loadPmdkLib(JNIEnv *env) {
+  char errMsg[1024];
+  jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
+  if (clazz == NULL) {
+    return 0; // exception has been raised
+  }
+  load_pmdk_lib(errMsg, sizeof(errMsg));
+  jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
+  if (mid == 0) {
+    return 0;
+  }
+  if (strlen(errMsg) > 0) {
+    (*env)->CallStaticVoidMethod(env, clazz, mid, 1);
+    return 0;
+  }
+  (*env)->CallStaticVoidMethod(env, clazz, mid, 0);
+  return 1;
+}
+
+static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
+
+  jclass clazz = NULL;
+  // Init Stat
+  clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
+  if (!clazz) {
+    THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class");
+    return; // exception has been raised
+  }
+
+  // Init PmemMappedRegion class
+  pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
+  if (!pmem_region_clazz) {
+    THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
+    return; // exception has been raised
+  }
+
+  pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
+  if (!pmem_region_ctor) {
+    THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor");
+    return; // exception has been raised
+  }
+}
+
+static void pmem_region_deinit(JNIEnv *env) {
+  if (pmem_region_ctor != NULL) {
+    // A jmethodID is not a JNI reference, so do not pass it to DeleteGlobalRef.
+    pmem_region_ctor = NULL;
+  }
+
+  if (pmem_region_clazz != NULL) {
+    (*env)->DeleteGlobalRef(env, pmem_region_clazz);
+    pmem_region_clazz = NULL;
+  }
+ }
+#endif
+
 /*
  * private static native void initNative();
  *
@@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
 #ifdef UNIX
   errno_enum_init(env);
   PASS_EXCEPTIONS_GOTO(env, error);
+#ifdef HADOOP_PMDK_LIBRARY
+  if (loadPmdkLib(env)) {
+    pmem_region_init(env, clazz);
+  }
+#endif
 #endif
   return;
 error:
@@ -299,6 +372,9 @@ error:
   // class wasn't initted yet
 #ifdef UNIX
   stat_deinit(env);
+#ifdef HADOOP_PMDK_LIBRARY
+  pmem_region_deinit(env);
+#endif
 #endif
   nioe_deinit(env);
   fd_deinit(env);
@@ -1383,3 +1459,179 @@ cleanup:
 /**
  * vim: sw=2: ts=2: et:
  */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    isPmemCheck
+ * Signature: (JJ)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    jint is_pmem = pmdkLoader->pmem_is_pmem(address, length);
+    return (is_pmem) ? JNI_TRUE : JNI_FALSE;
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function isPmemCheck is not supported.");
+    return JNI_FALSE;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemCreateMapFile
+ * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
+JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    /* create a pmem file and memory map it */
+    const char * path = NULL;
+    void * pmemaddr = NULL;
+    size_t mapped_len = 0;
+    int is_pmem = 1;
+    char msg[1000];
+
+    path = (*env)->GetStringUTFChars(env, filePath, NULL);
+    if (!path) {
+      THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
+      return NULL;
+    }
+
+    if (fileLength <= 0) {
+      (*env)->ReleaseStringUTFChars(env, filePath, path);
+      THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
+      return NULL;
+    }
+
+    pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+        0666, &mapped_len, &is_pmem);
+
+    if (!pmemaddr) {
+      snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      (*env)->ReleaseStringUTFChars(env, filePath, path);
+      return NULL;
+    }
+
+    if (fileLength != mapped_len) {
+      snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      (*env)->ReleaseStringUTFChars(env, filePath, path);
+      return NULL;
+    }
+
+    (*env)->ReleaseStringUTFChars(env, filePath, path);
+
+    if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
+      THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
+      return NULL;
+    }
+
+    jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, pmemaddr, mapped_len, (jboolean)is_pmem);
+    return ret;
+
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemCreateMapFile is not supported.");
+    return NULL;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemUnMap
+ * Signature: (JJ)V
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    int succeed = 0;
+    char msg[1000];
+    succeed = pmdkLoader->pmem_unmap(address, length);
+    // succeed = -1 failure; succeed = 0 success
+    if (succeed != 0) {
+      snprintf(msg, sizeof(msg), "Failed to unmap region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      return JNI_FALSE;
+    } else {
+      return JNI_TRUE;
+    }
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemUnMap is not supported.");
+    return JNI_FALSE;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemCopy
+ * Signature: ([BJJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
+JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    char msg[1000];
+    jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
+    snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf);
+    if (is_pmem) {
+      pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length);
+    } else {
+      memcpy(address, srcBuf, length);
+    }
+    (*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0);
+    return;
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemCopy is not supported.");
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO
+ * Method:    pmemDrain
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
+JNIEnv *env, jclass thisClass) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    pmdkLoader->pmem_drain();
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemDrain is not supported.");
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemSync
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
+  (JNIEnv * env, jclass thisClass, jlong address, jlong length) {
+
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    int succeed = 0;
+    char msg[1000];
+    succeed = pmdkLoader->pmem_msync(address, length);
+    // succeed = -1 failure
+    if (succeed == -1) {
+      snprintf(msg, sizeof(msg), "Failed to msync region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      return;
+    }
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemSync is not supported.");
+  #endif
+  }
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
new file mode 100644
index 0000000..f7d6cfb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+#include "pmdk_load.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+
+#include "config.h"
+#endif
+
+PmdkLibLoader * pmdkLoader;
+
+/**
+ *  pmdk_load.c
+ *  Utility of loading the libpmem library and the required functions.
+ *  Building of this codes won't rely on any libpmem source codes, but running
+ *  into this will rely on successfully loading of the dynamic library.
+ *
+ */
+
+static const char* load_functions() {
+#ifdef UNIX
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
+#endif
+  return NULL;
+}
+
+void load_pmdk_lib(char* err, size_t err_len) {
+  const char* errMsg;
+  const char* library = NULL;
+#ifdef UNIX
+  Dl_info dl_info;
+#else
+  LPTSTR filename = NULL;
+#endif
+
+  err[0] = '\0';
+
+  if (pmdkLoader != NULL) {
+    return;
+  }
+  pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
+
+  // Load PMDK library
+  #ifdef UNIX
+  pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (pmdkLoader->libec == NULL) {
+    snprintf(err, err_len, "Failed to load %s (%s)",
+        HADOOP_PMDK_LIBRARY, dlerror());
+    return;
+  }
+  // Clear any existing error
+  dlerror();
+  #endif
+  errMsg = load_functions();
+  if (errMsg != NULL) {
+    snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
+  }
+
+#ifdef UNIX
+  if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
+    library = dl_info.dli_fname;
+  }
+#else
+  if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
+    library = filename;
+  }
+#endif
+
+  if (library == NULL) {
+    library = HADOOP_PMDK_LIBRARY;
+  }
+
+  pmdkLoader->libname = strdup(library);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
new file mode 100644
index 0000000..c93a076
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+#endif
+
+#ifndef _PMDK_LOAD_H_
+#define _PMDK_LOAD_H_
+
+
+#ifdef UNIX
+// For libpmem.h
+typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
+    size_t *mapped_lenp, int *is_pmemp);
+typedef int (* __d_pmem_unmap)(void *addr, size_t len);
+typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
+typedef void (*__d_pmem_drain)(void);
+typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
+typedef int (* __d_pmem_msync)(const void *addr, size_t len);
+
+#endif
+
+typedef struct __PmdkLibLoader {
+  // The loaded library handle
+  void* libec;
+  char* libname;
+  __d_pmem_map_file pmem_map_file;
+  __d_pmem_unmap pmem_unmap;
+  __d_pmem_is_pmem pmem_is_pmem;
+  __d_pmem_drain pmem_drain;
+  __d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
+  __d_pmem_msync pmem_msync;
+} PmdkLibLoader;
+
+extern PmdkLibLoader * pmdkLoader;
+
+/**
+ * A helper function to dlsym a 'symbol' from a given library-handle.
+ */
+
+#ifdef UNIX
+
+static __attribute__ ((unused))
+void *myDlsym(void *handle, const char *symbol) {
+  void *func_ptr = dlsym(handle, symbol);
+  return func_ptr;
+}
+
+/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */
+#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
+  if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
+    return "Failed to load symbol" symbol; \
+  }
+
+#endif
+
+/**
+ * Return 0 if not support, 1 otherwise.
+ */
+int build_support_pmdk();
+
+/**
+ * Initialize and load PMDK library, returning error message if any.
+ *
+ * @param err     The err message buffer.
+ * @param err_len The length of the message buffer.
+ */
+void load_pmdk_lib(char* err, size_t err_len);
+
+#endif //_PMDK_LOAD_H_
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6b3c232..a14928c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -25,6 +25,8 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
@@ -782,4 +784,155 @@ public class TestNativeIO {
     assertTrue("Native POSIX_FADV_NOREUSE const not set",
       POSIX_FADV_NOREUSE >= 0);
   }
+
+
+  @Test (timeout=10000)
+  public void testPmemCheckParameters() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Please make sure /mnt/pmem0 is a persistent memory device with total
+    // volume size 'volumeSize'
+    String filePath = "/$:";
+    long length = 0;
+    long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+    // Incorrect file length
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("Illegal length parameter should be detected");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+
+    // Incorrect file length
+    filePath = "/mnt/pmem0/test_native_io";
+    length = -1L;
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("Illegal length parameter should be detected");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemMapMultipleFiles() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Please make sure /mnt/pmem0 is a persistent memory device with total
+    // volume size 'volumeSize'
+    String filePath = "/mnt/pmem0/test_native_io";
+    long length = 0;
+    long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+    // Multiple files, each with 128MB size, aggregated size exceeds volume
+    // limit 16GB
+    length = 128 * 1024 * 1024L;
+    long fileNumber = volumeSize / length;
+    LOG.info("File number = " + fileNumber);
+    for (int i = 0; i < fileNumber; i++) {
+      String path = filePath + i;
+      LOG.info("File path = " + path);
+      NativeIO.POSIX.Pmem.mapBlock(path, length);
+    }
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("Request map extra file when persistent memory is all occupied");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemMapBigFile() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Please make sure /mnt/pmem0 is a persistent memory device with total
+    // volume size 'volumeSize'
+    String filePath = "/mnt/pmem0/test_native_io_big";
+    long length = 0;
+    long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+    // One file length exceeds persistent memory volume 16GB.
+    length = volumeSize + 1024L;
+    try {
+      LOG.info("File length = " + length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("File length exceeds persistent memory total volume size");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+      deletePmemMappedFile(filePath);
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemCopy() throws IOException {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Create and map a block file. Please make sure /mnt/pmem0 is a persistent
+    // memory device.
+    String filePath = "/mnt/pmem0/copy";
+    long length = 4096;
+    PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+    assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
+
+    // Copy content to mapped file
+    byte[] data = generateSequentialBytes(0, (int) length);
+    NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
+        length);
+
+    // Read content before pmemSync
+    byte[] readBuf1 = new byte[(int)length];
+    IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length);
+    assertArrayEquals(data, readBuf1);
+
+    byte[] readBuf2 = new byte[(int)length];
+    // Sync content to persistent memory twice
+    NativeIO.POSIX.Pmem.memSync(region);
+    NativeIO.POSIX.Pmem.memSync(region);
+    // Read content after pmemSync twice
+    IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length);
+    assertArrayEquals(data, readBuf2);
+
+    //Read content after unmap twice
+    NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+    NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+    byte[] readBuf3 = new byte[(int)length];
+    IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length);
+    assertArrayEquals(data, readBuf3);
+  }
+
+  private static byte[] generateSequentialBytes(int start, int length) {
+    byte[] result = new byte[length];
+
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte) ((start + i) % 127);
+    }
+    return result;
+  }
+
+  private static void deletePmemMappedFile(String filePath) {
+    try {
+      if (filePath != null) {
+        boolean result = Files.deleteIfExists(Paths.get(filePath));
+        if (!result) {
+          throw new IOException();
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Failed to delete the mapped file " + filePath +
+          " from persistent memory", e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 4fab214..37e548e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -215,6 +215,28 @@ public class FsDatasetCache {
   }
 
   /**
+   * Get cache address on persistent memory for read operation.
+   * The cache address comes from PMDK lib function when mapping
+   * block to persistent memory.
+   *
+   * @param bpid    blockPoolId
+   * @param blockId blockId
+   * @return address
+   */
+  long getCacheAddress(String bpid, long blockId) {
+    if (cacheLoader.isTransientCache() ||
+        !isCached(bpid, blockId)) {
+      return -1;
+    }
+    if (!(cacheLoader.isNativeLoader())) {
+      return -1;
+    }
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+    MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock;
+    return mappableBlock.getAddress();
+  }
+
+  /**
    * @return List of cached blocks suitable for translation into a
    * {@link BlockListAsLongs} for a cache report.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 801b4c6..ee76f2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -815,6 +815,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     String cachePath = cacheManager.getReplicaCachePath(
         b.getBlockPoolId(), b.getBlockId());
     if (cachePath != null) {
+      long addr = cacheManager.getCacheAddress(
+          b.getBlockPoolId(), b.getBlockId());
+      if (addr != -1) {
+        LOG.debug("Get InputStream by cache address.");
+        return FsDatasetUtil.getDirectInputStream(
+            addr, info.getBlockDataLength());
+      }
+      LOG.debug("Get InputStream by cache file path.");
       return FsDatasetUtil.getInputStreamAndSeek(
           new File(cachePath), seekOffset);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 92c0888..c2bc703 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -26,7 +26,10 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -132,6 +136,24 @@ public class FsDatasetUtil {
     }
   }
 
+  public static InputStream getDirectInputStream(long addr, long length)
+      throws IOException {
+    try {
+      Class<?> directByteBufferClass =
+          Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor =
+          directByteBufferClass.getDeclaredConstructor(long.class, int.class);
+      constructor.setAccessible(true);
+      ByteBuffer byteBuffer =
+          (ByteBuffer) constructor.newInstance(addr, (int)length);
+      return new ByteBufferBackedInputStream(byteBuffer);
+    } catch (ClassNotFoundException | NoSuchMethodException |
+        IllegalAccessException | InvocationTargetException |
+        InstantiationException e) {
+      throw new IOException(e);
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file and then return the
    * generation stamp from the name of the meta-file. Generally meta file will
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 0fff327..a00c442 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
    * @return the number of bytes that have been cached.
    */
   long getLength();
+
+  /**
+   * Get cache address if applicable.
+   * Return -1 if not applicable.
+   */
+  long getAddress();
 }
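
Putting the two small additions above together (MappableBlock#getAddress and FsDatasetUtil#getDirectInputStream), the hedged sketch below shows how a block cached on persistent memory can be read back through an ordinary InputStream, which is essentially what the FsDatasetImpl change earlier in this commit does. The helper method name and the full-block read loop are assumptions for illustration.

    // Hedged sketch: wrap the pmem cache address of a mapped block in an
    // InputStream and read its bytes back.
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;

    public class DirectCacheReadSketch {
      static byte[] readCachedBlock(MappableBlock block) throws Exception {
        long addr = block.getAddress();
        if (addr == -1) {
          return null; // not cached via the native pmem loader
        }
        int length = (int) block.getLength();
        try (InputStream in =
            FsDatasetUtil.getDirectInputStream(addr, length)) {
          byte[] buf = new byte[length];
          int off = 0;
          while (off < length) {
            int n = in.read(buf, off, length - off);
            if (n < 0) {
              break; // end of the mapped region
            }
            off += n;
          }
          return buf;
        }
      }
    }
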
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 3ec8416..5b9ba3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
    * @return               The Mappable block.
    */
   abstract MappableBlock load(long length, FileInputStream blockIn,
-                              FileInputStream metaIn, String blockFileName,
-                              ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException;
 
   /**
@@ -107,6 +106,11 @@ public abstract class MappableBlockLoader {
   abstract boolean isTransientCache();
 
   /**
+   * Check whether this is a native pmem cache loader.
+   */
+  abstract boolean isNativeLoader();
+
+  /**
    * Clean up cache, can be used during DataNode shutdown.
    */
   void shutdown() {
@@ -117,8 +121,7 @@ public abstract class MappableBlockLoader {
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
   protected void verifyChecksum(long length, FileInputStream metaIn,
-                                FileChannel blockChannel, String blockFileName)
-      throws IOException {
+      FileChannel blockChannel, String blockFileName) throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
     BlockMetadataHeader header =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
index 43b1b53..6569373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.io.nativeio.NativeIO;
 
 /**
  * Creates MappableBlockLoader.
@@ -42,6 +43,9 @@ public final class MappableBlockLoaderFactory {
     if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
       return new MemoryMappableBlockLoader();
     }
+    if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
+      return new NativePmemMappableBlockLoader();
+    }
     return new PmemMappableBlockLoader();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 52d8d93..dd4188c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -66,8 +66,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                            FileInputStream metaIn, String blockFileName,
-                            ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException {
     MemoryMappedBlock mappableBlock = null;
     MappedByteBuffer mmap = null;
@@ -116,4 +115,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   public boolean isTransientCache() {
     return true;
   }
+
+  @Override
+  public boolean isNativeLoader() {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
index c09ad1a..47dfeae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -45,6 +45,11 @@ public class MemoryMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return -1L;
+  }
+
+  @Override
   public void close() {
     if (mmap != null) {
       NativeIO.POSIX.munmap(mmap);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
similarity index 53%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
index 3ec8416..09e9454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
@@ -34,21 +38,26 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
- * Maps block to DataNode cache region.
+ * Map block to persistent memory with native PMDK libs.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class MappableBlockLoader {
+public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
 
-  /**
-   * Initialize a specific MappableBlockLoader.
-   */
-  abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    super.initialize(cacheManager);
+  }
 
   /**
    * Load the block.
    *
-   * Map the block, and then verify its checksum.
+   * Map the block and verify its checksum.
+   *
+   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+   * is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -58,67 +67,62 @@ public abstract class MappableBlockLoader {
    * @param blockFileName  The block file name, for logging purposes.
    * @param key            The extended block ID.
    *
-   * @throws IOException   If mapping block to cache region fails or checksum
-   *                       fails.
+   * @throws IOException   If mapping block to persistent memory fails or
+   *                       checksum fails.
    *
    * @return               The Mappable block.
    */
-  abstract MappableBlock load(long length, FileInputStream blockIn,
-                              FileInputStream metaIn, String blockFileName,
-                              ExtendedBlockId key)
-      throws IOException;
-
-  /**
-   * Try to reserve some given bytes.
-   *
-   * @param key           The ExtendedBlockId for a block.
-   *
-   * @param bytesCount    The number of bytes to add.
-   *
-   * @return              The new number of usedBytes if we succeeded;
-   *                      -1 if we failed.
-   */
-  abstract long reserve(ExtendedBlockId key, long bytesCount);
-
-  /**
-   * Release some bytes that we're using.
-   *
-   * @param key           The ExtendedBlockId for a block.
-   *
-   * @param bytesCount    The number of bytes to release.
-   *
-   * @return              The new number of usedBytes.
-   */
-  abstract long release(ExtendedBlockId key, long bytesCount);
-
-  /**
-   * Get the approximate amount of cache space used.
-   */
-  abstract long getCacheUsed();
-
-  /**
-   * Get the maximum amount of cache bytes.
-   */
-  abstract long getCacheCapacity();
+  @Override
+  public MappableBlock load(long length, FileInputStream blockIn,
+      FileInputStream metaIn, String blockFileName,
+      ExtendedBlockId key)
+      throws IOException {
+    NativePmemMappedBlock mappableBlock = null;
+    POSIX.PmemMappedRegion region = null;
+    String filePath = null;
 
-  /**
-   * Check whether the cache is non-volatile.
-   */
-  abstract boolean isTransientCache();
+    FileChannel blockChannel = null;
+    try {
+      blockChannel = blockIn.getChannel();
+      if (blockChannel == null) {
+        throw new IOException("Block InputStream has no FileChannel.");
+      }
 
-  /**
-   * Clean up cache, can be used during DataNode shutdown.
-   */
-  void shutdown() {
-    // Do nothing.
+      assert NativeIO.isAvailable();
+      filePath = PmemVolumeManager.getInstance().getCachePath(key);
+      region = POSIX.Pmem.mapBlock(filePath, length);
+      if (region == null) {
+        throw new IOException("Failed to map the block " + blockFileName +
+            " to persistent storage.");
+      }
+      verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
+          blockFileName);
+      mappableBlock = new NativePmemMappedBlock(region.getAddress(),
+          region.getLength(), key);
+      LOG.info("Successfully cached one replica:{} into persistent memory"
+          + ", [cached path={}, address={}, length={}]", key, filePath,
+          region.getAddress(), length);
+    } finally {
+      IOUtils.closeQuietly(blockChannel);
+      if (mappableBlock == null) {
+        if (region != null) {
+          // unmap content from persistent memory
+          POSIX.Pmem.unmapBlock(region.getAddress(),
+              region.getLength());
+          FsDatasetUtil.deleteMappedFile(filePath);
+        }
+      }
+    }
+    return mappableBlock;
   }
 
   /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
+   * Verifies the block's checksum while mapping the block to persistent memory.
+   * This is an I/O intensive operation.
    */
-  protected void verifyChecksum(long length, FileInputStream metaIn,
-                                FileChannel blockChannel, String blockFileName)
-      throws IOException {
+  private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
+      long length, FileInputStream metaIn, FileChannel blockChannel,
+      String blockFileName) throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
     BlockMetadataHeader header =
@@ -129,8 +133,8 @@ public abstract class MappableBlockLoader {
     try {
       metaChannel = metaIn.getChannel();
       if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
+        throw new IOException("Cannot get FileChannel" +
+            " from Block InputStream meta file.");
       }
       DataChecksum checksum = header.getChecksum();
       final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -140,13 +144,19 @@ public abstract class MappableBlockLoader {
       ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
       // Verify the checksum
       int bytesVerified = 0;
+      long mappedAddress = -1L;
+      if (region != null) {
+        mappedAddress = region.getAddress();
+      }
       while (bytesVerified < length) {
         Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
+            "Unexpected partial chunk before EOF.");
         assert bytesVerified % bytesPerChecksum == 0;
         int bytesRead = fillBuffer(blockChannel, blockBuf);
         if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
+          throw new IOException(
+              "Checksum verification failed for the block " + blockFileName +
+                  ": premature EOF");
         }
         blockBuf.flip();
         // Number of read chunks, including partial chunk at end
@@ -158,32 +168,24 @@ public abstract class MappableBlockLoader {
             bytesVerified);
         // Success
         bytesVerified += bytesRead;
+        // Copy data to persistent file
+        POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
+            region.isPmem(), bytesRead);
+        mappedAddress += bytesRead;
+        // Clear buffer
         blockBuf.clear();
         checksumBuf.clear();
       }
+      if (region != null) {
+        POSIX.Pmem.memSync(region);
+      }
     } finally {
       IOUtils.closeQuietly(metaChannel);
     }
   }
 
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached.
-   */
-  protected int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
-    }
-    return bytesRead;
+  @Override
+  public boolean isNativeLoader() {
+    return true;
   }
 }
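
verifyChecksumAndMapBlock() above interleaves two jobs: it checks each chunk against the
checksums in the meta file and, only after the chunk verifies, copies it into the mapped
pmem region with Pmem.memCopy(), finishing with Pmem.memSync(). A rough, self-contained
analogue of that read-verify-copy-sync loop using only standard NIO; CRC32 stands in for
Hadoop's DataChecksum, a plain FileChannel for the mapped region, and the checksum here is
compared once at the end rather than per chunk (illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.zip.CRC32;

    public class VerifyThenCopySketch {
      // Reads src in 8 MB chunks, accumulates a CRC32 over every chunk, writes
      // each chunk to dst, and forces dst to stable storage before comparing.
      public static void copyVerified(String src, String dst, long expectedCrc)
          throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8 * 1024 * 1024);
        CRC32 crc = new CRC32();
        try (FileChannel in = FileChannel.open(Paths.get(src),
                 StandardOpenOption.READ);
             FileChannel out = FileChannel.open(Paths.get(dst),
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
          while (in.read(buf) != -1) {
            buf.flip();
            crc.update(buf.duplicate());   // checksum the chunk...
            while (buf.hasRemaining()) {
              out.write(buf);              // ...then copy it to the destination
            }
            buf.clear();
          }
          out.force(true);                 // loosely analogous to Pmem.memSync()
        }
        if (crc.getValue() != expectedCrc) {
          throw new IOException("Checksum mismatch for " + src);
        }
      }
    }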
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
similarity index 52%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
index 25c3d40..92012b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
@@ -21,25 +21,29 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 /**
- * Represents an HDFS block that is mapped to persistent memory by DataNode
- * with mapped byte buffer. PMDK is NOT involved in this implementation.
+ * Represents an HDFS block that is mapped to persistent memory by the DataNode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class PmemMappedBlock implements MappableBlock {
+public class NativePmemMappedBlock implements MappableBlock {
   private static final Logger LOG =
-      LoggerFactory.getLogger(PmemMappedBlock.class);
+      LoggerFactory.getLogger(NativePmemMappedBlock.class);
+
+  private long pmemMappedAddress = -1L;
   private long length;
   private ExtendedBlockId key;
 
-  PmemMappedBlock(long length, ExtendedBlockId key) {
+  NativePmemMappedBlock(long pmemMappedAddress, long length,
+      ExtendedBlockId key) {
     assert length > 0;
+    this.pmemMappedAddress = pmemMappedAddress;
     this.length = length;
     this.key = key;
   }
@@ -50,15 +54,32 @@ public class PmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return pmemMappedAddress;
+  }
+
+  @Override
   public void close() {
-    String cacheFilePath =
-        PmemVolumeManager.getInstance().getCachePath(key);
-    try {
-      FsDatasetUtil.deleteMappedFile(cacheFilePath);
-      LOG.info("Successfully uncached one replica:{} from persistent memory"
-          + ", [cached path={}, length={}]", key, cacheFilePath, length);
-    } catch (IOException e) {
-      LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+    if (pmemMappedAddress != -1L) {
+      String cacheFilePath =
+          PmemVolumeManager.getInstance().getCachePath(key);
+      try {
+        // Current libpmem will report error when pmem_unmap is called with
+        // length not aligned with page size, although the length is returned
+        // by pmem_map_file.
+        boolean success =
+            NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
+        if (!success) {
+          throw new IOException("Failed to unmap the mapped file from " +
+              "pmem address: " + pmemMappedAddress);
+        }
+        pmemMappedAddress = -1L;
+        FsDatasetUtil.deleteMappedFile(cacheFilePath);
+        LOG.info("Successfully uncached one replica:{} from persistent memory"
+            + ", [cached path={}, length={}]", key, cacheFilePath, length);
+      } catch (IOException e) {
+        LOG.warn("IOException occurred for block {}!", key, e);
+      }
     }
   }
-}
\ No newline at end of file
+}
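
One detail worth noting in close() above: the mapped address doubles as the open/closed
flag, so once it is reset to -1 a second close() cannot unmap or delete anything twice. A
tiny self-contained sketch of that guard-and-reset idiom (illustrative only):

    import java.io.Closeable;
    import java.io.IOException;

    public class GuardedCloseSketch implements Closeable {
      private long address;                 // -1 means "nothing mapped"

      GuardedCloseSketch(long address) {
        this.address = address;
      }

      @Override
      public void close() throws IOException {
        if (address == -1L) {
          return;                           // already closed: nothing to unmap
        }
        // ... unmapping and deleting the backing file would happen here ...
        address = -1L;                      // mark closed so a repeat call is a no-op
      }

      public static void main(String[] args) throws IOException {
        try (GuardedCloseSketch block = new GuardedCloseSketch(0x1000L)) {
          // use the mapped block
        } // close() runs once; a second explicit close() would do nothing
      }
    }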
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 239fff8..70a42c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
-    LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
+    LOG.info("Initializing cache loader: " + this.getClass().getName());
     DNConf dnConf = cacheManager.getDnConf();
     PmemVolumeManager.init(dnConf.getPmemVolumes());
     pmemVolumeManager = PmemVolumeManager.getInstance();
@@ -71,8 +71,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                     FileInputStream metaIn, String blockFileName,
-                     ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException {
     PmemMappedBlock mappableBlock = null;
     String cachePath = null;
@@ -133,6 +132,11 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public boolean isNativeLoader() {
+    return false;
+  }
+
+  @Override
   void shutdown() {
     LOG.info("Clean up cache on persistent memory during shutdown.");
     PmemVolumeManager.getInstance().cleanup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index 25c3d40..a49626a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -50,6 +50,11 @@ public class PmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return -1L;
+  }
+
+  @Override
   public void close() {
     String cacheFilePath =
         PmemVolumeManager.getInstance().getCachePath(key);




[hadoop] 01/10: HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3f6f095c066f5f72e1883a3927dbad8b6b5b3cbf
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu Mar 14 22:21:08 2019 -0700

    HDFS-14354: Refactor MappableBlock to align with the implementation of SCM cache. Contributed by Feilong He.
    
    (cherry picked from commit ba50a36a3ead628c3d44d384f7ed4d2b3a55dd07)
---
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  15 +-
 .../datanode/fsdataset/impl/MappableBlock.java     | 155 +--------------------
 .../fsdataset/impl/MappableBlockLoader.java        |  80 +++++++++++
 ...leBlock.java => MemoryMappableBlockLoader.java} | 111 +++++----------
 .../datanode/fsdataset/impl/MemoryMappedBlock.java |  54 +++++++
 5 files changed, 184 insertions(+), 231 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 767b150..9efd11a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -227,6 +227,8 @@ public class FsDatasetCache {
    */
   private final long maxBytes;
 
+  private final MappableBlockLoader mappableBlockLoader;
+
   /**
    * Number of cache commands that could not be completed successfully
    */
@@ -236,7 +238,7 @@ public class FsDatasetCache {
    */
   final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
 
-  public FsDatasetCache(FsDatasetImpl dataset) {
+  public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
     this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
@@ -268,6 +270,7 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
+    this.mappableBlockLoader = new MemoryMappableBlockLoader();
   }
 
   /**
@@ -461,14 +464,14 @@ public class FsDatasetCache {
           return;
         }
         try {
-          mappableBlock = MappableBlock.
-              load(length, blockIn, metaIn, blockFileName);
+          mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+              blockFileName, key);
         } catch (ChecksumException e) {
           // Exception message is bogus since this wasn't caused by a file read
           LOG.warn("Failed to cache " + key + ": checksum verification failed.");
           return;
         } catch (IOException e) {
-          LOG.warn("Failed to cache " + key, e);
+          LOG.warn("Failed to cache the block [key=" + key + "]!", e);
           return;
         }
         synchronized (FsDatasetCache.this) {
@@ -498,9 +501,7 @@ public class FsDatasetCache {
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
                   + "bytes in total.", key, usedBytesCount.get());
-          if (mappableBlock != null) {
-            mappableBlock.close();
-          }
+          IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
           synchronized (FsDatasetCache.this) {
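
The cleanup path above now hands the possibly-null mappableBlock to IOUtils.closeQuietly()
instead of an explicit null check and close. A small stand-alone illustration of what that
commons-io call tolerates, using throwaway Closeables rather than the real MappableBlock
(illustrative only):

    import java.io.Closeable;
    import org.apache.commons.io.IOUtils;

    public class CloseQuietlySketch {
      public static void main(String[] args) {
        Closeable maybeNull = null;          // e.g. a block that failed to map
        IOUtils.closeQuietly(maybeNull);     // safe: null and IOExceptions are ignored
        Closeable open = () -> System.out.println("closed");
        IOUtils.closeQuietly(open);          // prints "closed"
      }
    }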
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 45aa364..0fff327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -18,164 +18,21 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
 
 /**
- * Represents an HDFS block that is mmapped by the DataNode.
+ * Represents an HDFS block that is mapped by the DataNode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MappableBlock implements Closeable {
-  private MappedByteBuffer mmap;
-  private final long length;
-
-  MappableBlock(MappedByteBuffer mmap, long length) {
-    this.mmap = mmap;
-    this.length = length;
-    assert length > 0;
-  }
-
-  public long getLength() {
-    return length;
-  }
-
-  /**
-   * Load the block.
-   *
-   * mmap and mlock the block, and then verify its checksum.
-   *
-   * @param length         The current length of the block.
-   * @param blockIn        The block input stream.  Should be positioned at the
-   *                       start.  The caller must close this.
-   * @param metaIn         The meta file input stream.  Should be positioned at
-   *                       the start.  The caller must close this.
-   * @param blockFileName  The block file name, for logging purposes.
-   *
-   * @return               The Mappable block.
-   */
-  public static MappableBlock load(long length,
-      FileInputStream blockIn, FileInputStream metaIn,
-      String blockFileName) throws IOException {
-    MappableBlock mappableBlock = null;
-    MappedByteBuffer mmap = null;
-    FileChannel blockChannel = null;
-    try {
-      blockChannel = blockIn.getChannel();
-      if (blockChannel == null) {
-        throw new IOException("Block InputStream has no FileChannel.");
-      }
-      mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
-      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
-      verifyChecksum(length, metaIn, blockChannel, blockFileName);
-      mappableBlock = new MappableBlock(mmap, length);
-    } finally {
-      IOUtils.closeQuietly(blockChannel);
-      if (mappableBlock == null) {
-        if (mmap != null) {
-          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
-        }
-      }
-    }
-    return mappableBlock;
-  }
-
-  /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
-   */
-  private static void verifyChecksum(long length,
-      FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
-          throws IOException, ChecksumException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException("Block InputStream meta file has no FileChannel.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8*1024*1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
-        checksumBuf.limit(chunks*checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-        // Success
-        bytesVerified += bytesRead;
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
+public interface MappableBlock extends Closeable {
 
   /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   * Get the number of bytes that have been cached.
+   * @return the number of bytes that have been cached.
    */
-  private static int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public void close() {
-    if (mmap != null) {
-      NativeIO.POSIX.munmap(mmap);
-      mmap = null;
-    }
-  }
+  long getLength();
 }
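
With MappableBlock reduced to a Closeable interface as above, every cache medium (DRAM,
and later pmem) plugs in behind the same two-method contract. A self-contained analogue
showing the shape of that contract and the try-with-resources usage it enables; the names
here are hypothetical, not the Hadoop classes (illustrative only):

    import java.io.Closeable;

    interface CachedBlock extends Closeable {
      long getLength();                      // number of bytes that have been cached
    }

    public class CachedBlockSketch {
      public static void main(String[] args) throws Exception {
        CachedBlock block = new CachedBlock() {
          @Override public long getLength() { return 4096; }
          @Override public void close() { System.out.println("resources released"); }
        };
        try (CachedBlock b = block) {
          System.out.println("cached bytes: " + b.getLength());
        } // close() releases the mapping, whatever form it takes
      }
    }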
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
new file mode 100644
index 0000000..a323f78
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Maps block to DataNode cache region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class MappableBlockLoader {
+
+  /**
+   * Load the block.
+   *
+   * Map the block, and then verify its checksum.
+   *
+   * @param length         The current length of the block.
+   * @param blockIn        The block input stream. Should be positioned at the
+   *                       start. The caller must close this.
+   * @param metaIn         The meta file input stream. Should be positioned at
+   *                       the start. The caller must close this.
+   * @param blockFileName  The block file name, for logging purposes.
+   * @param key            The extended block ID.
+   *
+   * @throws IOException   If mapping block to cache region fails or checksum
+   *                       fails.
+   *
+   * @return               The Mappable block.
+   */
+  abstract MappableBlock load(long length, FileInputStream blockIn,
+                              FileInputStream metaIn, String blockFileName,
+                              ExtendedBlockId key)
+      throws IOException;
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached.
+   */
+  protected int fillBuffer(FileChannel channel, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = channel.read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = channel.read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
+  }
+}
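
The new base class also pulls fillBuffer() up so every loader shares the same short-read
handling: a single FileChannel.read() may return fewer bytes than requested, so the loop
keeps reading until EOF or the buffer's limit. A self-contained demo of that loop against
a throwaway temp file (illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class FillBufferSketch {
      static int fillBuffer(FileChannel channel, ByteBuffer buf) throws IOException {
        int bytesRead = channel.read(buf);
        if (bytesRead < 0) {
          return bytesRead;                  // EOF before anything was read
        }
        while (buf.remaining() > 0) {
          int n = channel.read(buf);
          if (n < 0) {
            return bytesRead;                // EOF: return what we got
          }
          bytesRead += n;
        }
        return bytesRead;
      }

      public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("fill", ".bin");
        Files.write(tmp, new byte[1234]);
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
          ByteBuffer buf = ByteBuffer.allocate(4096);
          System.out.println(fillBuffer(ch, buf));   // prints 1234
        } finally {
          Files.delete(tmp);
        }
      }
    }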
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
similarity index 64%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 45aa364..7a0f7c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,45 +18,29 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 
 /**
- * Represents an HDFS block that is mmapped by the DataNode.
+ * Maps block to memory.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MappableBlock implements Closeable {
-  private MappedByteBuffer mmap;
-  private final long length;
-
-  MappableBlock(MappedByteBuffer mmap, long length) {
-    this.mmap = mmap;
-    this.length = length;
-    assert length > 0;
-  }
-
-  public long getLength() {
-    return length;
-  }
+public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
   /**
    * Load the block.
@@ -64,18 +48,23 @@ public class MappableBlock implements Closeable {
    * mmap and mlock the block, and then verify its checksum.
    *
    * @param length         The current length of the block.
-   * @param blockIn        The block input stream.  Should be positioned at the
-   *                       start.  The caller must close this.
-   * @param metaIn         The meta file input stream.  Should be positioned at
-   *                       the start.  The caller must close this.
+   * @param blockIn        The block input stream. Should be positioned at the
+   *                       start. The caller must close this.
+   * @param metaIn         The meta file input stream. Should be positioned at
+   *                       the start. The caller must close this.
    * @param blockFileName  The block file name, for logging purposes.
+   * @param key            The extended block ID.
    *
+   * @throws IOException   If mapping block to memory fails or checksum fails.
+
    * @return               The Mappable block.
    */
-  public static MappableBlock load(long length,
-      FileInputStream blockIn, FileInputStream metaIn,
-      String blockFileName) throws IOException {
-    MappableBlock mappableBlock = null;
+  @Override
+  public MappableBlock load(long length, FileInputStream blockIn,
+                            FileInputStream metaIn, String blockFileName,
+                            ExtendedBlockId key)
+      throws IOException {
+    MemoryMappedBlock mappableBlock = null;
     MappedByteBuffer mmap = null;
     FileChannel blockChannel = null;
     try {
@@ -83,10 +72,10 @@ public class MappableBlock implements Closeable {
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
-      mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
+      mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
       NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
       verifyChecksum(length, metaIn, blockChannel, blockFileName);
-      mappableBlock = new MappableBlock(mmap, length);
+      mappableBlock = new MemoryMappedBlock(mmap, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
       if (mappableBlock == null) {
@@ -101,9 +90,9 @@ public class MappableBlock implements Closeable {
   /**
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
-  private static void verifyChecksum(long length,
-      FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
-          throws IOException, ChecksumException {
+  public void verifyChecksum(long length, FileInputStream metaIn,
+                             FileChannel blockChannel, String blockFileName)
+      throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
     BlockMetadataHeader header =
@@ -114,14 +103,15 @@ public class MappableBlock implements Closeable {
     try {
       metaChannel = metaIn.getChannel();
       if (metaChannel == null) {
-        throw new IOException("Block InputStream meta file has no FileChannel.");
+        throw new IOException(
+            "Block InputStream meta file has no FileChannel.");
       }
       DataChecksum checksum = header.getChecksum();
       final int bytesPerChecksum = checksum.getBytesPerChecksum();
       final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8*1024*1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
+      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
       // Verify the checksum
       int bytesVerified = 0;
       while (bytesVerified < length) {
@@ -134,8 +124,8 @@ public class MappableBlock implements Closeable {
         }
         blockBuf.flip();
         // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
-        checksumBuf.limit(chunks*checksumSize);
+        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.limit(chunks * checksumSize);
         fillBuffer(metaChannel, checksumBuf);
         checksumBuf.flip();
         checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
@@ -149,33 +139,4 @@ public class MappableBlock implements Closeable {
       IOUtils.closeQuietly(metaChannel);
     }
   }
-
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
-   */
-  private static int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public void close() {
-    if (mmap != null) {
-      NativeIO.POSIX.munmap(mmap);
-      mmap = null;
-    }
-  }
 }
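
For readers less familiar with the mmap path that MemoryMappableBlockLoader keeps: the
loader maps the block file read-only, mlocks the mapping through NativeIO, and verifies
the checksum before wrapping the buffer in a MemoryMappedBlock. The mapping step on its
own, in plain JDK code (illustrative only; the mlock and checksum steps are omitted):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class MmapSketch {
      public static void main(String[] args) throws IOException {
        Path block = Files.createTempFile("blk", ".data");
        Files.write(block, "hello block".getBytes(StandardCharsets.UTF_8));
        try (FileChannel ch = FileChannel.open(block, StandardOpenOption.READ)) {
          MappedByteBuffer mmap =
              ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
          System.out.println((char) mmap.get(0));   // 'h', served from the mapping
        } finally {
          Files.delete(block);
        }
      }
    }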
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
new file mode 100644
index 0000000..c09ad1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.nio.MappedByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+/**
+ * Represents an HDFS block that is mapped to memory by the DataNode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryMappedBlock implements MappableBlock {
+  private MappedByteBuffer mmap;
+  private final long length;
+
+  MemoryMappedBlock(MappedByteBuffer mmap, long length) {
+    this.mmap = mmap;
+    this.length = length;
+    assert length > 0;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public void close() {
+    if (mmap != null) {
+      NativeIO.POSIX.munmap(mmap);
+      mmap = null;
+    }
+  }
+}




[hadoop] 03/10: HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6bc73a983d272235dee9fbb69e27bf6440343c0d
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Sat Mar 30 23:33:25 2019 -0700

    HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  19 ++
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  31 ++
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  90 +++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  20 +-
 .../datanode/fsdataset/impl/FsDatasetUtil.java     |  27 ++
 .../fsdataset/impl/MappableBlockLoader.java        |  41 ++-
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  45 ++-
 ...ockLoader.java => PmemMappableBlockLoader.java} | 132 ++++++---
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  70 +++++
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 306 +++++++++++++++++++
 .../src/main/resources/hdfs-default.xml            |  34 +++
 .../impl/TestCacheByPmemMappableBlockLoader.java   | 329 +++++++++++++++++++++
 12 files changed, 1077 insertions(+), 67 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8877101..bad8352 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -390,6 +392,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
   public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
 
+  // Currently, the available cache loaders are MemoryMappableBlockLoader,
+  // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
+  // loader to cache block replica to memory.
+  public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
+      "dfs.datanode.cache.loader.class";
+  public static final Class<? extends MappableBlockLoader>
+      DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
+      MemoryMappableBlockLoader.class;
+  // Multiple dirs separated by "," are acceptable.
+  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
+      "dfs.datanode.cache.pmem.dirs";
+  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
+  // The cache capacity of persistent memory
+  public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
+      "dfs.datanode.cache.pmem.capacity";
+  public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 155b800..6ee8e92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,6 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
@@ -66,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 
 import java.util.concurrent.TimeUnit;
@@ -115,7 +121,10 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
+  private final Class<? extends MappableBlockLoader> cacheLoaderClass;
   final long maxLockedMemory;
+  private final long maxLockedPmem;
+  private final String[] pmemDirs;
 
   private final long bpReadyTimeout;
 
@@ -257,10 +266,20 @@ public class DNConf {
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
 
+    this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
+        DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
+
     this.maxLockedMemory = getConf().getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
+    this.maxLockedPmem = getConf().getLong(
+        DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
+        DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
+
+    this.pmemDirs = getConf().getTrimmedStrings(
+        DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
+
     this.restartReplicaExpiry = getConf().getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
@@ -323,6 +342,10 @@ public class DNConf {
     return maxLockedMemory;
   }
 
+  public long getMaxLockedPmem() {
+    return maxLockedPmem;
+  }
+
   /**
    * Returns true if connect to datanode via hostname
    * 
@@ -425,4 +448,12 @@ public class DNConf {
   int getMaxDataLength() {
     return maxDataLength;
   }
+
+  public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
+    return cacheLoaderClass;
+  }
+
+  public String[] getPmemVolumes() {
+    return pmemDirs;
+  }
 }
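
DNConf above only reads the three new keys introduced in DFSConfigKeys. A hypothetical
programmatic configuration, with made-up volume paths and capacity, might look like the
sketch below; real deployments would normally set the same keys in hdfs-site.xml
(illustrative only):

    import org.apache.hadoop.conf.Configuration;

    public class PmemCacheConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pick the pure-Java pmem loader explicitly (the default stays in DRAM).
        conf.set("dfs.datanode.cache.loader.class",
            "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl."
                + "PmemMappableBlockLoader");
        // Example pmem mount points; comma-separated, as the key allows.
        conf.set("dfs.datanode.cache.pmem.dirs", "/mnt/pmem0,/mnt/pmem1");
        // Example capacity: 64 GB of persistent memory for the cache.
        conf.setLong("dfs.datanode.cache.pmem.capacity", 64L * 1024 * 1024 * 1024);
        System.out.println(conf.get("dfs.datanode.cache.pmem.dirs"));
      }
    }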
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index f90a4b1..dce84c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -48,10 +49,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +132,11 @@ public class FsDatasetCache {
 
   private final long revocationPollingMs;
 
-  private final MappableBlockLoader mappableBlockLoader;
+  /**
+   * A specific cacheLoader could cache block either to DRAM or
+   * to persistent memory.
+   */
+  private final MappableBlockLoader cacheLoader;
 
   private final MemoryCacheStats memCacheStats;
 
@@ -173,11 +179,41 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-
-    this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
     // Both lazy writer and read cache are sharing this statistics.
     this.memCacheStats = new MemoryCacheStats(
         dataset.datanode.getDnConf().getMaxLockedMemory());
+
+    Class<? extends MappableBlockLoader> cacheLoaderClass =
+        dataset.datanode.getDnConf().getCacheLoaderClass();
+    this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
+    cacheLoader.initialize(this);
+  }
+
+  /**
+   * Check if pmem cache is enabled.
+   */
+  private boolean isPmemCacheEnabled() {
+    return !cacheLoader.isTransientCache();
+  }
+
+  DNConf getDnConf() {
+    return this.dataset.datanode.getDnConf();
+  }
+
+  MemoryCacheStats getMemCacheStats() {
+    return memCacheStats;
+  }
+
+  /**
+   * Get the cache path if the replica is cached into persistent memory.
+   */
+  String getReplicaCachePath(String bpid, long blockId) {
+    if (cacheLoader.isTransientCache() ||
+        !isCached(bpid, blockId)) {
+      return null;
+    }
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+    return cacheLoader.getCachedPath(key);
   }
 
   /**
@@ -344,14 +380,14 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = mappableBlockLoader.reserve(length);
+      long newUsedBytes = cacheLoader.reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
           LOG.warn("Failed to cache " + key + ": could not reserve " + length +
               " more bytes in the cache: " +
-              DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-              " of " + memCacheStats.getCacheCapacity() + " exceeded.");
+              cacheLoader.getCacheCapacityConfigKey() +
+              " of " + cacheLoader.getCacheCapacity() + " exceeded.");
           return;
         }
         reservedBytes = true;
@@ -370,8 +406,9 @@ public class FsDatasetCache {
           LOG.warn("Failed to cache " + key + ": failed to open file", e);
           return;
         }
+
         try {
-          mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+          mappableBlock = cacheLoader.load(length, blockIn, metaIn,
               blockFileName, key);
         } catch (ChecksumException e) {
           // Exception message is bogus since this wasn't caused by a file read
@@ -381,6 +418,7 @@ public class FsDatasetCache {
           LOG.warn("Failed to cache the block [key=" + key + "]!", e);
           return;
         }
+
         synchronized (FsDatasetCache.this) {
           Value value = mappableBlockMap.get(key);
           Preconditions.checkNotNull(value);
@@ -404,7 +442,7 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            mappableBlockLoader.release(length);
+            cacheLoader.release(length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
                   + "bytes in total.", key, memCacheStats.getCacheUsed());
@@ -481,8 +519,7 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = mappableBlockLoader
-          .release(value.mappableBlock.getLength());
+      long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -498,19 +535,41 @@ public class FsDatasetCache {
   // Stats related methods for FSDatasetMBean
 
   /**
-   * Get the approximate amount of cache space used.
+   * Get the approximate amount of DRAM cache space used.
    */
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
   }
 
   /**
-   * Get the maximum amount of bytes we can cache.  This is a constant.
+   * Get the approximate amount of persistent memory cache space used.
+   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   */
+  public long getPmemCacheUsed() {
+    if (isPmemCacheEnabled()) {
+      return cacheLoader.getCacheUsed();
+    }
+    return 0;
+  }
+
+  /**
+   * Get the maximum amount of bytes we can cache on DRAM.  This is a constant.
    */
   public long getCacheCapacity() {
     return memCacheStats.getCacheCapacity();
   }
 
+  /**
+   * Get cache capacity of persistent memory.
+   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   */
+  public long getPmemCacheCapacity() {
+    if (isPmemCacheEnabled()) {
+      return cacheLoader.getCacheCapacity();
+    }
+    return 0;
+  }
+
   public long getNumBlocksFailedToCache() {
     return numBlocksFailedToCache.get();
   }
@@ -528,4 +587,9 @@ public class FsDatasetCache {
     Value val = mappableBlockMap.get(block);
     return (val != null) && val.state.shouldAdvertise();
   }
+
+  @VisibleForTesting
+  MappableBlockLoader getCacheLoader() {
+    return cacheLoader;
+  }
 }
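
FsDatasetCache now delegates the byte accounting to the loader: cacheLoader.reserve(length)
must return the new used-bytes total, or -1 when the configured capacity would be exceeded,
and release() gives the bytes back on failure or uncache. A self-contained sketch of that
contract; the real loaders back it with MemoryCacheStats or pmem volume bookkeeping
(illustrative only):

    import java.util.concurrent.atomic.AtomicLong;

    public class CacheAccountingSketch {
      private final long capacity;
      private final AtomicLong usedBytes = new AtomicLong(0);

      CacheAccountingSketch(long capacity) {
        this.capacity = capacity;
      }

      long reserve(long bytesCount) {
        while (true) {
          long cur = usedBytes.get();
          long next = cur + bytesCount;
          if (next > capacity) {
            return -1;                          // would exceed the cache capacity
          }
          if (usedBytes.compareAndSet(cur, next)) {
            return next;                        // new number of used bytes
          }
        }
      }

      long release(long bytesCount) {
        return usedBytes.addAndGet(-bytesCount);
      }

      public static void main(String[] args) {
        CacheAccountingSketch stats = new CacheAccountingSketch(100);
        System.out.println(stats.reserve(60));  // 60
        System.out.println(stats.reserve(60));  // -1, over capacity
        System.out.println(stats.release(60));  // 0
      }
    }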
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a61fbb1..29c31ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -800,11 +800,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       datanode.getMetrics().incrRamDiskBlocksReadHits();
     }
 
-    if (info != null) {
-      return info.getDataInputStream(seekOffset);
-    } else {
+    if (info == null || !info.blockDataExists()) {
       throw new IOException("No data exists for block " + b);
     }
+    return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset);
+  }
+
+  /**
+   * Check whether the replica is cached to persistent memory.
+   * If so, get DataInputStream of the corresponding cache file on pmem.
+   */
+  private InputStream getBlockInputStreamWithCheckingPmemCache(
+      ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException {
+    String cachePath = cacheManager.getReplicaCachePath(
+        b.getBlockPoolId(), b.getBlockId());
+    if (cachePath != null) {
+      return FsDatasetUtil.getInputStreamAndSeek(
+          new File(cachePath), seekOffset);
+    }
+    return info.getDataInputStream(seekOffset);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 8a3b237..92c0888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -116,6 +119,19 @@ public class FsDatasetUtil {
     }
   }
 
+  public static InputStream getInputStreamAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      raf.seek(offset);
+      return Channels.newInputStream(raf.getChannel());
+    } catch(IOException ioe) {
+      IOUtils.cleanupWithLogger(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file and then return the
    * generation stamp from the name of the meta-file. Generally meta file will
@@ -183,4 +199,15 @@ public class FsDatasetUtil {
 
     FsDatasetImpl.computeChecksum(wrapper, dstMeta, smallBufferSize, conf);
   }
+
+  public static void deleteMappedFile(String filePath) throws IOException {
+    if (filePath == null) {
+      throw new IOException("The filePath should not be null!");
+    }
+    boolean result = Files.deleteIfExists(Paths.get(filePath));
+    if (!result) {
+      throw new IOException(
+          "Failed to delete the mapped file: " + filePath);
+    }
+  }
 }
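
The deleteMappedFile() helper above treats a missing cache file as an error
rather than silently ignoring it. A small standalone sketch of that contract
(the class name is made up; this is not the patch code):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class DeleteMappedFileSketch {
      static void deleteMappedFile(String filePath) throws IOException {
        if (filePath == null) {
          throw new IOException("The filePath should not be null!");
        }
        // Files.deleteIfExists() returns false when the file was already gone.
        if (!Files.deleteIfExists(Paths.get(filePath))) {
          throw new IOException("Failed to delete the mapped file: " + filePath);
        }
      }

      public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("pmem-cache-demo", ".bin");
        deleteMappedFile(tmp.toString());   // succeeds, the file existed
        try {
          deleteMappedFile(tmp.toString()); // fails, the file is already gone
        } catch (IOException expected) {
          System.out.println("Second delete failed as expected: "
              + expected.getMessage());
        }
      }
    }
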
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 0f5ec2d..a9e9610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -35,6 +35,11 @@ import java.nio.channels.FileChannel;
 public abstract class MappableBlockLoader {
 
   /**
+   * Initialize a specific MappableBlockLoader.
+   */
+  abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+
+  /**
    * Load the block.
    *
    * Map the block, and then verify its checksum.
@@ -60,24 +65,48 @@ public abstract class MappableBlockLoader {
   /**
    * Try to reserve some given bytes.
    *
-   * @param bytesCount
-   *          The number of bytes to add.
+   * @param bytesCount    The number of bytes to add.
    *
-   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   * @return              The new number of usedBytes if we succeeded;
+   *                      -1 if we failed.
    */
   abstract long reserve(long bytesCount);
 
   /**
    * Release some bytes that we're using.
    *
-   * @param bytesCount
-   *          The number of bytes to release.
+   * @param bytesCount    The number of bytes to release.
    *
-   * @return The new number of usedBytes.
+   * @return              The new number of usedBytes.
    */
   abstract long release(long bytesCount);
 
   /**
+   * Get the config key of cache capacity.
+   */
+  abstract String getCacheCapacityConfigKey();
+
+  /**
+   * Get the approximate amount of cache space used.
+   */
+  abstract long getCacheUsed();
+
+  /**
+   * Get the maximum amount of cache bytes.
+   */
+  abstract long getCacheCapacity();
+
+  /**
+   * Check whether the cache is transient, i.e. not backed by persistent storage.
+   */
+  abstract boolean isTransientCache();
+
+  /**
+   * Get a cache file path if applicable. Otherwise return null.
+   */
+  abstract String getCachedPath(ExtendedBlockId key);
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
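
The abstract loader above exposes reserve()/release() for cache-space
accounting and a shared fillBuffer() helper that both loaders use during
checksum verification. A self-contained sketch of the fillBuffer() idea
(simplified; not necessarily byte-for-byte what the patch does):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class FillBufferSketch {
      /** Read until the buffer is full or EOF; return -1 only if nothing was read. */
      static int fillBuffer(FileChannel channel, ByteBuffer buf) throws IOException {
        int total = 0;
        while (buf.remaining() > 0) {
          int n = channel.read(buf);
          if (n < 0) {
            return total == 0 ? -1 : total;
          }
          total += n;
        }
        return total;
      }
    }
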
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index d93193e..4b7af19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -42,16 +43,11 @@ import java.nio.channels.FileChannel;
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
-  private final FsDatasetCache cacheManager;
+  private MemoryCacheStats memCacheStats;
 
-  /**
-   * Constructs memory mappable loader.
-   *
-   * @param cacheManager
-   *          FsDatasetCache reference.
-   */
-  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
-    this.cacheManager = cacheManager;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    this.memCacheStats = cacheManager.getMemCacheStats();
   }
 
   /**
@@ -72,7 +68,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    * @return               The Mappable block.
    */
   @Override
-  public MappableBlock load(long length, FileInputStream blockIn,
+  MappableBlock load(long length, FileInputStream blockIn,
                             FileInputStream metaIn, String blockFileName,
                             ExtendedBlockId key)
       throws IOException {
@@ -153,12 +149,37 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public String getCacheCapacityConfigKey() {
+    return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+  }
+
+  @Override
+  public long getCacheUsed() {
+    return memCacheStats.getCacheUsed();
+  }
+
+  @Override
+  public long getCacheCapacity() {
+    return memCacheStats.getCacheCapacity();
+  }
+
+  @Override
   long reserve(long bytesCount) {
-    return cacheManager.reserve(bytesCount);
+    return memCacheStats.reserve(bytesCount);
   }
 
   @Override
   long release(long bytesCount) {
-    return cacheManager.release(bytesCount);
+    return memCacheStats.release(bytesCount);
+  }
+
+  @Override
+  public boolean isTransientCache() {
+    return true;
+  }
+
+  @Override
+  public String getCachedPath(ExtendedBlockId key) {
+    return null;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
similarity index 55%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index d93193e..c581d31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,46 +18,58 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
- * Maps block to memory.
+ * Maps block to persistent memory by using mapped byte buffer.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MemoryMappableBlockLoader extends MappableBlockLoader {
+public class PmemMappableBlockLoader extends MappableBlockLoader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemMappableBlockLoader.class);
+  private PmemVolumeManager pmemVolumeManager;
 
-  private final FsDatasetCache cacheManager;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    DNConf dnConf = cacheManager.getDnConf();
+    this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
+        dnConf.getPmemVolumes());
+  }
 
-  /**
-   * Constructs memory mappable loader.
-   *
-   * @param cacheManager
-   *          FsDatasetCache reference.
-   */
-  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
-    this.cacheManager = cacheManager;
+  @VisibleForTesting
+  PmemVolumeManager getPmemVolumeManager() {
+    return pmemVolumeManager;
   }
 
   /**
    * Load the block.
    *
-   * mmap and mlock the block, and then verify its checksum.
+   * Map the block and verify its checksum.
+   *
+   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+   * is a persistent memory volume selected by the getOneVolumeIndex() method.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -67,43 +79,63 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    * @param blockFileName  The block file name, for logging purposes.
    * @param key            The extended block ID.
    *
-   * @throws IOException   If mapping block to memory fails or checksum fails.
-
+   * @throws IOException   If mapping block fails or checksum fails.
+   *
    * @return               The Mappable block.
    */
   @Override
-  public MappableBlock load(long length, FileInputStream blockIn,
+  MappableBlock load(long length, FileInputStream blockIn,
                             FileInputStream metaIn, String blockFileName,
                             ExtendedBlockId key)
       throws IOException {
-    MemoryMappedBlock mappableBlock = null;
-    MappedByteBuffer mmap = null;
+    PmemMappedBlock mappableBlock = null;
+    String filePath = null;
+
     FileChannel blockChannel = null;
+    RandomAccessFile file = null;
+    MappedByteBuffer out = null;
     try {
       blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
-      mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
-      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
-      verifyChecksum(length, metaIn, blockChannel, blockFileName);
-      mappableBlock = new MemoryMappedBlock(mmap, length);
+
+      Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
+      filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+      file = new RandomAccessFile(filePath, "rw");
+      out = file.getChannel().
+          map(FileChannel.MapMode.READ_WRITE, 0, length);
+      if (out == null) {
+        throw new IOException("Failed to map the block " + blockFileName +
+            " to persistent storage.");
+      }
+      verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
+          blockFileName);
+      mappableBlock = new PmemMappedBlock(
+          length, volumeIndex, key, pmemVolumeManager);
+      pmemVolumeManager.afterCache(key, volumeIndex);
+      LOG.info("Successfully cached one replica:{} into persistent memory"
+          + ", [cached path={}, length={}]", key, filePath, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
+      if (out != null) {
+        NativeIO.POSIX.munmap(out);
+      }
+      IOUtils.closeQuietly(file);
       if (mappableBlock == null) {
-        if (mmap != null) {
-          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
-        }
+        FsDatasetUtil.deleteMappedFile(filePath);
       }
     }
     return mappableBlock;
   }
 
   /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
+   * Verifies the block's checksum meanwhile maps block to persistent memory.
+   * This is an I/O intensive operation.
    */
-  private void verifyChecksum(long length, FileInputStream metaIn,
-                             FileChannel blockChannel, String blockFileName)
+  private void verifyChecksumAndMapBlock(
+      MappedByteBuffer out, long length, FileInputStream metaIn,
+      FileChannel blockChannel, String blockFileName)
       throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
@@ -115,8 +147,8 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
     try {
       metaChannel = metaIn.getChannel();
       if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
+        throw new IOException("Cannot get FileChannel from " +
+            "Block InputStream meta file.");
       }
       DataChecksum checksum = header.getChecksum();
       final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -132,7 +164,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
         assert bytesVerified % bytesPerChecksum == 0;
         int bytesRead = fillBuffer(blockChannel, blockBuf);
         if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
+          throw new IOException(
+              "Checksum verification failed for the block " + blockFileName +
+                  ": premature EOF");
         }
         blockBuf.flip();
         // Number of read chunks, including partial chunk at end
@@ -142,23 +176,55 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
         checksumBuf.flip();
         checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
             bytesVerified);
-        // Success
+
+        // Copy the verified chunk of data into the persistent memory mapped file
+        out.put(blockBuf);
+        // Advance the count of verified bytes
         bytesVerified += bytesRead;
+
+        // Clear buffer
         blockBuf.clear();
         checksumBuf.clear();
       }
+      // Force the data to be written to the storage device containing the mapped file
+      out.force();
     } finally {
       IOUtils.closeQuietly(metaChannel);
     }
   }
 
   @Override
+  public String getCacheCapacityConfigKey() {
+    return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+  }
+
+  @Override
+  public long getCacheUsed() {
+    return pmemVolumeManager.getCacheUsed();
+  }
+
+  @Override
+  public long getCacheCapacity() {
+    return pmemVolumeManager.getCacheCapacity();
+  }
+
+  @Override
   long reserve(long bytesCount) {
-    return cacheManager.reserve(bytesCount);
+    return pmemVolumeManager.reserve(bytesCount);
   }
 
   @Override
   long release(long bytesCount) {
-    return cacheManager.release(bytesCount);
+    return pmemVolumeManager.release(bytesCount);
+  }
+
+  @Override
+  public boolean isTransientCache() {
+    return false;
+  }
+
+  @Override
+  public String getCachedPath(ExtendedBlockId key) {
+    return pmemVolumeManager.getCacheFilePath(key);
   }
 }
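
The loader above relies only on a plain Java MappedByteBuffer: the cache file
is mapped READ_WRITE, the verified block bytes are copied in, and force()
flushes them to the device backing the mapping (the real code additionally
unmaps via NativeIO.POSIX.munmap before deleting on failure). A runnable
sketch of just that write path, with made-up file names:

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class PmemCopySketch {
      public static void main(String[] args) throws IOException {
        byte[] blockBytes = "verified block contents".getBytes(StandardCharsets.UTF_8);
        Path cacheFile = Files.createTempFile("BP-demo-", "-12345");
        try (RandomAccessFile file = new RandomAccessFile(cacheFile.toFile(), "rw");
             FileChannel channel = file.getChannel()) {
          MappedByteBuffer out =
              channel.map(FileChannel.MapMode.READ_WRITE, 0, blockBytes.length);
          out.put(blockBytes);   // copy data into the mapped region
          out.force();           // flush the mapped region to the backing device
        } finally {
          // Note: deleting a still-mapped file may fail on some platforms;
          // the patch unmaps first via native code.
          Files.deleteIfExists(cacheFile);
        }
      }
    }
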
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
new file mode 100644
index 0000000..ce4fa22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Represents an HDFS block that is mapped to persistent memory by DataNode
+ * with mapped byte buffer. PMDK is NOT involved in this implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemMappedBlock implements MappableBlock {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemMappedBlock.class);
+  private final PmemVolumeManager pmemVolumeManager;
+  private long length;
+  private Byte volumeIndex = null;
+  private ExtendedBlockId key;
+
+  PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
+                  PmemVolumeManager pmemVolumeManager) {
+    assert length > 0;
+    this.length = length;
+    this.volumeIndex = volumeIndex;
+    this.key = key;
+    this.pmemVolumeManager = pmemVolumeManager;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public void close() {
+    String cacheFilePath =
+        pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+    try {
+      FsDatasetUtil.deleteMappedFile(cacheFilePath);
+      pmemVolumeManager.afterUncache(key);
+      LOG.info("Successfully uncached one replica:{} from persistent memory"
+          + ", [cached path={}, length={}]", key, cacheFilePath, length);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
new file mode 100644
index 0000000..76aa2dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manage the persistent memory volumes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemVolumeManager {
+
+  /**
+   * Counts used bytes for persistent memory.
+   */
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param bytesCount    The number of bytes to add.
+     *
+     * @return              The new number of usedBytes if we succeeded;
+     *                      -1 if we failed.
+     */
+    long reserve(long bytesCount) {
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + bytesCount;
+        if (next > cacheCapacity) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param bytesCount    The number of bytes to release.
+     *
+     * @return              The new number of usedBytes.
+     */
+    long release(long bytesCount) {
+      return usedBytes.addAndGet(-bytesCount);
+    }
+
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemVolumeManager.class);
+  private final ArrayList<String> pmemVolumes = new ArrayList<>();
+  // Maintain which pmem volume a block is cached to.
+  private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
+      new ConcurrentHashMap<>();
+  private final UsedBytesCount usedBytesCount;
+
+  /**
+   * The total cache capacity in bytes of persistent memory.
+   * It is 0L if the configured MappableBlockLoader does not cache data to pmem.
+   */
+  private final long cacheCapacity;
+  private int count = 0;
+  // Strict atomicity is not guaranteed here, for the sake of performance.
+  private int i = 0;
+
+  PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
+      throws IOException {
+    if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
+      throw new IOException("The persistent memory volume, " +
+          DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
+          " is not configured!");
+    }
+    this.loadVolumes(pmemVolumesConfigured);
+    this.usedBytesCount = new UsedBytesCount();
+    this.cacheCapacity = maxBytes;
+  }
+
+  public long getCacheUsed() {
+    return usedBytesCount.get();
+  }
+
+  public long getCacheCapacity() {
+    return cacheCapacity;
+  }
+
+  /**
+   * Try to reserve more bytes on persistent memory.
+   *
+   * @param bytesCount    The number of bytes to add.
+   *
+   * @return              The new number of usedBytes if we succeeded;
+   *                      -1 if we failed.
+   */
+  long reserve(long bytesCount) {
+    return usedBytesCount.reserve(bytesCount);
+  }
+
+  /**
+   * Release some bytes that we're using on persistent memory.
+   *
+   * @param bytesCount    The number of bytes to release.
+   *
+   * @return              The new number of usedBytes.
+   */
+  long release(long bytesCount) {
+    return usedBytesCount.release(bytesCount);
+  }
+
+  /**
+   * Load and verify the configured pmem volumes.
+   *
+   * @throws IOException   If there is no available pmem volume.
+   */
+  private void loadVolumes(String[] volumes) throws IOException {
+    // Check whether the volume exists
+    for (String volume: volumes) {
+      try {
+        File pmemDir = new File(volume);
+        verifyIfValidPmemVolume(pmemDir);
+        // Remove all files under the volume.
+        FileUtils.cleanDirectory(pmemDir);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Failed to parse persistent memory volume " + volume, e);
+        continue;
+      } catch (IOException e) {
+        LOG.error("Bad persistent memory volume: " + volume, e);
+        continue;
+      }
+      pmemVolumes.add(volume);
+      LOG.info("Added persistent memory - " + volume);
+    }
+    count = pmemVolumes.size();
+    if (count == 0) {
+      throw new IOException(
+          "At least one valid persistent memory volume is required!");
+    }
+  }
+
+  @VisibleForTesting
+  static void verifyIfValidPmemVolume(File pmemDir)
+      throws IOException {
+    if (!pmemDir.exists()) {
+      final String message = pmemDir + " does not exist";
+      throw new IOException(message);
+    }
+
+    if (!pmemDir.isDirectory()) {
+      final String message = pmemDir + " is not a directory";
+      throw new IllegalArgumentException(message);
+    }
+
+    String uuidStr = UUID.randomUUID().toString();
+    String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
+    byte[] contents = uuidStr.getBytes("UTF-8");
+    RandomAccessFile testFile = null;
+    MappedByteBuffer out = null;
+    try {
+      testFile = new RandomAccessFile(testFilePath, "rw");
+      out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
+          contents.length);
+      if (out == null) {
+        throw new IOException("Failed to map the test file under " + pmemDir);
+      }
+      out.put(contents);
+      // Force the data to be written to the storage device containing the mapped file
+      out.force();
+    } catch (IOException e) {
+      throw new IOException(
+          "Exception while writing data to persistent storage dir: " +
+              pmemDir, e);
+    } finally {
+      if (out != null) {
+        out.clear();
+      }
+      if (testFile != null) {
+        IOUtils.closeQuietly(testFile);
+        NativeIO.POSIX.munmap(out);
+        try {
+          FsDatasetUtil.deleteMappedFile(testFilePath);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete test file " + testFilePath +
+              " from persistent memory", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Choose a persistent memory volume based on a specific algorithm.
+   * Currently it is a round-robin policy.
+   *
+   * TODO: Refine volume selection policy by considering storage utilization.
+   */
+  Byte getOneVolumeIndex() throws IOException {
+    if (count != 0) {
+      return (byte)(i++ % count);
+    } else {
+      throw new IOException("No usable persistent memory is found");
+    }
+  }
+
+  @VisibleForTesting
+  String getVolumeByIndex(Byte index) {
+    return pmemVolumes.get(index);
+  }
+
+  /**
+   * The cache file is named BlockPoolId-BlockId,
+   * so its name can be inferred from the BlockPoolId and BlockId.
+   */
+  public String getCacheFileName(ExtendedBlockId key) {
+    return key.getBlockPoolId() + "-" + key.getBlockId();
+  }
+
+  /**
+   * Since pmem volume sizes are currently below the TB level,
+   * it is acceptable to keep all cache files under one directory.
+   * This strategy will be optimized, especially once a single pmem
+   * volume has a huge cache capacity.
+   *
+   * @param volumeIndex   The index of pmem volume where a replica will be
+   *                      cached to or has been cached to.
+   *
+   * @param key           The replica's ExtendedBlockId.
+   *
+   * @return              A path to which the block replica is mapped.
+   */
+  public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
+    return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
+  }
+
+  /**
+   * The cache file path is pmemVolume/BlockPoolId-BlockId.
+   */
+  public String getCacheFilePath(ExtendedBlockId key) {
+    Byte volumeIndex = blockKeyToVolume.get(key);
+    if (volumeIndex == null) {
+      return null;
+    }
+    return inferCacheFilePath(volumeIndex, key);
+  }
+
+  @VisibleForTesting
+  Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
+    return blockKeyToVolume;
+  }
+
+  /**
+   * Add cached block's ExtendedBlockId and its cache volume index to a map
+   * after cache.
+   */
+  public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
+    blockKeyToVolume.put(key, volumeIndex);
+  }
+
+  /**
+   * Remove the record in blockKeyToVolume for uncached block after uncache.
+   */
+  public void afterUncache(ExtendedBlockId key) {
+    blockKeyToVolume.remove(key);
+  }
+}
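
Two ideas in PmemVolumeManager are worth calling out: the compare-and-set loop
that reserves bytes against a fixed capacity, and the round-robin pick over
the loaded volumes. A simplified, self-contained sketch of both (names and
structure are illustrative only, not the committed class):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    final class PmemAccountingSketch {
      private final AtomicLong usedBytes = new AtomicLong(0);
      private final long cacheCapacity;
      private final List<String> volumes;
      private int next = 0; // round-robin cursor; strict atomicity not required

      PmemAccountingSketch(long cacheCapacity, List<String> volumes) {
        this.cacheCapacity = cacheCapacity;
        this.volumes = volumes;
      }

      /** New used-bytes count, or -1 if the reservation would exceed capacity. */
      long reserve(long bytesCount) {
        while (true) {
          long cur = usedBytes.get();
          long want = cur + bytesCount;
          if (want > cacheCapacity) {
            return -1;
          }
          if (usedBytes.compareAndSet(cur, want)) {
            return want;
          }
          // Another thread won the race; retry with the fresh value.
        }
      }

      long release(long bytesCount) {
        return usedBytes.addAndGet(-bytesCount);
      }

      /** Round-robin selection over the configured volumes. */
      String pickVolume() throws IOException {
        if (volumes.isEmpty()) {
          throw new IOException("No usable persistent memory is found");
        }
        return volumes.get(next++ % volumes.size());
      }
    }
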
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e01e8c2..5ab8207 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2539,6 +2539,18 @@
 </property>
 
 <property>
+  <name>dfs.datanode.cache.loader.class</name>
+  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
+  <description>
+    Currently, the available cache loaders are MemoryMappableBlockLoader and
+    PmemMappableBlockLoader. By default, MemoryMappableBlockLoader is used; it
+    maps the block replica into DRAM. PmemMappableBlockLoader maps the block
+    replica to persistent memory via a Java mapped byte buffer. The value of
+    dfs.datanode.cache.pmem.dirs specifies the persistent memory directories.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.max.locked.memory</name>
   <value>0</value>
   <description>
@@ -2555,6 +2567,28 @@
 </property>
 
 <property>
+  <name>dfs.datanode.cache.pmem.capacity</name>
+  <value>0</value>
+  <description>
+    The maximum amount of persistent memory, in bytes, that can be used to
+    cache block replicas. Currently, persistent memory caching is only used by
+    the HDFS Centralized Cache Management feature.
+
+    By default, this parameter is 0, which disables persistent memory caching.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.cache.pmem.dirs</name>
+  <value></value>
+  <description>
+    This value specifies the persistent memory directories used for caching
+    block replicas. It takes effect only if dfs.datanode.cache.loader.class is
+    PmemMappableBlockLoader. Multiple directories separated by "," are acceptable.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.list.cache.directives.num.responses</name>
   <value>100</value>
   <description>
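
Putting the three new keys together, a DataNode would enable the pure-Java
pmem cache with an hdfs-site.xml fragment along these lines (the paths and
capacity are example values; note that a later commit in this series,
HDFS-14401, removes the loader-class and pmem-capacity keys again):

    <property>
      <name>dfs.datanode.cache.loader.class</name>
      <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.PmemMappableBlockLoader</value>
    </property>
    <property>
      <name>dfs.datanode.cache.pmem.dirs</name>
      <value>/mnt/pmem0,/mnt/pmem1</value>
    </property>
    <property>
      <name>dfs.datanode.cache.pmem.capacity</name>
      <value>68719476736</value> <!-- 64 GiB, example value -->
    </property>
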
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
new file mode 100644
index 0000000..9b4f06f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import com.google.common.base.Supplier;
+import com.google.common.primitives.Ints;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
+
+/**
+ * Tests HDFS persistent memory cache by PmemMappableBlockLoader.
+ *
+ * Bogus persistent memory volumes (regular directories) are used to cache blocks.
+ */
+public class TestCacheByPmemMappableBlockLoader {
+  protected static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
+
+  protected static final long CACHE_CAPACITY = 64 * 1024;
+  protected static final long BLOCK_SIZE = 4 * 1024;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static DistributedFileSystem fs;
+  private static DataNode dn;
+  private static FsDatasetCache cacheManager;
+  private static PmemMappableBlockLoader cacheLoader;
+  /**
+   * Used to pause DN BPServiceActor threads. BPSA threads acquire the
+   * shared read lock. The test acquires the write lock for exclusive access.
+   */
+  private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
+  private static CacheManipulator prevCacheManipulator;
+  private static DataNodeFaultInjector oldInjector;
+
+  private static final String PMEM_DIR_0 =
+      MiniDFSCluster.getBaseDirectory() + "pmem0";
+  private static final String PMEM_DIR_1 =
+      MiniDFSCluster.getBaseDirectory() + "pmem1";
+
+  static {
+    GenericTestUtils.setLogLevel(
+        LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
+  }
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      @Override
+      public void startOfferService() throws Exception {
+        lock.readLock().lock();
+      }
+
+      @Override
+      public void endOfferService() throws Exception {
+        lock.readLock().unlock();
+      }
+    });
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    DataNodeFaultInjector.set(oldInjector);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
+
+    // Configuration for pmem cache
+    conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
+        "org.apache.hadoop.hdfs.server.datanode." +
+            "fsdataset.impl.PmemMappableBlockLoader");
+    new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
+    new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
+    // Configure two bogus pmem volumes
+    conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
+    conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
+
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    dn = cluster.getDataNodes().get(0);
+    cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
+    cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+  }
+
+  protected static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testPmemVolumeManager() throws IOException {
+    PmemVolumeManager pmemVolumeManager =
+        cacheLoader.getPmemVolumeManager();
+    assertNotNull(pmemVolumeManager);
+    assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
+    // Test round-robin selection policy
+    long count1 = 0, count2 = 0;
+    for (int i = 0; i < 10; i++) {
+      Byte index = pmemVolumeManager.getOneVolumeIndex();
+      String volume = pmemVolumeManager.getVolumeByIndex(index);
+      if (volume.equals(PMEM_DIR_0)) {
+        count1++;
+      } else if (volume.equals(PMEM_DIR_1)) {
+        count2++;
+      } else {
+        fail("Unexpected persistent storage location:" + volume);
+      }
+    }
+    assertEquals(count1, count2);
+  }
+
+  public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
+      throws IOException {
+    List<ExtendedBlockId> keys = new ArrayList<>();
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
+    for (HdfsBlockLocation loc : locs) {
+      long bkid = loc.getLocatedBlock().getBlock().getBlockId();
+      String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
+      keys.add(new ExtendedBlockId(bkid, bpid));
+    }
+    return keys;
+  }
+
+  @Test(timeout = 60000)
+  public void testCacheAndUncache() throws Exception {
+    final int maxCacheBlocksNum =
+        Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+    Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
+    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
+
+    final Path testFile = new Path("/testFile");
+    final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
+    DFSTestUtil.createFile(fs, testFile,
+        testFileLen, (short) 1, 0xbeef);
+    List<ExtendedBlockId> blockKeys =
+        getExtendedBlockId(testFile, testFileLen);
+    fs.addCachePool(new CachePoolInfo("testPool"));
+    final long cacheDirectiveId = fs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("testPool").
+            setPath(testFile).setReplication((short) 1).build());
+    // wait for caching
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        if (blocksCached != maxCacheBlocksNum) {
+          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " +
+              "be cached. Right now " + blocksCached + " blocks are cached.");
+          return false;
+        }
+        LOG.info(maxCacheBlocksNum + " blocks are now cached.");
+        return true;
+      }
+    }, 1000, 30000);
+
+    // The pmem cache space is expected to have been used up.
+    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
+    Map<ExtendedBlockId, Byte> blockKeyToVolume =
+        cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
+    // All block keys should be kept in blockKeyToVolume
+    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+    // Test each replica's cache file path
+    for (ExtendedBlockId key : blockKeys) {
+      String cachePath = cacheManager.
+          getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
+      // The cachePath shouldn't be null if the replica has been cached
+      // to pmem.
+      assertNotNull(cachePath);
+      String expectFileName =
+          cacheLoader.getPmemVolumeManager().getCacheFileName(key);
+      if (cachePath.startsWith(PMEM_DIR_0)) {
+        assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
+      } else if (cachePath.startsWith(PMEM_DIR_1)) {
+        assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
+      } else {
+        fail("The cache path is not the expected one: " + cachePath);
+      }
+    }
+
+    // Try to cache another file. Caching this file should fail
+    // due to lack of available cache space.
+    final Path smallTestFile = new Path("/smallTestFile");
+    final long smallTestFileLen =  BLOCK_SIZE;
+    DFSTestUtil.createFile(fs, smallTestFile,
+        smallTestFileLen, (short) 1, 0xbeef);
+    // Try to cache more blocks when no cache space is available.
+    final long smallFileCacheDirectiveId = fs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("testPool").
+            setPath(smallTestFile).setReplication((short) 1).build());
+
+    // Wait for enough time to verify smallTestFile could not be cached.
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    long blocksCached =
+        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+    // The cached block num should not be increased.
+    assertTrue(blocksCached == maxCacheBlocksNum);
+    // The blockKeyToVolume should just keep the block keys for the testFile.
+    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+    // Stop trying to cache smallTestFile to avoid interfering with the
+    // verification of the uncache functionality.
+    fs.removeCacheDirective(smallFileCacheDirectiveId);
+
+    // Uncache the test file
+    fs.removeCacheDirective(cacheDirectiveId);
+    // Wait for uncaching
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksUncached =
+            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+        if (blocksUncached != maxCacheBlocksNum) {
+          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " +
+              "uncached. Right now " + blocksUncached +
+              " blocks are uncached.");
+          return false;
+        }
+        LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
+        return true;
+      }
+    }, 1000, 30000);
+
+    // It is expected that no pmem cache space is used.
+    assertEquals(0, cacheManager.getPmemCacheUsed());
+    // No record should be kept by blockKeyToVolume after testFile is uncached.
+    assertEquals(blockKeyToVolume.size(), 0);
+  }
+}
\ No newline at end of file




[hadoop] 04/10: HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f3571c1f096e771060dae4d31d2d8508d42d04b9
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Wed May 8 17:20:21 2019 +0530

    HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  14 --
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  22 ---
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  31 ++--
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |   2 +
 .../fsdataset/impl/MappableBlockLoader.java        |  19 +-
 .../fsdataset/impl/MappableBlockLoaderFactory.java |  47 +++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  21 +--
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  40 ++---
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  10 +-
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 197 +++++++++++++++------
 .../src/main/resources/hdfs-default.xml            |  24 ---
 .../impl/TestCacheByPmemMappableBlockLoader.java   |  26 ++-
 .../fsdataset/impl/TestFsDatasetCache.java         |   5 +-
 13 files changed, 256 insertions(+), 202 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bad8352..f2df396 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -392,22 +390,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
   public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
 
-  // Currently, the available cache loaders are MemoryMappableBlockLoader,
-  // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
-  // loader to cache block replica to memory.
-  public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
-      "dfs.datanode.cache.loader.class";
-  public static final Class<? extends MappableBlockLoader>
-      DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
-      MemoryMappableBlockLoader.class;
   // Multiple dirs separated by "," are acceptable.
   public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
       "dfs.datanode.cache.pmem.dirs";
   public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
-  // The cache capacity of persistent memory
-  public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
-      "dfs.datanode.cache.pmem.capacity";
-  public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
 
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 6ee8e92..139ad77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,10 +27,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
@@ -71,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 
 import java.util.concurrent.TimeUnit;
@@ -121,9 +116,7 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
-  private final Class<? extends MappableBlockLoader> cacheLoaderClass;
   final long maxLockedMemory;
-  private final long maxLockedPmem;
   private final String[] pmemDirs;
 
   private final long bpReadyTimeout;
@@ -266,17 +259,10 @@ public class DNConf {
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
 
-    this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
-        DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
-
     this.maxLockedMemory = getConf().getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
-    this.maxLockedPmem = getConf().getLong(
-        DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
-        DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
-
     this.pmemDirs = getConf().getTrimmedStrings(
         DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
 
@@ -342,10 +328,6 @@ public class DNConf {
     return maxLockedMemory;
   }
 
-  public long getMaxLockedPmem() {
-    return maxLockedPmem;
-  }
-
   /**
    * Returns true if connect to datanode via hostname
    * 
@@ -449,10 +431,6 @@ public class DNConf {
     return maxDataLength;
   }
 
-  public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
-    return cacheLoaderClass;
-  }
-
   public String[] getPmemVolumes() {
     return pmemDirs;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index dce84c2..4fab214 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -183,9 +182,8 @@ public class FsDatasetCache {
     this.memCacheStats = new MemoryCacheStats(
         dataset.datanode.getDnConf().getMaxLockedMemory());
 
-    Class<? extends MappableBlockLoader> cacheLoaderClass =
-        dataset.datanode.getDnConf().getCacheLoaderClass();
-    this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
+    this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
+        this.getDnConf());
     cacheLoader.initialize(this);
   }
 
@@ -213,7 +211,7 @@ public class FsDatasetCache {
       return null;
     }
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
-    return cacheLoader.getCachedPath(key);
+    return PmemVolumeManager.getInstance().getCachePath(key);
   }
 
   /**
@@ -380,14 +378,13 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = cacheLoader.reserve(length);
+      long newUsedBytes = cacheLoader.reserve(key, length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
-          LOG.warn("Failed to cache " + key + ": could not reserve " + length +
-              " more bytes in the cache: " +
-              cacheLoader.getCacheCapacityConfigKey() +
-              " of " + cacheLoader.getCacheCapacity() + " exceeded.");
+          LOG.warn("Failed to cache " + key + ": could not reserve " +
+              "more bytes in the cache: " + cacheLoader.getCacheCapacity() +
+              " exceeded when try to reserve " + length + "bytes.");
           return;
         }
         reservedBytes = true;
@@ -442,10 +439,10 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            cacheLoader.release(length);
+            cacheLoader.release(key, length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
-                  + "bytes in total.", key, memCacheStats.getCacheUsed());
+                  + "bytes in total.", key, cacheLoader.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
@@ -519,7 +516,8 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
+      long newUsedBytes = cacheLoader.
+          release(key, value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -592,4 +590,11 @@ public class FsDatasetCache {
   MappableBlockLoader getCacheLoader() {
     return cacheLoader;
   }
+
+  /**
+   * This method can be executed during DataNode shutdown.
+   */
+  void shutdown() {
+    cacheLoader.shutdown();
+  }
 }
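
As a quick illustration of the reserve/rollback pattern the hunks above follow
(reserve bytes before mapping, release them again if caching fails), here is a
minimal standalone sketch. The class and names are hypothetical and not part of
the patch; the CAS loop mirrors the used-bytes accounting the loaders delegate to.

import java.util.concurrent.atomic.AtomicLong;

class CacheAccountingSketch {
  private final long capacityBytes;
  private final AtomicLong usedBytes = new AtomicLong(0);

  CacheAccountingSketch(long capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  // Returns the new used-bytes value, or -1 if the reservation would
  // exceed the capacity (mirrors MappableBlockLoader#reserve).
  long reserve(long bytes) {
    while (true) {
      long cur = usedBytes.get();
      long next = cur + bytes;
      if (next > capacityBytes) {
        return -1;
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;
      }
    }
  }

  long release(long bytes) {
    return usedBytes.addAndGet(-bytes);
  }

  public static void main(String[] args) {
    CacheAccountingSketch accounting = new CacheAccountingSketch(1024);
    long length = 512;
    if (accounting.reserve(length) < 0) {
      System.out.println("could not reserve " + length + " bytes");
      return;
    }
    boolean success = false;
    try {
      // ... map and checksum-verify the block here ...
      success = true;
    } finally {
      if (!success) {
        // Roll back the reservation, as the caching task does on failure.
        accounting.release(length);
      }
    }
  }
}
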
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 29c31ef..801b4c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2340,6 +2340,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                      "from LazyWriter.join");
       }
     }
+
+    cacheManager.shutdown();
   }
 
   @Override // FSDatasetMBean
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index a9e9610..044e5c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -65,26 +65,25 @@ public abstract class MappableBlockLoader {
   /**
    * Try to reserve some given bytes.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to add.
    *
    * @return              The new number of usedBytes if we succeeded;
    *                      -1 if we failed.
    */
-  abstract long reserve(long bytesCount);
+  abstract long reserve(ExtendedBlockId key, long bytesCount);
 
   /**
    * Release some bytes that we're using.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to release.
    *
    * @return              The new number of usedBytes.
    */
-  abstract long release(long bytesCount);
-
-  /**
-   * Get the config key of cache capacity.
-   */
-  abstract String getCacheCapacityConfigKey();
+  abstract long release(ExtendedBlockId key, long bytesCount);
 
   /**
    * Get the approximate amount of cache space used.
@@ -102,9 +101,11 @@ public abstract class MappableBlockLoader {
   abstract boolean isTransientCache();
 
   /**
-   * Get a cache file path if applicable. Otherwise return null.
+   * Clean up the cache; can be invoked during DataNode shutdown.
    */
-  abstract String getCachedPath(ExtendedBlockId key);
+  void shutdown() {
+    // Do nothing.
+  }
 
   /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
new file mode 100644
index 0000000..43b1b53
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+
+/**
+ * Creates MappableBlockLoader.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class MappableBlockLoaderFactory {
+
+  private MappableBlockLoaderFactory() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Create a specific cache loader according to the configuration.
+   * If persistent memory volume is not configured, return a cache loader
+   * for DRAM cache. Otherwise, return a cache loader for pmem cache.
+   */
+  public static MappableBlockLoader createCacheLoader(DNConf conf) {
+    if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
+      return new MemoryMappableBlockLoader();
+    }
+    return new PmemMappableBlockLoader();
+  }
+}
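
The decision rule above can be read in isolation: no loader class is configured
anywhere; the presence of persistent memory volumes alone drives the choice. A
hedged sketch of that rule with hypothetical names (the real factory takes a
DNConf whose getPmemVolumes() returns the configured dfs.datanode.cache.pmem.dirs
entries):

class LoaderSelectionSketch {
  // Hypothetical stand-in for MappableBlockLoaderFactory#createCacheLoader.
  static String selectLoader(String[] pmemVolumes) {
    if (pmemVolumes == null || pmemVolumes.length == 0) {
      return "MemoryMappableBlockLoader";   // DRAM cache
    }
    return "PmemMappableBlockLoader";       // persistent memory cache
  }

  public static void main(String[] args) {
    System.out.println(selectLoader(null));                        // DRAM
    System.out.println(selectLoader(new String[] {"/mnt/pmem0"})); // pmem
  }
}
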
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 4b7af19..919835a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,11 +22,12 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
@@ -42,11 +43,13 @@ import java.nio.channels.FileChannel;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
   private MemoryCacheStats memCacheStats;
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
+    LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
     this.memCacheStats = cacheManager.getMemCacheStats();
   }
 
@@ -149,11 +152,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
-  public String getCacheCapacityConfigKey() {
-    return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
-  }
-
-  @Override
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
   }
@@ -164,12 +162,12 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
-  long reserve(long bytesCount) {
+  long reserve(ExtendedBlockId key, long bytesCount) {
     return memCacheStats.reserve(bytesCount);
   }
 
   @Override
-  long release(long bytesCount) {
+  long release(ExtendedBlockId key, long bytesCount) {
     return memCacheStats.release(bytesCount);
   }
 
@@ -177,9 +175,4 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   public boolean isTransientCache() {
     return true;
   }
-
-  @Override
-  public String getCachedPath(ExtendedBlockId key) {
-    return null;
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index c581d31..05a9ba7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,12 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
@@ -53,14 +51,10 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
+    LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
     DNConf dnConf = cacheManager.getDnConf();
-    this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
-        dnConf.getPmemVolumes());
-  }
-
-  @VisibleForTesting
-  PmemVolumeManager getPmemVolumeManager() {
-    return pmemVolumeManager;
+    PmemVolumeManager.init(dnConf.getPmemVolumes());
+    pmemVolumeManager = PmemVolumeManager.getInstance();
   }
 
   /**
@@ -69,7 +63,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    * Map the block and verify its checksum.
    *
    * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
-   * is a persistent memory volume selected by getOneLocation() method.
+   * is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -100,8 +94,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
         throw new IOException("Block InputStream has no FileChannel.");
       }
 
-      Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
-      filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+      filePath = pmemVolumeManager.getCachePath(key);
       file = new RandomAccessFile(filePath, "rw");
       out = file.getChannel().
           map(FileChannel.MapMode.READ_WRITE, 0, length);
@@ -111,9 +104,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
       }
       verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
           blockFileName);
-      mappableBlock = new PmemMappedBlock(
-          length, volumeIndex, key, pmemVolumeManager);
-      pmemVolumeManager.afterCache(key, volumeIndex);
+      mappableBlock = new PmemMappedBlock(length, key);
       LOG.info("Successfully cached one replica:{} into persistent memory"
           + ", [cached path={}, length={}]", key, filePath, length);
     } finally {
@@ -123,6 +114,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
       }
       IOUtils.closeQuietly(file);
       if (mappableBlock == null) {
+        LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
         FsDatasetUtil.deleteMappedFile(filePath);
       }
     }
@@ -194,11 +186,6 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
-  public String getCacheCapacityConfigKey() {
-    return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
-  }
-
-  @Override
   public long getCacheUsed() {
     return pmemVolumeManager.getCacheUsed();
   }
@@ -209,13 +196,13 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
-  long reserve(long bytesCount) {
-    return pmemVolumeManager.reserve(bytesCount);
+  long reserve(ExtendedBlockId key, long bytesCount) {
+    return pmemVolumeManager.reserve(key, bytesCount);
   }
 
   @Override
-  long release(long bytesCount) {
-    return pmemVolumeManager.release(bytesCount);
+  long release(ExtendedBlockId key, long bytesCount) {
+    return pmemVolumeManager.release(key, bytesCount);
   }
 
   @Override
@@ -224,7 +211,8 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
-  public String getCachedPath(ExtendedBlockId key) {
-    return pmemVolumeManager.getCacheFilePath(key);
+  void shutdown() {
+    LOG.info("Clean up cache on persistent memory during shutdown.");
+    PmemVolumeManager.getInstance().cleanup();
   }
 }
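
The load path above boils down to: ask PmemVolumeManager for a cache path, map
that file read-write, copy and verify the block into the mapping, and delete the
file if anything fails. A self-contained sketch of that map-and-clean-up flow
follows; all paths are hypothetical, checksum verification is omitted, and the
target is only persistent-memory-backed when the directory sits on a DAX-mounted
pmem file system.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MappedBlockCopySketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical source block replica and pmem cache file locations.
    Path source = Paths.get("/tmp/blk_1001");
    Path target = Paths.get("/mnt/pmem0/hdfs_pmem_cache/bp-1-blk_1001");

    byte[] data = Files.readAllBytes(source);
    boolean mapped = false;
    try (RandomAccessFile file = new RandomAccessFile(target.toFile(), "rw")) {
      MappedByteBuffer out = file.getChannel()
          .map(FileChannel.MapMode.READ_WRITE, 0, data.length);
      out.put(data);
      // Flush the mapped region to the backing storage device.
      out.force();
      mapped = true;
    } finally {
      if (!mapped) {
        // Mirror the patch: delete the partially written cache file.
        Files.deleteIfExists(target);
      }
    }
  }
}
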
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index ce4fa22..25c3d40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -35,18 +35,13 @@ import java.io.IOException;
 public class PmemMappedBlock implements MappableBlock {
   private static final Logger LOG =
       LoggerFactory.getLogger(PmemMappedBlock.class);
-  private final PmemVolumeManager pmemVolumeManager;
   private long length;
-  private Byte volumeIndex = null;
   private ExtendedBlockId key;
 
-  PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
-                  PmemVolumeManager pmemVolumeManager) {
+  PmemMappedBlock(long length, ExtendedBlockId key) {
     assert length > 0;
     this.length = length;
-    this.volumeIndex = volumeIndex;
     this.key = key;
-    this.pmemVolumeManager = pmemVolumeManager;
   }
 
   @Override
@@ -57,10 +52,9 @@ public class PmemMappedBlock implements MappableBlock {
   @Override
   public void close() {
     String cacheFilePath =
-        pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+        PmemVolumeManager.getInstance().getCachePath(key);
     try {
       FsDatasetUtil.deleteMappedFile(cacheFilePath);
-      pmemVolumeManager.afterUncache(key);
       LOG.info("Successfully uncached one replica:{} from persistent memory"
           + ", [cached path={}, length={}]", key, cacheFilePath, length);
     } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
index 76aa2dd..2d77f7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -35,6 +35,7 @@ import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,14 +46,19 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class PmemVolumeManager {
+public final class PmemVolumeManager {
 
   /**
    * Counts used bytes for persistent memory.
    */
-  private class UsedBytesCount {
+  private static class UsedBytesCount {
+    private final long maxBytes;
     private final AtomicLong usedBytes = new AtomicLong(0);
 
+    UsedBytesCount(long maxBytes) {
+      this.maxBytes = maxBytes;
+    }
+
     /**
      * Try to reserve more bytes.
      *
@@ -65,7 +71,7 @@ public class PmemVolumeManager {
       while (true) {
         long cur = usedBytes.get();
         long next = cur + bytesCount;
-        if (next > cacheCapacity) {
+        if (next > maxBytes) {
           return -1;
         }
         if (usedBytes.compareAndSet(cur, next)) {
@@ -85,42 +91,76 @@ public class PmemVolumeManager {
       return usedBytes.addAndGet(-bytesCount);
     }
 
-    long get() {
+    long getUsedBytes() {
       return usedBytes.get();
     }
+
+    long getMaxBytes() {
+      return maxBytes;
+    }
+
+    long getAvailableBytes() {
+      return maxBytes - usedBytes.get();
+    }
   }
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PmemVolumeManager.class);
+  public static final String CACHE_DIR = "hdfs_pmem_cache";
+  private static PmemVolumeManager pmemVolumeManager = null;
   private final ArrayList<String> pmemVolumes = new ArrayList<>();
   // Maintain which pmem volume a block is cached to.
   private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
       new ConcurrentHashMap<>();
-  private final UsedBytesCount usedBytesCount;
+  private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
 
   /**
    * The total cache capacity in bytes of persistent memory.
-   * It is 0L if the specific mappableBlockLoader couldn't cache data to pmem.
    */
-  private final long cacheCapacity;
+  private long cacheCapacity;
+  private static long maxBytesPerPmem = -1;
   private int count = 0;
-  // Strict atomic operation is not guaranteed for the performance sake.
-  private int i = 0;
+  private byte nextIndex = 0;
 
-  PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
-      throws IOException {
-    if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
+  private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
+    if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
       throw new IOException("The persistent memory volume, " +
           DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
           " is not configured!");
     }
-    this.loadVolumes(pmemVolumesConfigured);
-    this.usedBytesCount = new UsedBytesCount();
-    this.cacheCapacity = maxBytes;
+    this.loadVolumes(pmemVolumesConfig);
+    cacheCapacity = 0L;
+    for (UsedBytesCount counter : usedBytesCounts) {
+      cacheCapacity += counter.getMaxBytes();
+    }
+  }
+
+  public synchronized static void init(String[] pmemVolumesConfig)
+      throws IOException {
+    if (pmemVolumeManager == null) {
+      pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
+    }
+  }
+
+  public static PmemVolumeManager getInstance() {
+    if (pmemVolumeManager == null) {
+      throw new RuntimeException(
+          "PmemVolumeManager has not been initialized; call init() first!");
+    }
+    return pmemVolumeManager;
+  }
+
+  @VisibleForTesting
+  public static void setMaxBytes(long maxBytes) {
+    maxBytesPerPmem = maxBytes;
   }
 
   public long getCacheUsed() {
-    return usedBytesCount.get();
+    long usedBytes = 0L;
+    for (UsedBytesCount counter : usedBytesCounts) {
+      usedBytes += counter.getUsedBytes();
+    }
+    return usedBytes;
   }
 
   public long getCacheCapacity() {
@@ -130,24 +170,40 @@ public class PmemVolumeManager {
   /**
    * Try to reserve more bytes on persistent memory.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to add.
    *
    * @return              The new number of usedBytes if we succeeded;
    *                      -1 if we failed.
    */
-  long reserve(long bytesCount) {
-    return usedBytesCount.reserve(bytesCount);
+  synchronized long reserve(ExtendedBlockId key, long bytesCount) {
+    try {
+      byte index = chooseVolume(bytesCount);
+      long usedBytes = usedBytesCounts.get(index).reserve(bytesCount);
+      // Put the entry into blockKeyToVolume if reserving bytes succeeded.
+      if (usedBytes > 0) {
+        blockKeyToVolume.put(key, index);
+      }
+      return usedBytes;
+    } catch (IOException e) {
+      LOG.warn(e.getMessage());
+      return -1L;
+    }
   }
 
   /**
    * Release some bytes that we're using on persistent memory.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to release.
    *
    * @return              The new number of usedBytes.
    */
-  long release(long bytesCount) {
-    return usedBytesCount.release(bytesCount);
+  long release(ExtendedBlockId key, long bytesCount) {
+    Byte index = blockKeyToVolume.remove(key);
+    return usedBytesCounts.get(index).release(bytesCount);
   }
 
   /**
@@ -155,46 +211,70 @@ public class PmemVolumeManager {
    *
    * @throws IOException   If there is no available pmem volume.
    */
-  private void loadVolumes(String[] volumes) throws IOException {
+  private void loadVolumes(String[] volumes)
+      throws IOException {
     // Check whether the volume exists
-    for (String volume: volumes) {
+    for (byte n = 0; n < volumes.length; n++) {
       try {
-        File pmemDir = new File(volume);
-        verifyIfValidPmemVolume(pmemDir);
-        // Remove all files under the volume.
-        FileUtils.cleanDirectory(pmemDir);
+        File pmemDir = new File(volumes[n]);
+        File realPmemDir = verifyIfValidPmemVolume(pmemDir);
+        this.pmemVolumes.add(realPmemDir.getPath());
+        long maxBytes;
+        if (maxBytesPerPmem == -1) {
+          maxBytes = realPmemDir.getUsableSpace();
+        } else {
+          maxBytes = maxBytesPerPmem;
+        }
+        UsedBytesCount usedBytesCount = new UsedBytesCount(maxBytes);
+        this.usedBytesCounts.add(usedBytesCount);
+        LOG.info("Added persistent memory - {} with size={}",
+            volumes[n], maxBytes);
       } catch (IllegalArgumentException e) {
-        LOG.error("Failed to parse persistent memory volume " + volume, e);
+        LOG.error("Failed to parse persistent memory volume " + volumes[n], e);
         continue;
       } catch (IOException e) {
-        LOG.error("Bad persistent memory volume: " + volume, e);
+        LOG.error("Bad persistent memory volume: " + volumes[n], e);
         continue;
       }
-      pmemVolumes.add(volume);
-      LOG.info("Added persistent memory - " + volume);
     }
     count = pmemVolumes.size();
     if (count == 0) {
       throw new IOException(
           "At least one valid persistent memory volume is required!");
     }
+    cleanup();
+  }
+
+  void cleanup() {
+    // Remove all files under the volume.
+    for (String pmemDir: pmemVolumes) {
+      try {
+        FileUtils.cleanDirectory(new File(pmemDir));
+      } catch (IOException e) {
+        LOG.error("Failed to clean up " + pmemDir, e);
+      }
+    }
   }
 
   @VisibleForTesting
-  static void verifyIfValidPmemVolume(File pmemDir)
+  static File verifyIfValidPmemVolume(File pmemDir)
       throws IOException {
     if (!pmemDir.exists()) {
       final String message = pmemDir + " does not exist";
       throw new IOException(message);
     }
-
     if (!pmemDir.isDirectory()) {
       final String message = pmemDir + " is not a directory";
       throw new IllegalArgumentException(message);
     }
 
+    File realPmemDir = new File(getRealPmemDir(pmemDir.getPath()));
+    if (!realPmemDir.exists() && !realPmemDir.mkdir()) {
+      throw new IOException("Failed to create " + realPmemDir.getPath());
+    }
+
     String uuidStr = UUID.randomUUID().toString();
-    String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
+    String testFilePath = realPmemDir.getPath() + "/.verify.pmem." + uuidStr;
     byte[] contents = uuidStr.getBytes("UTF-8");
     RandomAccessFile testFile = null;
     MappedByteBuffer out = null;
@@ -203,15 +283,17 @@ public class PmemVolumeManager {
       out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
           contents.length);
       if (out == null) {
-        throw new IOException("Failed to map the test file under " + pmemDir);
+        throw new IOException(
+            "Failed to map the test file under " + realPmemDir);
       }
       out.put(contents);
       // Forces to write data to storage device containing the mapped file
       out.force();
+      return realPmemDir;
     } catch (IOException e) {
       throw new IOException(
           "Exception while writing data to persistent storage dir: " +
-              pmemDir, e);
+              realPmemDir, e);
     } finally {
       if (out != null) {
         out.clear();
@@ -229,18 +311,38 @@ public class PmemVolumeManager {
     }
   }
 
+  public static String getRealPmemDir(String rawPmemDir) {
+    return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
+  }
+
   /**
    * Choose a persistent memory volume based on a specific algorithm.
    * Currently it is a round-robin policy.
    *
    * TODO: Refine volume selection policy by considering storage utilization.
    */
-  Byte getOneVolumeIndex() throws IOException {
-    if (count != 0) {
-      return (byte)(i++ % count);
-    } else {
+  synchronized Byte chooseVolume(long bytesCount) throws IOException {
+    if (count == 0) {
       throw new IOException("No usable persistent memory is found");
     }
+    int k = 0;
+    long maxAvailableSpace = 0L;
+    while (k++ != count) {
+      if (nextIndex == count) {
+        nextIndex = 0;
+      }
+      byte index = nextIndex++;
+      long availableBytes = usedBytesCounts.get(index).getAvailableBytes();
+      if (availableBytes >= bytesCount) {
+        return index;
+      }
+      if (availableBytes > maxAvailableSpace) {
+        maxAvailableSpace = availableBytes;
+      }
+    }
+    throw new IOException("There is not enough persistent memory space " +
+        "for caching. The current max available space is " +
+        maxAvailableSpace + ", but " + bytesCount + " is required.");
   }
 
   @VisibleForTesting
@@ -276,7 +378,7 @@ public class PmemVolumeManager {
   /**
    * The cache file path is pmemVolume/BlockPoolId-BlockId.
    */
-  public String getCacheFilePath(ExtendedBlockId key) {
+  public String getCachePath(ExtendedBlockId key) {
     Byte volumeIndex = blockKeyToVolume.get(key);
     if (volumeIndex == null) {
       return  null;
@@ -288,19 +390,4 @@ public class PmemVolumeManager {
   Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
     return blockKeyToVolume;
   }
-
-  /**
-   * Add cached block's ExtendedBlockId and its cache volume index to a map
-   * after cache.
-   */
-  public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
-    blockKeyToVolume.put(key, volumeIndex);
-  }
-
-  /**
-   * Remove the record in blockKeyToVolume for uncached block after uncache.
-   */
-  public void afterUncache(ExtendedBlockId key) {
-    blockKeyToVolume.remove(key);
-  }
 }
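
The new chooseVolume() above is a capacity-aware round robin: volumes are visited
in rotation and the first one with enough free space wins; if none has room, the
caller gets an IOException. A minimal standalone sketch of that selection (arrays
and names hypothetical; for brevity the chosen volume's usage is bumped in the
same step, whereas the patch splits chooseVolume() from UsedBytesCount#reserve()):

import java.io.IOException;

public class VolumeChooserSketch {
  private final long[] capacityBytes;
  private final long[] usedBytes;
  private int nextIndex = 0;

  VolumeChooserSketch(long[] capacityBytes) {
    this.capacityBytes = capacityBytes;
    this.usedBytes = new long[capacityBytes.length];
  }

  synchronized int chooseVolume(long bytesCount) throws IOException {
    long maxAvailable = 0;
    for (int visited = 0; visited < capacityBytes.length; visited++) {
      int index = nextIndex;
      nextIndex = (nextIndex + 1) % capacityBytes.length;
      long available = capacityBytes[index] - usedBytes[index];
      if (available >= bytesCount) {
        usedBytes[index] += bytesCount;
        return index;
      }
      maxAvailable = Math.max(maxAvailable, available);
    }
    throw new IOException("Not enough persistent memory for " + bytesCount
        + " bytes; the largest free volume has " + maxAvailable + " bytes.");
  }

  public static void main(String[] args) throws IOException {
    VolumeChooserSketch chooser = new VolumeChooserSketch(new long[] {1024, 2048});
    System.out.println(chooser.chooseVolume(512));   // volume 0
    System.out.println(chooser.chooseVolume(1500));  // volume 1
  }
}
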
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5ab8207..f29d4351 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2539,18 +2539,6 @@
 </property>
 
 <property>
-  <name>dfs.datanode.cache.loader.class</name>
-  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
-  <description>
-    Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader.
-    By default, MemoryMappableBlockLoader is used and it maps block replica into memory.
-    PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is
-    implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent
-    memory directory.
-  </description>
-</property>
-
-<property>
   <name>dfs.datanode.max.locked.memory</name>
   <value>0</value>
   <description>
@@ -2567,18 +2555,6 @@
 </property>
 
 <property>
-  <name>dfs.datanode.cache.pmem.capacity</name>
-  <value>0</value>
-  <description>
-    The amount of persistent memory in bytes that can be used to cache block
-    replicas to persistent memory. Currently, persistent memory is only enabled
-    in HDFS Centralized Cache Management feature.
-
-    By default, this parameter is 0, which disables persistent memory caching.
-  </description>
-</property>
-
-<property>
   <name>dfs.datanode.cache.pmem.dirs</name>
   <value></value>
   <description>
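
With dfs.datanode.cache.loader.class and dfs.datanode.cache.pmem.capacity removed
above, a hedged configuration sketch (mount points hypothetical): only
dfs.datanode.cache.pmem.dirs is needed to enable pmem caching; the loader is
chosen automatically and the capacity is derived from the configured volumes.

import org.apache.hadoop.conf.Configuration;

public class PmemCacheConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical DAX-mounted persistent memory volumes.
    conf.set("dfs.datanode.cache.pmem.dirs", "/mnt/pmem0,/mnt/pmem1");
    // No cache loader class and no pmem capacity value are set: after this
    // change the DataNode derives both from the volumes above.
    System.out.println(conf.get("dfs.datanode.cache.pmem.dirs"));
  }
}
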
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
index 9b4f06f..58812db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -21,8 +21,6 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -139,14 +137,11 @@ public class TestCacheByPmemMappableBlockLoader {
     conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
 
     // Configuration for pmem cache
-    conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
-        "org.apache.hadoop.hdfs.server.datanode." +
-            "fsdataset.impl.PmemMappableBlockLoader");
     new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
     new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
     // Configure two bogus pmem volumes
     conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
-    conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
+    PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
 
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
@@ -183,18 +178,17 @@ public class TestCacheByPmemMappableBlockLoader {
 
   @Test
   public void testPmemVolumeManager() throws IOException {
-    PmemVolumeManager pmemVolumeManager =
-        cacheLoader.getPmemVolumeManager();
+    PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
     assertNotNull(pmemVolumeManager);
     assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
     // Test round-robin selection policy
     long count1 = 0, count2 = 0;
     for (int i = 0; i < 10; i++) {
-      Byte index = pmemVolumeManager.getOneVolumeIndex();
+      Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
       String volume = pmemVolumeManager.getVolumeByIndex(index);
-      if (volume.equals(PMEM_DIR_0)) {
+      if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
         count1++;
-      } else if (volume.equals(PMEM_DIR_1)) {
+      } else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
         count2++;
       } else {
         fail("Unexpected persistent storage location:" + volume);
@@ -254,7 +248,7 @@ public class TestCacheByPmemMappableBlockLoader {
     // The pmem cache space is expected to have been used up.
     assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
     Map<ExtendedBlockId, Byte> blockKeyToVolume =
-        cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
+        PmemVolumeManager.getInstance().getBlockKeyToVolume();
     // All block keys should be kept in blockKeyToVolume
     assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
     assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
@@ -266,11 +260,13 @@ public class TestCacheByPmemMappableBlockLoader {
       // to pmem.
       assertNotNull(cachePath);
       String expectFileName =
-          cacheLoader.getPmemVolumeManager().getCacheFileName(key);
+          PmemVolumeManager.getInstance().getCacheFileName(key);
       if (cachePath.startsWith(PMEM_DIR_0)) {
-        assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
+        assertTrue(cachePath.equals(PmemVolumeManager
+            .getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
       } else if (cachePath.startsWith(PMEM_DIR_1)) {
-        assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
+        assertTrue(cachePath.equals(PmemVolumeManager
+            .getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
       } else {
         fail("The cache path is not the expected one: " + cachePath);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index 7e97960..bd8e20c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -401,9 +401,10 @@ public class TestFsDatasetCache {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        // Check the log emitted by FsDatasetCache when the
+        // cache capacity is exceeded.
         int lines = appender.countLinesWithMessage(
-            "more bytes in the cache: " +
-            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+            "could not reserve more bytes in the cache: ");
         return lines > 0;
       }
     }, 500, 30000);




[hadoop] 09/10: HDFS-14700. Clean up pmem cache before setting pmem cache capacity. Contributed by Feilong He.


rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit dc2fad4bd36944fc58d90c7618f125bdaac14d85
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Fri Aug 9 14:07:54 2019 +0530

    HDFS-14700. Clean up pmem cache before setting pmem cache capacity. Contributed by Feilong He.
    
    (cherry picked from commit f6fa865d6fcb0ef0a25a00615f16f383e5032373)
---
 .../datanode/fsdataset/impl/PmemVolumeManager.java    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
index 2d77f7a..969d18b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -218,6 +218,8 @@ public final class PmemVolumeManager {
       try {
         File pmemDir = new File(volumes[n]);
         File realPmemDir = verifyIfValidPmemVolume(pmemDir);
+        // Clean up the cache left before, if any.
+        cleanup(realPmemDir);
         this.pmemVolumes.add(realPmemDir.getPath());
         long maxBytes;
         if (maxBytesPerPmem == -1) {
@@ -242,17 +244,20 @@ public final class PmemVolumeManager {
       throw new IOException(
           "At least one valid persistent memory volume is required!");
     }
-    cleanup();
+  }
+
+  void cleanup(File realPmemDir) {
+    try {
+      FileUtils.cleanDirectory(realPmemDir);
+    } catch (IOException e) {
+      LOG.error("Failed to clean up " + realPmemDir.getPath(), e);
+    }
   }
 
   void cleanup() {
     // Remove all files under the volume.
-    for (String pmemDir: pmemVolumes) {
-      try {
-        FileUtils.cleanDirectory(new File(pmemDir));
-      } catch (IOException e) {
-        LOG.error("Failed to clean up " + pmemDir, e);
-      }
+    for (String pmemVolume : pmemVolumes) {
+      cleanup(new File(pmemVolume));
     }
   }
 




[hadoop] 07/10: HDFS-14458. Report pmem stats to namenode. Contributed by Feilong He.


rakeshr pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 75c66bf044d21c5742cdfb2e8eaebb1ac95e736f
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Mon Jul 15 13:02:37 2019 +0530

    HDFS-14458. Report pmem stats to namenode. Contributed by Feilong He.
    
    (cherry picked from commit e98adb00b7da8fa913b86ecf2049444b1d8617d4)
---
 .../{MemoryCacheStats.java => CacheStats.java}     |  6 +--
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 54 ++++++----------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  4 +-
 .../fsdataset/impl/MappableBlockLoader.java        |  3 +-
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  8 ++--
 .../impl/NativePmemMappableBlockLoader.java        |  5 +-
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  9 +++-
 .../impl/TestCacheByPmemMappableBlockLoader.java   | 14 +++---
 .../fsdataset/impl/TestFsDatasetCache.java         |  2 +-
 9 files changed, 45 insertions(+), 60 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/CacheStats.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/CacheStats.java
index d276c27..f79b7c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/CacheStats.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Keeps statistics for the memory cache.
  */
-class MemoryCacheStats {
+class CacheStats {
 
   /**
    * The approximate amount of cache space in use.
@@ -47,7 +47,7 @@ class MemoryCacheStats {
    */
   private final long maxBytes;
 
-  MemoryCacheStats(long maxBytes) {
+  CacheStats(long maxBytes) {
     this.usedBytesCount = new UsedBytesCount();
     this.maxBytes = maxBytes;
   }
@@ -81,7 +81,7 @@ class MemoryCacheStats {
   private class UsedBytesCount {
     private final AtomicLong usedBytes = new AtomicLong(0);
 
-    private MemoryCacheStats.PageRounder rounder = new PageRounder();
+    private CacheStats.PageRounder rounder = new PageRounder();
 
     /**
      * Try to reserve more bytes.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 37e548e..1514927 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -137,7 +136,7 @@ public class FsDatasetCache {
    */
   private final MappableBlockLoader cacheLoader;
 
-  private final MemoryCacheStats memCacheStats;
+  private final CacheStats memCacheStats;
 
   /**
    * Number of cache commands that could not be completed successfully
@@ -178,30 +177,17 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-    // Both lazy writer and read cache are sharing this statistics.
-    this.memCacheStats = new MemoryCacheStats(
-        dataset.datanode.getDnConf().getMaxLockedMemory());
 
     this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
         this.getDnConf());
-    cacheLoader.initialize(this);
-  }
-
-  /**
-   * Check if pmem cache is enabled.
-   */
-  private boolean isPmemCacheEnabled() {
-    return !cacheLoader.isTransientCache();
+    // Both lazy writer and read cache share these statistics.
+    this.memCacheStats = cacheLoader.initialize(this.getDnConf());
   }
 
   DNConf getDnConf() {
     return this.dataset.datanode.getDnConf();
   }
 
-  MemoryCacheStats getMemCacheStats() {
-    return memCacheStats;
-  }
-
   /**
    * Get the cache path if the replica is cached into persistent memory.
    */
@@ -557,37 +543,32 @@ public class FsDatasetCache {
   /**
    * Get the approximate amount of DRAM cache space used.
    */
-  public long getCacheUsed() {
+  public long getMemCacheUsed() {
     return memCacheStats.getCacheUsed();
   }
 
   /**
-   * Get the approximate amount of persistent memory cache space used.
-   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   * Get the approximate amount of cache space used either on DRAM or
+   * on persistent memory.
+   * @return the approximate amount of cache space used, in bytes.
    */
-  public long getPmemCacheUsed() {
-    if (isPmemCacheEnabled()) {
-      return cacheLoader.getCacheUsed();
-    }
-    return 0;
+  public long getCacheUsed() {
+    return cacheLoader.getCacheUsed();
   }
 
   /**
-   * Get the maximum amount of bytes we can cache on DRAM.  This is a constant.
+   * Get the maximum amount of bytes we can cache on DRAM. This is a constant.
    */
-  public long getCacheCapacity() {
+  public long getMemCacheCapacity() {
     return memCacheStats.getCacheCapacity();
   }
 
   /**
-   * Get cache capacity of persistent memory.
-   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   * Get the maximum amount of bytes we can cache either on DRAM or
+   * on persistent memory. This is a constant.
    */
-  public long getPmemCacheCapacity() {
-    if (isPmemCacheEnabled()) {
-      return cacheLoader.getCacheCapacity();
-    }
-    return 0;
+  public long getCacheCapacity() {
+    return cacheLoader.getCacheCapacity();
   }
 
   public long getNumBlocksFailedToCache() {
@@ -608,11 +589,6 @@ public class FsDatasetCache {
     return (val != null) && val.state.shouldAdvertise();
   }
 
-  @VisibleForTesting
-  MappableBlockLoader getCacheLoader() {
-    return cacheLoader;
-  }
-
   /**
    * This method can be executed during DataNode shutdown.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ee76f2a..beede18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3186,10 +3186,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     public void evictBlocks(long bytesNeeded) throws IOException {
       int iterations = 0;
 
-      final long cacheCapacity = cacheManager.getCacheCapacity();
+      final long cacheCapacity = cacheManager.getMemCacheCapacity();
 
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
-             (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
+             (cacheCapacity - cacheManager.getMemCacheUsed()) < bytesNeeded) {
         RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
 
         if (replicaState == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 5b9ba3a..5118774 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.util.DataChecksum;
 
 import java.io.BufferedInputStream;
@@ -43,7 +44,7 @@ public abstract class MappableBlockLoader {
   /**
    * Initialize a specific MappableBlockLoader.
    */
-  abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+  abstract CacheStats initialize(DNConf dnConf) throws IOException;
 
   /**
    * Load the block.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index dd4188c..f5a9a41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,12 +40,13 @@ import java.nio.channels.FileChannel;
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
   private static final Logger LOG =
       LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
-  private MemoryCacheStats memCacheStats;
+  private CacheStats memCacheStats;
 
   @Override
-  void initialize(FsDatasetCache cacheManager) throws IOException {
+  CacheStats initialize(DNConf dnConf) throws IOException {
     LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
-    this.memCacheStats = cacheManager.getMemCacheStats();
+    this.memCacheStats = new CacheStats(dnConf.getMaxLockedMemory());
+    return memCacheStats;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
index 09e9454..a5af437 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
 import org.apache.hadoop.util.DataChecksum;
@@ -47,8 +48,8 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
       LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
 
   @Override
-  void initialize(FsDatasetCache cacheManager) throws IOException {
-    super.initialize(cacheManager);
+  CacheStats initialize(DNConf dnConf) throws IOException {
+    return super.initialize(dnConf);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 70a42c4..19dcc4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -42,11 +42,16 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   private PmemVolumeManager pmemVolumeManager;
 
   @Override
-  void initialize(FsDatasetCache cacheManager) throws IOException {
+  CacheStats initialize(DNConf dnConf) throws IOException {
     LOG.info("Initializing cache loader: " + this.getClass().getName());
-    DNConf dnConf = cacheManager.getDnConf();
     PmemVolumeManager.init(dnConf.getPmemVolumes());
     pmemVolumeManager = PmemVolumeManager.getInstance();
+    // The configured max locked memory is shadowed (not used) here.
+    LOG.info("Persistent memory is used for caching data instead of " +
+        "DRAM. Max locked memory is set to zero to disable DRAM cache.");
+    // TODO: pmem does not support the Lazy Writer yet; these stats will be
+    // refined when that support is implemented.
+    return new CacheStats(0L);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
index 58812db..3c1b705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -84,7 +84,6 @@ public class TestCacheByPmemMappableBlockLoader {
   private static DistributedFileSystem fs;
   private static DataNode dn;
   private static FsDatasetCache cacheManager;
-  private static PmemMappableBlockLoader cacheLoader;
   /**
    * Used to pause DN BPServiceActor threads. BPSA threads acquire the
    * shared read lock. The test acquires the write lock for exclusive access.
@@ -131,8 +130,6 @@ public class TestCacheByPmemMappableBlockLoader {
         DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
     conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-        CACHE_CAPACITY);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
 
@@ -153,7 +150,6 @@ public class TestCacheByPmemMappableBlockLoader {
     fs = cluster.getFileSystem();
     dn = cluster.getDataNodes().get(0);
     cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
-    cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
   }
 
   @After
@@ -216,7 +212,9 @@ public class TestCacheByPmemMappableBlockLoader {
         Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
     BlockReaderTestUtil.enableHdfsCachingTracing();
     Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
-    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
+    assertEquals(CACHE_CAPACITY, cacheManager.getCacheCapacity());
+    // DRAM cache is expected to be disabled.
+    assertEquals(0L, cacheManager.getMemCacheCapacity());
 
     final Path testFile = new Path("/testFile");
     final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
@@ -246,7 +244,9 @@ public class TestCacheByPmemMappableBlockLoader {
     }, 1000, 30000);
 
     // The pmem cache space is expected to have been used up.
-    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
+    assertEquals(CACHE_CAPACITY, cacheManager.getCacheUsed());
+    // There should be no cache used on DRAM.
+    assertEquals(0L, cacheManager.getMemCacheUsed());
     Map<ExtendedBlockId, Byte> blockKeyToVolume =
         PmemVolumeManager.getInstance().getBlockKeyToVolume();
     // All block keys should be kept in blockKeyToVolume
@@ -318,7 +318,7 @@ public class TestCacheByPmemMappableBlockLoader {
     }, 1000, 30000);
 
     // It is expected that no pmem cache space is used.
-    assertEquals(0, cacheManager.getPmemCacheUsed());
+    assertEquals(0, cacheManager.getCacheUsed());
     // No record should be kept by blockKeyToVolume after testFile is uncached.
     assertEquals(blockKeyToVolume.size(), 0);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index bd8e20c..0ce19b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CacheStats.PageRounder;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;

