Posted to common-commits@hadoop.apache.org by um...@apache.org on 2019/03/31 06:55:00 UTC

[hadoop] branch trunk updated: HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35ff31d  HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
35ff31d is described below

commit 35ff31dd9462cf4fb4ebf5556ee8ae6bcd7c5c3a
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Sat Mar 30 23:33:25 2019 -0700

    HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  19 ++
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  31 ++
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  90 +++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  20 +-
 .../datanode/fsdataset/impl/FsDatasetUtil.java     |  27 ++
 .../fsdataset/impl/MappableBlockLoader.java        |  41 ++-
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  45 ++-
 ...ockLoader.java => PmemMappableBlockLoader.java} | 132 ++++++---
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  70 +++++
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 306 +++++++++++++++++++
 .../src/main/resources/hdfs-default.xml            |  34 +++
 .../impl/TestCacheByPmemMappableBlockLoader.java   | 329 +++++++++++++++++++++
 12 files changed, 1077 insertions(+), 67 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b3cd0a4..5139f1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -390,6 +392,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
   public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
 
+  // Currently, the available cache loaders are MemoryMappableBlockLoader and
+  // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
+  // loader; it caches block replicas in memory.
+  public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
+      "dfs.datanode.cache.loader.class";
+  public static final Class<? extends MappableBlockLoader>
+      DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
+      MemoryMappableBlockLoader.class;
+  // Multiple dirs separated by "," are acceptable.
+  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
+      "dfs.datanode.cache.pmem.dirs";
+  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
+  // The cache capacity of persistent memory
+  public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
+      "dfs.datanode.cache.pmem.capacity";
+  public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
 
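A minimal sketch of setting the new pmem cache keys programmatically (the loader class name comes from this patch; the /mnt/pmem* paths and the 64 GB capacity are illustrative values, not defaults shipped here):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PmemCacheConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Switch the DataNode cache loader from the default DRAM loader to the pmem loader.
        conf.set("dfs.datanode.cache.loader.class",
            "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.PmemMappableBlockLoader");
        // Comma-separated persistent memory directories (illustrative paths).
        conf.set("dfs.datanode.cache.pmem.dirs", "/mnt/pmem0,/mnt/pmem1");
        // Total pmem cache capacity in bytes (illustrative: 64 GB).
        conf.setLong("dfs.datanode.cache.pmem.capacity", 64L * 1024 * 1024 * 1024);
        System.out.println(conf.get("dfs.datanode.cache.loader.class"));
      }
    }
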
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index de26c90..69375b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,6 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
@@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 
 import java.util.concurrent.TimeUnit;
@@ -112,7 +118,10 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
+  private final Class<? extends MappableBlockLoader> cacheLoaderClass;
   final long maxLockedMemory;
+  private final long maxLockedPmem;
+  private final String[] pmemDirs;
 
   private final long bpReadyTimeout;
 
@@ -251,10 +260,20 @@ public class DNConf {
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
 
+    this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
+        DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
+
     this.maxLockedMemory = getConf().getLongBytes(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
+    this.maxLockedPmem = getConf().getLongBytes(
+        DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
+        DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
+
+    this.pmemDirs = getConf().getTrimmedStrings(
+        DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
+
     this.restartReplicaExpiry = getConf().getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
@@ -317,6 +336,10 @@ public class DNConf {
     return maxLockedMemory;
   }
 
+  public long getMaxLockedPmem() {
+    return maxLockedPmem;
+  }
+
   /**
    * Returns true if connect to datanode via hostname
    * 
@@ -419,4 +442,12 @@ public class DNConf {
   int getMaxDataLength() {
     return maxDataLength;
   }
+
+  public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
+    return cacheLoaderClass;
+  }
+
+  public String[] getPmemVolumes() {
+    return pmemDirs;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index f90a4b1..dce84c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -48,10 +49,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +132,11 @@ public class FsDatasetCache {
 
   private final long revocationPollingMs;
 
-  private final MappableBlockLoader mappableBlockLoader;
+  /**
+   * The cache loader caches a block replica either to DRAM or
+   * to persistent memory, depending on its implementation.
+   */
+  private final MappableBlockLoader cacheLoader;
 
   private final MemoryCacheStats memCacheStats;
 
@@ -173,11 +179,41 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-
-    this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
     // Both lazy writer and read cache are sharing this statistics.
     this.memCacheStats = new MemoryCacheStats(
         dataset.datanode.getDnConf().getMaxLockedMemory());
+
+    Class<? extends MappableBlockLoader> cacheLoaderClass =
+        dataset.datanode.getDnConf().getCacheLoaderClass();
+    this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
+    cacheLoader.initialize(this);
+  }
+
+  /**
+   * Check if pmem cache is enabled.
+   */
+  private boolean isPmemCacheEnabled() {
+    return !cacheLoader.isTransientCache();
+  }
+
+  DNConf getDnConf() {
+    return this.dataset.datanode.getDnConf();
+  }
+
+  MemoryCacheStats getMemCacheStats() {
+    return memCacheStats;
+  }
+
+  /**
+   * Get the cache path if the replica is cached into persistent memory.
+   */
+  String getReplicaCachePath(String bpid, long blockId) {
+    if (cacheLoader.isTransientCache() ||
+        !isCached(bpid, blockId)) {
+      return null;
+    }
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+    return cacheLoader.getCachedPath(key);
   }
 
   /**
@@ -344,14 +380,14 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = mappableBlockLoader.reserve(length);
+      long newUsedBytes = cacheLoader.reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
           LOG.warn("Failed to cache " + key + ": could not reserve " + length +
               " more bytes in the cache: " +
-              DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-              " of " + memCacheStats.getCacheCapacity() + " exceeded.");
+              cacheLoader.getCacheCapacityConfigKey() +
+              " of " + cacheLoader.getCacheCapacity() + " exceeded.");
           return;
         }
         reservedBytes = true;
@@ -370,8 +406,9 @@ public class FsDatasetCache {
           LOG.warn("Failed to cache " + key + ": failed to open file", e);
           return;
         }
+
         try {
-          mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+          mappableBlock = cacheLoader.load(length, blockIn, metaIn,
               blockFileName, key);
         } catch (ChecksumException e) {
           // Exception message is bogus since this wasn't caused by a file read
@@ -381,6 +418,7 @@ public class FsDatasetCache {
           LOG.warn("Failed to cache the block [key=" + key + "]!", e);
           return;
         }
+
         synchronized (FsDatasetCache.this) {
           Value value = mappableBlockMap.get(key);
           Preconditions.checkNotNull(value);
@@ -404,7 +442,7 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            mappableBlockLoader.release(length);
+            cacheLoader.release(length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
                   + "bytes in total.", key, memCacheStats.getCacheUsed());
@@ -481,8 +519,7 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = mappableBlockLoader
-          .release(value.mappableBlock.getLength());
+      long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -498,19 +535,41 @@ public class FsDatasetCache {
   // Stats related methods for FSDatasetMBean
 
   /**
-   * Get the approximate amount of cache space used.
+   * Get the approximate amount of DRAM cache space used.
    */
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
   }
 
   /**
-   * Get the maximum amount of bytes we can cache.  This is a constant.
+   * Get the approximate amount of persistent memory cache space used.
+   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   */
+  public long getPmemCacheUsed() {
+    if (isPmemCacheEnabled()) {
+      return cacheLoader.getCacheUsed();
+    }
+    return 0;
+  }
+
+  /**
+   * Get the maximum amount of bytes we can cache on DRAM.  This is a constant.
    */
   public long getCacheCapacity() {
     return memCacheStats.getCacheCapacity();
   }
 
+  /**
+   * Get cache capacity of persistent memory.
+   * TODO: advertise this metric to NameNode by FSDatasetMBean
+   */
+  public long getPmemCacheCapacity() {
+    if (isPmemCacheEnabled()) {
+      return cacheLoader.getCacheCapacity();
+    }
+    return 0;
+  }
+
   public long getNumBlocksFailedToCache() {
     return numBlocksFailedToCache.get();
   }
@@ -528,4 +587,9 @@ public class FsDatasetCache {
     Value val = mappableBlockMap.get(block);
     return (val != null) && val.state.shouldAdvertise();
   }
+
+  @VisibleForTesting
+  MappableBlockLoader getCacheLoader() {
+    return cacheLoader;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ad43b45..6fe0d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -787,11 +787,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       datanode.getMetrics().incrRamDiskBlocksReadHits();
     }
 
-    if (info != null) {
-      return info.getDataInputStream(seekOffset);
-    } else {
+    if (info == null) {
       throw new IOException("No data exists for block " + b);
     }
+    return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset);
+  }
+
+  /**
+   * Check whether the replica is cached to persistent memory.
+   * If so, return an InputStream for the corresponding cache file on pmem.
+   */
+  private InputStream getBlockInputStreamWithCheckingPmemCache(
+      ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException {
+    String cachePath = cacheManager.getReplicaCachePath(
+        b.getBlockPoolId(), b.getBlockId());
+    if (cachePath != null) {
+      return FsDatasetUtil.getInputStreamAndSeek(
+          new File(cachePath), seekOffset);
+    }
+    return info.getDataInputStream(seekOffset);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 8a3b237..92c0888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -116,6 +119,19 @@ public class FsDatasetUtil {
     }
   }
 
+  public static InputStream getInputStreamAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      raf.seek(offset);
+      return Channels.newInputStream(raf.getChannel());
+    } catch(IOException ioe) {
+      IOUtils.cleanupWithLogger(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file and then return the
    * generation stamp from the name of the meta-file. Generally meta file will
@@ -183,4 +199,15 @@ public class FsDatasetUtil {
 
     FsDatasetImpl.computeChecksum(wrapper, dstMeta, smallBufferSize, conf);
   }
+
+  public static void deleteMappedFile(String filePath) throws IOException {
+    if (filePath == null) {
+      throw new IOException("The filePath should not be null!");
+    }
+    boolean result = Files.deleteIfExists(Paths.get(filePath));
+    if (!result) {
+      throw new IOException(
+          "Failed to delete the mapped file: " + filePath);
+    }
+  }
 }
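
A minimal usage sketch for the two helpers added above; the cache file path is hypothetical and only stands in for a file produced by the pmem loader:

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;

    public class FsDatasetUtilSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical cache file written by PmemMappableBlockLoader.
        String cachePath = "/mnt/pmem0/BP-1-127.0.0.1-1-1073741825";
        // Open the cached replica and position the stream at the requested offset.
        try (InputStream in =
            FsDatasetUtil.getInputStreamAndSeek(new File(cachePath), 0)) {
          byte[] buf = new byte[4096];
          int read = in.read(buf);
          System.out.println("Read " + read + " bytes from the cached replica");
        }
        // Remove the mapped cache file once the replica is uncached.
        FsDatasetUtil.deleteMappedFile(cachePath);
      }
    }
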
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 0f5ec2d..a9e9610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -35,6 +35,11 @@ import java.nio.channels.FileChannel;
 public abstract class MappableBlockLoader {
 
   /**
+   * Initialize a specific MappableBlockLoader.
+   */
+  abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+
+  /**
    * Load the block.
    *
    * Map the block, and then verify its checksum.
@@ -60,24 +65,48 @@ public abstract class MappableBlockLoader {
   /**
    * Try to reserve some given bytes.
    *
-   * @param bytesCount
-   *          The number of bytes to add.
+   * @param bytesCount    The number of bytes to add.
    *
-   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   * @return              The new number of usedBytes if we succeeded;
+   *                      -1 if we failed.
    */
   abstract long reserve(long bytesCount);
 
   /**
    * Release some bytes that we're using.
    *
-   * @param bytesCount
-   *          The number of bytes to release.
+   * @param bytesCount    The number of bytes to release.
    *
-   * @return The new number of usedBytes.
+   * @return              The new number of usedBytes.
    */
   abstract long release(long bytesCount);
 
   /**
+   * Get the config key of cache capacity.
+   */
+  abstract String getCacheCapacityConfigKey();
+
+  /**
+   * Get the approximate amount of cache space used.
+   */
+  abstract long getCacheUsed();
+
+  /**
+   * Get the maximum amount of cache bytes.
+   */
+  abstract long getCacheCapacity();
+
+  /**
+   * Check whether the cache is transient (volatile), rather than persistent.
+   */
+  abstract boolean isTransientCache();
+
+  /**
+   * Get a cache file path if applicable. Otherwise return null.
+   */
+  abstract String getCachedPath(ExtendedBlockId key);
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
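
For reference, a skeletal sketch of a loader implementing the expanded contract; it is illustrative only, covers just the abstract methods visible in this patch, and would have to live in the fsdataset.impl package because those methods are package-private:

    package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.ExtendedBlockId;

    // Illustrative skeleton only; not part of this commit.
    class NoopMappableBlockLoader extends MappableBlockLoader {
      private long capacity;
      private long used;

      @Override
      void initialize(FsDatasetCache cacheManager) throws IOException {
        // A real loader would pull its capacity and volumes from the cache manager / DNConf.
        this.capacity = 0L;
      }

      @Override
      MappableBlock load(long length, FileInputStream blockIn,
          FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
          throws IOException {
        // A real loader maps the block and verifies its checksum here.
        throw new IOException("Not implemented in this sketch");
      }

      @Override
      String getCacheCapacityConfigKey() {
        return "dfs.datanode.cache.noop.capacity"; // hypothetical key
      }

      @Override
      long getCacheUsed() {
        return used;
      }

      @Override
      long getCacheCapacity() {
        return capacity;
      }

      @Override
      long reserve(long bytesCount) {
        // Returning -1 signals that the reservation does not fit in the cache.
        if (used + bytesCount > capacity) {
          return -1;
        }
        used += bytesCount;
        return used;
      }

      @Override
      long release(long bytesCount) {
        used -= bytesCount;
        return used;
      }

      @Override
      boolean isTransientCache() {
        return true;
      }

      @Override
      String getCachedPath(ExtendedBlockId key) {
        return null;
      }
    }
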
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index d93193e..4b7af19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -42,16 +43,11 @@ import java.nio.channels.FileChannel;
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
-  private final FsDatasetCache cacheManager;
+  private MemoryCacheStats memCacheStats;
 
-  /**
-   * Constructs memory mappable loader.
-   *
-   * @param cacheManager
-   *          FsDatasetCache reference.
-   */
-  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
-    this.cacheManager = cacheManager;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    this.memCacheStats = cacheManager.getMemCacheStats();
   }
 
   /**
@@ -72,7 +68,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    * @return               The Mappable block.
    */
   @Override
-  public MappableBlock load(long length, FileInputStream blockIn,
+  MappableBlock load(long length, FileInputStream blockIn,
                             FileInputStream metaIn, String blockFileName,
                             ExtendedBlockId key)
       throws IOException {
@@ -153,12 +149,37 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public String getCacheCapacityConfigKey() {
+    return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+  }
+
+  @Override
+  public long getCacheUsed() {
+    return memCacheStats.getCacheUsed();
+  }
+
+  @Override
+  public long getCacheCapacity() {
+    return memCacheStats.getCacheCapacity();
+  }
+
+  @Override
   long reserve(long bytesCount) {
-    return cacheManager.reserve(bytesCount);
+    return memCacheStats.reserve(bytesCount);
   }
 
   @Override
   long release(long bytesCount) {
-    return cacheManager.release(bytesCount);
+    return memCacheStats.release(bytesCount);
+  }
+
+  @Override
+  public boolean isTransientCache() {
+    return true;
+  }
+
+  @Override
+  public String getCachedPath(ExtendedBlockId key) {
+    return null;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
similarity index 55%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index d93193e..c581d31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,46 +18,58 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
- * Maps block to memory.
+ * Maps block to persistent memory by using a mapped byte buffer.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MemoryMappableBlockLoader extends MappableBlockLoader {
+public class PmemMappableBlockLoader extends MappableBlockLoader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemMappableBlockLoader.class);
+  private PmemVolumeManager pmemVolumeManager;
 
-  private final FsDatasetCache cacheManager;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    DNConf dnConf = cacheManager.getDnConf();
+    this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
+        dnConf.getPmemVolumes());
+  }
 
-  /**
-   * Constructs memory mappable loader.
-   *
-   * @param cacheManager
-   *          FsDatasetCache reference.
-   */
-  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
-    this.cacheManager = cacheManager;
+  @VisibleForTesting
+  PmemVolumeManager getPmemVolumeManager() {
+    return pmemVolumeManager;
   }
 
   /**
    * Load the block.
    *
-   * mmap and mlock the block, and then verify its checksum.
+   * Map the block and verify its checksum.
+   *
+   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+   * is a persistent memory volume selected by the getOneVolumeIndex() method.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -67,43 +79,63 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    * @param blockFileName  The block file name, for logging purposes.
    * @param key            The extended block ID.
    *
-   * @throws IOException   If mapping block to memory fails or checksum fails.
-
+   * @throws IOException   If mapping block fails or checksum fails.
+   *
    * @return               The Mappable block.
    */
   @Override
-  public MappableBlock load(long length, FileInputStream blockIn,
+  MappableBlock load(long length, FileInputStream blockIn,
                             FileInputStream metaIn, String blockFileName,
                             ExtendedBlockId key)
       throws IOException {
-    MemoryMappedBlock mappableBlock = null;
-    MappedByteBuffer mmap = null;
+    PmemMappedBlock mappableBlock = null;
+    String filePath = null;
+
     FileChannel blockChannel = null;
+    RandomAccessFile file = null;
+    MappedByteBuffer out = null;
     try {
       blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
-      mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
-      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
-      verifyChecksum(length, metaIn, blockChannel, blockFileName);
-      mappableBlock = new MemoryMappedBlock(mmap, length);
+
+      Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
+      filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+      file = new RandomAccessFile(filePath, "rw");
+      out = file.getChannel().
+          map(FileChannel.MapMode.READ_WRITE, 0, length);
+      if (out == null) {
+        throw new IOException("Failed to map the block " + blockFileName +
+            " to persistent storage.");
+      }
+      verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
+          blockFileName);
+      mappableBlock = new PmemMappedBlock(
+          length, volumeIndex, key, pmemVolumeManager);
+      pmemVolumeManager.afterCache(key, volumeIndex);
+      LOG.info("Successfully cached one replica:{} into persistent memory"
+          + ", [cached path={}, length={}]", key, filePath, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
+      if (out != null) {
+        NativeIO.POSIX.munmap(out);
+      }
+      IOUtils.closeQuietly(file);
       if (mappableBlock == null) {
-        if (mmap != null) {
-          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
-        }
+        FsDatasetUtil.deleteMappedFile(filePath);
       }
     }
     return mappableBlock;
   }
 
   /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
+   * Verifies the block's checksum while mapping the block to persistent memory.
+   * This is an I/O intensive operation.
    */
-  private void verifyChecksum(long length, FileInputStream metaIn,
-                             FileChannel blockChannel, String blockFileName)
+  private void verifyChecksumAndMapBlock(
+      MappedByteBuffer out, long length, FileInputStream metaIn,
+      FileChannel blockChannel, String blockFileName)
       throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
@@ -115,8 +147,8 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
     try {
       metaChannel = metaIn.getChannel();
       if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
+        throw new IOException("Cannot get FileChannel from " +
+            "Block InputStream meta file.");
       }
       DataChecksum checksum = header.getChecksum();
       final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -132,7 +164,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
         assert bytesVerified % bytesPerChecksum == 0;
         int bytesRead = fillBuffer(blockChannel, blockBuf);
         if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
+          throw new IOException(
+              "Checksum verification failed for the block " + blockFileName +
+                  ": premature EOF");
         }
         blockBuf.flip();
         // Number of read chunks, including partial chunk at end
@@ -142,23 +176,55 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
         checksumBuf.flip();
         checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
             bytesVerified);
-        // Success
+
+        // Copy the verified data into the persistent memory mapping
+        out.put(blockBuf);
+        // Advance the count of verified bytes
         bytesVerified += bytesRead;
+
+        // Clear buffer
         blockBuf.clear();
         checksumBuf.clear();
       }
+      // Force the mapped data to be written to the underlying storage device
+      out.force();
     } finally {
       IOUtils.closeQuietly(metaChannel);
     }
   }
 
   @Override
+  public String getCacheCapacityConfigKey() {
+    return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+  }
+
+  @Override
+  public long getCacheUsed() {
+    return pmemVolumeManager.getCacheUsed();
+  }
+
+  @Override
+  public long getCacheCapacity() {
+    return pmemVolumeManager.getCacheCapacity();
+  }
+
+  @Override
   long reserve(long bytesCount) {
-    return cacheManager.reserve(bytesCount);
+    return pmemVolumeManager.reserve(bytesCount);
   }
 
   @Override
   long release(long bytesCount) {
-    return cacheManager.release(bytesCount);
+    return pmemVolumeManager.release(bytesCount);
+  }
+
+  @Override
+  public boolean isTransientCache() {
+    return false;
+  }
+
+  @Override
+  public String getCachedPath(ExtendedBlockId key) {
+    return pmemVolumeManager.getCacheFilePath(key);
   }
 }
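
The core of the "pure java mapped byte buffer" technique used by this loader can be reduced to the following standalone sketch; the target path and payload are illustrative, and NativeIO#munmap (used in the patch to unmap promptly) is omitted for brevity:

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;

    public class MappedByteBufferSketch {
      public static void main(String[] args) throws IOException {
        byte[] payload = "verified block bytes".getBytes(StandardCharsets.UTF_8);
        // Illustrative cache file path on a pmem volume (named BlockPoolId-BlockId).
        String cacheFilePath = "/mnt/pmem0/BP-1-127.0.0.1-1-1073741825";
        try (RandomAccessFile file = new RandomAccessFile(cacheFilePath, "rw")) {
          // Map a writable region of the cache file into the process address space.
          MappedByteBuffer out = file.getChannel()
              .map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
          // Copy the (already checksum-verified) block bytes into the mapping.
          out.put(payload);
          // Force the mapped content down to the storage device.
          out.force();
        }
      }
    }
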
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
new file mode 100644
index 0000000..ce4fa22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Represents an HDFS block that is mapped to persistent memory by the DataNode
+ * with a mapped byte buffer. PMDK is NOT involved in this implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemMappedBlock implements MappableBlock {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemMappedBlock.class);
+  private final PmemVolumeManager pmemVolumeManager;
+  private long length;
+  private Byte volumeIndex = null;
+  private ExtendedBlockId key;
+
+  PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
+                  PmemVolumeManager pmemVolumeManager) {
+    assert length > 0;
+    this.length = length;
+    this.volumeIndex = volumeIndex;
+    this.key = key;
+    this.pmemVolumeManager = pmemVolumeManager;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public void close() {
+    String cacheFilePath =
+        pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+    try {
+      FsDatasetUtil.deleteMappedFile(cacheFilePath);
+      pmemVolumeManager.afterUncache(key);
+      LOG.info("Successfully uncached one replica:{} from persistent memory"
+          + ", [cached path={}, length={}]", key, cacheFilePath, length);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
new file mode 100644
index 0000000..76aa2dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manage the persistent memory volumes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PmemVolumeManager {
+
+  /**
+   * Counts used bytes for persistent memory.
+   */
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param bytesCount    The number of bytes to add.
+     *
+     * @return              The new number of usedBytes if we succeeded;
+     *                      -1 if we failed.
+     */
+    long reserve(long bytesCount) {
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + bytesCount;
+        if (next > cacheCapacity) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param bytesCount    The number of bytes to release.
+     *
+     * @return              The new number of usedBytes.
+     */
+    long release(long bytesCount) {
+      return usedBytes.addAndGet(-bytesCount);
+    }
+
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PmemVolumeManager.class);
+  private final ArrayList<String> pmemVolumes = new ArrayList<>();
+  // Maintain which pmem volume a block is cached to.
+  private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
+      new ConcurrentHashMap<>();
+  private final UsedBytesCount usedBytesCount;
+
+  /**
+   * The total cache capacity in bytes of persistent memory.
+   * It is 0L if the configured MappableBlockLoader does not cache data to pmem.
+   */
+  private final long cacheCapacity;
+  private int count = 0;
+  // Strict atomicity is not guaranteed here, for the sake of performance.
+  private int i = 0;
+
+  PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
+      throws IOException {
+    if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
+      throw new IOException("The persistent memory volume, " +
+          DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
+          " is not configured!");
+    }
+    this.loadVolumes(pmemVolumesConfigured);
+    this.usedBytesCount = new UsedBytesCount();
+    this.cacheCapacity = maxBytes;
+  }
+
+  public long getCacheUsed() {
+    return usedBytesCount.get();
+  }
+
+  public long getCacheCapacity() {
+    return cacheCapacity;
+  }
+
+  /**
+   * Try to reserve more bytes on persistent memory.
+   *
+   * @param bytesCount    The number of bytes to add.
+   *
+   * @return              The new number of usedBytes if we succeeded;
+   *                      -1 if we failed.
+   */
+  long reserve(long bytesCount) {
+    return usedBytesCount.reserve(bytesCount);
+  }
+
+  /**
+   * Release some bytes that we're using on persistent memory.
+   *
+   * @param bytesCount    The number of bytes to release.
+   *
+   * @return              The new number of usedBytes.
+   */
+  long release(long bytesCount) {
+    return usedBytesCount.release(bytesCount);
+  }
+
+  /**
+   * Load and verify the configured pmem volumes.
+   *
+   * @throws IOException   If there is no available pmem volume.
+   */
+  private void loadVolumes(String[] volumes) throws IOException {
+    // Check whether the volume exists
+    for (String volume: volumes) {
+      try {
+        File pmemDir = new File(volume);
+        verifyIfValidPmemVolume(pmemDir);
+        // Remove all files under the volume.
+        FileUtils.cleanDirectory(pmemDir);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Failed to parse persistent memory volume " + volume, e);
+        continue;
+      } catch (IOException e) {
+        LOG.error("Bad persistent memory volume: " + volume, e);
+        continue;
+      }
+      pmemVolumes.add(volume);
+      LOG.info("Added persistent memory - " + volume);
+    }
+    count = pmemVolumes.size();
+    if (count == 0) {
+      throw new IOException(
+          "At least one valid persistent memory volume is required!");
+    }
+  }
+
+  @VisibleForTesting
+  static void verifyIfValidPmemVolume(File pmemDir)
+      throws IOException {
+    if (!pmemDir.exists()) {
+      final String message = pmemDir + " does not exist";
+      throw new IOException(message);
+    }
+
+    if (!pmemDir.isDirectory()) {
+      final String message = pmemDir + " is not a directory";
+      throw new IllegalArgumentException(message);
+    }
+
+    String uuidStr = UUID.randomUUID().toString();
+    String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
+    byte[] contents = uuidStr.getBytes("UTF-8");
+    RandomAccessFile testFile = null;
+    MappedByteBuffer out = null;
+    try {
+      testFile = new RandomAccessFile(testFilePath, "rw");
+      out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
+          contents.length);
+      if (out == null) {
+        throw new IOException("Failed to map the test file under " + pmemDir);
+      }
+      out.put(contents);
+      // Force the mapped data to be written to the underlying storage device
+      out.force();
+    } catch (IOException e) {
+      throw new IOException(
+          "Exception while writing data to persistent storage dir: " +
+              pmemDir, e);
+    } finally {
+      if (out != null) {
+        out.clear();
+      }
+      if (testFile != null) {
+        IOUtils.closeQuietly(testFile);
+        NativeIO.POSIX.munmap(out);
+        try {
+          FsDatasetUtil.deleteMappedFile(testFilePath);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete test file " + testFilePath +
+              " from persistent memory", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Choose a persistent memory volume based on a specific algorithm.
+   * Currently it is a round-robin policy.
+   *
+   * TODO: Refine volume selection policy by considering storage utilization.
+   */
+  Byte getOneVolumeIndex() throws IOException {
+    if (count != 0) {
+      return (byte)(i++ % count);
+    } else {
+      throw new IOException("No usable persistent memory is found");
+    }
+  }
+
+  @VisibleForTesting
+  String getVolumeByIndex(Byte index) {
+    return pmemVolumes.get(index);
+  }
+
+  /**
+   * The cache file is named as BlockPoolId-BlockId.
+   * So its name can be inferred by BlockPoolId and BlockId.
+   */
+  public String getCacheFileName(ExtendedBlockId key) {
+    return key.getBlockPoolId() + "-" + key.getBlockId();
+  }
+
+  /**
+   * Since pmem volume sizes are currently below the TB level,
+   * it is tolerable to keep all cache files under one directory.
+   * This strategy will be optimized, especially if a single pmem
+   * volume has a huge cache capacity.
+   *
+   * @param volumeIndex   The index of pmem volume where a replica will be
+   *                      cached to or has been cached to.
+   *
+   * @param key           The replica's ExtendedBlockId.
+   *
+   * @return              A path to which the block replica is mapped.
+   */
+  public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
+    return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
+  }
+
+  /**
+   * The cache file path is pmemVolume/BlockPoolId-BlockId.
+   */
+  public String getCacheFilePath(ExtendedBlockId key) {
+    Byte volumeIndex = blockKeyToVolume.get(key);
+    if (volumeIndex == null) {
+      return  null;
+    }
+    return inferCacheFilePath(volumeIndex, key);
+  }
+
+  @VisibleForTesting
+  Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
+    return blockKeyToVolume;
+  }
+
+  /**
+   * Add cached block's ExtendedBlockId and its cache volume index to a map
+   * after cache.
+   */
+  public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
+    blockKeyToVolume.put(key, volumeIndex);
+  }
+
+  /**
+   * Remove the record in blockKeyToVolume for uncached block after uncache.
+   */
+  public void afterUncache(ExtendedBlockId key) {
+    blockKeyToVolume.remove(key);
+  }
+}
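
A short sketch of the volume selection and path naming contract described above; it assumes it runs in the fsdataset.impl package (as the test below does), and the volume paths are hypothetical directories that must already exist (the constructor verifies and empties them):

    package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

    import org.apache.hadoop.hdfs.ExtendedBlockId;

    // Illustrative sketch only; not part of this commit.
    class PmemVolumeManagerSketch {
      public static void main(String[] args) throws Exception {
        PmemVolumeManager manager = new PmemVolumeManager(
            64L * 1024 * 1024,                          // illustrative capacity
            new String[] {"/mnt/pmem0", "/mnt/pmem1"}); // hypothetical volumes
        ExtendedBlockId key = new ExtendedBlockId(1073741825L, "BP-1-127.0.0.1-1");
        // Round-robin selection alternates between the configured volumes.
        Byte index = manager.getOneVolumeIndex();
        // The cache file lands at <volume>/<BlockPoolId>-<BlockId>.
        System.out.println(manager.inferCacheFilePath(index, key));
        // e.g. /mnt/pmem0/BP-1-127.0.0.1-1-1073741825
      }
    }
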
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e9b35be..8a8b55d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2527,6 +2527,18 @@
 </property>
 
 <property>
+  <name>dfs.datanode.cache.loader.class</name>
+  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
+  <description>
+    Currently, the available cache loaders are MemoryMappableBlockLoader and PmemMappableBlockLoader.
+    By default, MemoryMappableBlockLoader is used; it maps block replicas into memory.
+    PmemMappableBlockLoader maps blocks to persistent memory with a mapped byte buffer, implemented
+    in pure Java. The value of dfs.datanode.cache.pmem.dirs specifies the persistent memory
+    directories.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.max.locked.memory</name>
   <value>0</value>
   <description>
@@ -2544,6 +2556,28 @@
 </property>
 
 <property>
+  <name>dfs.datanode.cache.pmem.capacity</name>
+  <value>0</value>
+  <description>
+    The amount of persistent memory, in bytes, that can be used to cache block
+    replicas. Currently, persistent memory is only used by the HDFS Centralized
+    Cache Management feature.
+
+    By default, this parameter is 0, which disables persistent memory caching.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.cache.pmem.dirs</name>
+  <value></value>
+  <description>
+    This value specifies the persistent memory directories used for caching block
+    replicas. It takes effect only if the value of dfs.datanode.cache.loader.class is
+    PmemMappableBlockLoader. Multiple directories separated by "," are acceptable.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.list.cache.directives.num.responses</name>
   <value>100</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
new file mode 100644
index 0000000..9b4f06f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import com.google.common.base.Supplier;
+import com.google.common.primitives.Ints;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
+
+/**
+ * Tests HDFS persistent memory cache by PmemMappableBlockLoader.
+ *
+ * Bogus persistent memory volumes (regular directories) are used to cache blocks.
+ */
+public class TestCacheByPmemMappableBlockLoader {
+  protected static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
+
+  protected static final long CACHE_CAPACITY = 64 * 1024;
+  protected static final long BLOCK_SIZE = 4 * 1024;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static DistributedFileSystem fs;
+  private static DataNode dn;
+  private static FsDatasetCache cacheManager;
+  private static PmemMappableBlockLoader cacheLoader;
+  /**
+   * Used to pause DN BPServiceActor threads. BPSA threads acquire the
+   * shared read lock. The test acquires the write lock for exclusive access.
+   */
+  private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
+  private static CacheManipulator prevCacheManipulator;
+  private static DataNodeFaultInjector oldInjector;
+
+  private static final String PMEM_DIR_0 =
+      MiniDFSCluster.getBaseDirectory() + "pmem0";
+  private static final String PMEM_DIR_1 =
+      MiniDFSCluster.getBaseDirectory() + "pmem1";
+
+  static {
+    GenericTestUtils.setLogLevel(
+        LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
+  }
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      @Override
+      public void startOfferService() throws Exception {
+        lock.readLock().lock();
+      }
+
+      @Override
+      public void endOfferService() throws Exception {
+        lock.readLock().unlock();
+      }
+    });
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    DataNodeFaultInjector.set(oldInjector);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
+
+    // Configuration for pmem cache
+    conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
+        "org.apache.hadoop.hdfs.server.datanode." +
+            "fsdataset.impl.PmemMappableBlockLoader");
+    new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
+    new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
+    // Configure two bogus pmem volumes
+    conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
+    conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
+
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    dn = cluster.getDataNodes().get(0);
+    cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
+    cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+  }
+
+  protected static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testPmemVolumeManager() throws IOException {
+    PmemVolumeManager pmemVolumeManager =
+        cacheLoader.getPmemVolumeManager();
+    assertNotNull(pmemVolumeManager);
+    assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
+    // Test round-robin selection policy
+    long count1 = 0, count2 = 0;
+    for (int i = 0; i < 10; i++) {
+      Byte index = pmemVolumeManager.getOneVolumeIndex();
+      String volume = pmemVolumeManager.getVolumeByIndex(index);
+      if (volume.equals(PMEM_DIR_0)) {
+        count1++;
+      } else if (volume.equals(PMEM_DIR_1)) {
+        count2++;
+      } else {
+        fail("Unexpected persistent storage location:" + volume);
+      }
+    }
+    assertEquals(count1, count2);
+  }
+
+  public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
+      throws IOException {
+    List<ExtendedBlockId> keys = new ArrayList<>();
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
+    for (HdfsBlockLocation loc : locs) {
+      long bkid = loc.getLocatedBlock().getBlock().getBlockId();
+      String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
+      keys.add(new ExtendedBlockId(bkid, bpid));
+    }
+    return keys;
+  }
+
+  @Test(timeout = 60000)
+  public void testCacheAndUncache() throws Exception {
+    final int maxCacheBlocksNum =
+        Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+    Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
+    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
+
+    final Path testFile = new Path("/testFile");
+    final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
+    DFSTestUtil.createFile(fs, testFile,
+        testFileLen, (short) 1, 0xbeef);
+    List<ExtendedBlockId> blockKeys =
+        getExtendedBlockId(testFile, testFileLen);
+    fs.addCachePool(new CachePoolInfo("testPool"));
+    final long cacheDirectiveId = fs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("testPool").
+            setPath(testFile).setReplication((short) 1).build());
+    // wait for caching
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        if (blocksCached != maxCacheBlocksNum) {
+          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " +
+              "be cached. Right now " + blocksCached + " blocks are cached.");
+          return false;
+        }
+        LOG.info(maxCacheBlocksNum + " blocks are now cached.");
+        return true;
+      }
+    }, 1000, 30000);
+
+    // The pmem cache space is expected to have been used up.
+    assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
+    Map<ExtendedBlockId, Byte> blockKeyToVolume =
+        cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
+    // All block keys should be kept in blockKeyToVolume
+    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+    // Test each replica's cache file path
+    for (ExtendedBlockId key : blockKeys) {
+      String cachePath = cacheManager.
+          getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
+      // The cachePath shouldn't be null if the replica has been cached
+      // to pmem.
+      assertNotNull(cachePath);
+      String expectFileName =
+          cacheLoader.getPmemVolumeManager().getCacheFileName(key);
+      if (cachePath.startsWith(PMEM_DIR_0)) {
+        assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
+      } else if (cachePath.startsWith(PMEM_DIR_1)) {
+        assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
+      } else {
+        fail("The cache path is not the expected one: " + cachePath);
+      }
+    }
+
+    // Try to cache another file. Caching this file should fail
+    // due to lack of available cache space.
+    final Path smallTestFile = new Path("/smallTestFile");
+    final long smallTestFileLen =  BLOCK_SIZE;
+    DFSTestUtil.createFile(fs, smallTestFile,
+        smallTestFileLen, (short) 1, 0xbeef);
+    // Try to cache more blocks when no cache space is available.
+    final long smallFileCacheDirectiveId = fs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("testPool").
+            setPath(smallTestFile).setReplication((short) 1).build());
+
+    // Wait for enough time to verify smallTestFile could not be cached.
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    long blocksCached =
+        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+    // The cached block num should not be increased.
+    assertTrue(blocksCached == maxCacheBlocksNum);
+    // The blockKeyToVolume should just keep the block keys for the testFile.
+    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
+    // Stop trying to cache smallTestFile to avoid interfering with the
+    // verification of uncache functionality.
+    fs.removeCacheDirective(smallFileCacheDirectiveId);
+
+    // Uncache the test file
+    fs.removeCacheDirective(cacheDirectiveId);
+    // Wait for uncaching
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksUncached =
+            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+        if (blocksUncached != maxCacheBlocksNum) {
+          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " +
+              "uncached. Right now " + blocksUncached +
+              " blocks are uncached.");
+          return false;
+        }
+        LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
+        return true;
+      }
+    }, 1000, 30000);
+
+    // It is expected that no pmem cache space is used.
+    assertEquals(0, cacheManager.getPmemCacheUsed());
+    // No record should be kept by blockKeyToVolume after testFile is uncached.
+    assertEquals(blockKeyToVolume.size(), 0);
+  }
+}
\ No newline at end of file

