You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2020/01/02 06:53:49 UTC

[hadoop] branch branch-3.1 updated: HDFS-14740. Recover data blocks from persistent memory read cache during datanode restarts. Contributed by Feilong He.

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8b748c0  HDFS-14740. Recover data blocks from persistent memory read cache during datanode restarts. Contributed by Feilong He.
8b748c0 is described below

commit 8b748c0308ab504f66f2a984eaec1a3d87116f2f
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Thu Jan 2 11:44:00 2020 +0530

    HDFS-14740. Recover data blocks from persistent memory read cache during datanode restarts. Contributed by Feilong He.
    
    (cherry picked from commit d79cce20abbbf321f6dcce03f4087544124a7cd2)
---
 .../org/apache/hadoop/io/nativeio/NativeIO.java    |  21 ++-
 .../src/org/apache/hadoop/io/nativeio/NativeIO.c   |  25 +--
 .../apache/hadoop/io/nativeio/TestNativeIO.java    |  13 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  10 +-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  17 +-
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  28 +++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  15 +-
 .../datanode/fsdataset/impl/MappableBlock.java     |   7 +
 .../fsdataset/impl/MappableBlockLoader.java        |   7 +
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |   7 +
 .../datanode/fsdataset/impl/MemoryMappedBlock.java |   6 +
 .../impl/NativePmemMappableBlockLoader.java        |  31 +++-
 .../fsdataset/impl/NativePmemMappedBlock.java      |   9 +-
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  37 ++++-
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  10 +-
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 120 +++++++++++---
 .../src/main/resources/hdfs-default.xml            |  15 +-
 .../site/markdown/CentralizedCacheManagement.md    |   6 +-
 .../impl/TestCacheByPmemMappableBlockLoader.java   |  20 ++-
 ...BlockLoader.java => TestPmemCacheRecovery.java} | 183 +++++++++++----------
 20 files changed, 425 insertions(+), 162 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index f2b3456..feea6d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -223,28 +223,31 @@ public class NativeIO {
      * JNI wrapper of persist memory operations.
      */
     public static class Pmem {
-      // check whether the address is a Pmem address or DIMM address
+      // Check whether the address is a Pmem address or DIMM address
       public static boolean isPmem(long address, long length) {
         return NativeIO.POSIX.isPmemCheck(address, length);
       }
 
-      // create a pmem file and memory map it
-      public static PmemMappedRegion mapBlock(String path, long length) {
-        return NativeIO.POSIX.pmemCreateMapFile(path, length);
+      // Map a file in persistent memory. If the given file exists,
+      // directly map it. If not, create the named file on persistent memory
+      // and then map it.
+      public static PmemMappedRegion mapBlock(
+          String path, long length, boolean isFileExist) {
+        return NativeIO.POSIX.pmemMapFile(path, length, isFileExist);
       }
 
-      // unmap a pmem file
+      // Unmap a pmem file
       public static boolean unmapBlock(long address, long length) {
         return NativeIO.POSIX.pmemUnMap(address, length);
       }
 
-      // copy data from disk file(src) to pmem file(dest), without flush
+      // Copy data from disk file(src) to pmem file(dest), without flush
       public static void memCopy(byte[] src, long dest, boolean isPmem,
           long length) {
         NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
       }
 
-      // flush the memory content to persistent storage
+      // Flush the memory content to persistent storage
       public static void memSync(PmemMappedRegion region) {
         if (region.isPmem()) {
           NativeIO.POSIX.pmemDrain();
@@ -260,8 +263,8 @@ public class NativeIO {
 
     private static native String getPmdkLibPath();
     private static native boolean isPmemCheck(long address, long length);
-    private static native PmemMappedRegion pmemCreateMapFile(String path,
-        long length);
+    private static native PmemMappedRegion pmemMapFile(String path,
+        long length, boolean isFileExist);
     private static native boolean pmemUnMap(long address, long length);
     private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
         long length);
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index b0b5151..1d7c508 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -1486,11 +1486,11 @@ JNIEnv *env, jclass thisClass, jlong address, jlong length) {
 
 /*
  * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
- * Method:    pmemCreateMapFile
+ * Method:    pmemMapFile
  * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
-JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
+JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemMapFile(
+JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength, jboolean isFileExist) {
   #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
     /* create a pmem file and memory map it */
     const char * path = NULL;
@@ -1505,17 +1505,20 @@ JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
       return NULL;
     }
 
-    if (fileLength <= 0) {
-      (*env)->ReleaseStringUTFChars(env, filePath, path);
-      THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
-      return NULL;
+    if (isFileExist) {
+      pmemaddr = pmdkLoader->pmem_map_file(path, 0, 0, 0666, &mapped_len, &is_pmem);
+    } else {
+      if (fileLength <= 0) {
+        (*env)->ReleaseStringUTFChars(env, filePath, path);
+        THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
+        return NULL;
+      }
+      pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+                0666, &mapped_len, &is_pmem);
     }
 
-    pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
-        0666, &mapped_len, &is_pmem);
-
     if (!pmemaddr) {
-      snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
+      snprintf(msg, sizeof(msg), "Failed to map file on persistent memory.file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
       THROW(env, "java/io/IOException", msg);
       (*env)->ReleaseStringUTFChars(env, filePath, path);
       return NULL;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index a14928c..c21fa44 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -800,7 +800,7 @@ public class TestNativeIO {
 
     // Incorrect file length
     try {
-      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
       fail("Illegal length parameter should be detected");
     } catch (Exception e) {
       LOG.info(e.getMessage());
@@ -810,7 +810,7 @@ public class TestNativeIO {
     filePath = "/mnt/pmem0/test_native_io";
     length = -1L;
     try {
-      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
       fail("Illegal length parameter should be detected");
     }catch (Exception e) {
       LOG.info(e.getMessage());
@@ -837,10 +837,10 @@ public class TestNativeIO {
     for (int i = 0; i < fileNumber; i++) {
       String path = filePath + i;
       LOG.info("File path = " + path);
-      NativeIO.POSIX.Pmem.mapBlock(path, length);
+      NativeIO.POSIX.Pmem.mapBlock(path, length, false);
     }
     try {
-      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
       fail("Request map extra file when persistent memory is all occupied");
     } catch (Exception e) {
       LOG.info(e.getMessage());
@@ -863,7 +863,7 @@ public class TestNativeIO {
     length = volumeSize + 1024L;
     try {
       LOG.info("File length = " + length);
-      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length, false);
       fail("File length exceeds persistent memory total volume size");
     }catch (Exception e) {
       LOG.info(e.getMessage());
@@ -881,7 +881,8 @@ public class TestNativeIO {
     // memory device.
     String filePath = "/mnt/pmem0/copy";
     long length = 4096;
-    PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+    PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(
+        filePath, length, false);
     assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
     assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
     assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5bae88c..161581b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -381,9 +381,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
 
   // Multiple dirs separated by "," are acceptable.
-  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
-      "dfs.datanode.cache.pmem.dirs";
-  public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
+  public static final String DFS_DATANODE_PMEM_CACHE_DIRS_KEY =
+      "dfs.datanode.pmem.cache.dirs";
+  public static final String DFS_DATANODE_PMEM_CACHE_DIRS_DEFAULT = "";
+  public static final String  DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY =
+      "dfs.datanode.pmem.cache.recovery";
+  public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
+      true;
 
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 139ad77..9f792472 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,12 +27,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -93,6 +95,7 @@ public class DNConf {
   final boolean encryptDataTransfer;
   final boolean connectToDnViaHostname;
   final boolean overwriteDownstreamDerivedQOP;
+  private final boolean pmemCacheRecoveryEnabled;
 
   final long readaheadLength;
   final long heartBeatInterval;
@@ -264,7 +267,7 @@ public class DNConf {
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
     this.pmemDirs = getConf().getTrimmedStrings(
-        DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
+        DFS_DATANODE_PMEM_CACHE_DIRS_KEY);
 
     this.restartReplicaExpiry = getConf().getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
@@ -285,6 +288,10 @@ public class DNConf {
     String[] dataDirs =
         getConf().getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
     this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
+
+    this.pmemCacheRecoveryEnabled = getConf().getBoolean(
+        DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
+        DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -294,7 +301,7 @@ public class DNConf {
   
   /**
    * Returns the configuration.
-   * 
+   *
    * @return Configuration the configuration
    */
   public Configuration getConf() {
@@ -434,4 +441,8 @@ public class DNConf {
   public String[] getPmemVolumes() {
     return pmemDirs;
   }
+
+  public boolean getPmemCacheRecoveryEnabled() {
+    return pmemCacheRecoveryEnabled;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 8c2a758..1496b44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -33,7 +33,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -184,6 +186,30 @@ public class FsDatasetCache {
     this.memCacheStats = cacheLoader.initialize(this.getDnConf());
   }
 
+  /**
+   * For persistent memory cache, create cache subdirectory specified with
+   * blockPoolId to store cache data.
+   * Recover the status of cache in persistent memory, if any.
+   */
+  public void initCache(String bpid) throws IOException {
+    if (cacheLoader.isTransientCache()) {
+      return;
+    }
+    PmemVolumeManager.getInstance().createBlockPoolDir(bpid);
+    if (getDnConf().getPmemCacheRecoveryEnabled()) {
+      final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock =
+          PmemVolumeManager.getInstance().recoverCache(bpid, cacheLoader);
+      Set<Map.Entry<ExtendedBlockId, MappableBlock>> entrySet
+          = keyToMappableBlock.entrySet();
+      for (Map.Entry<ExtendedBlockId, MappableBlock> entry : entrySet) {
+        mappableBlockMap.put(entry.getKey(),
+            new Value(keyToMappableBlock.get(entry.getKey()), State.CACHED));
+        numBlocksCached.addAndGet(1);
+        dataset.datanode.getMetrics().incrBlocksCached(1);
+      }
+    }
+  }
+
   DNConf getDnConf() {
     return this.dataset.datanode.getDnConf();
   }
@@ -191,7 +217,7 @@ public class FsDatasetCache {
   /**
    * Get the cache path if the replica is cached into persistent memory.
    */
-  String getReplicaCachePath(String bpid, long blockId) {
+  String getReplicaCachePath(String bpid, long blockId) throws IOException {
     if (cacheLoader.isTransientCache() ||
         !isCached(bpid, blockId)) {
       return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index beede18..faf0935 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -270,6 +270,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @VisibleForTesting
   final AutoCloseableLock datasetLock;
   private final Condition datasetLockCondition;
+  private static String blockPoolId = "";
   
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -2843,6 +2844,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (volumeExceptions.hasExceptions()) {
       throw volumeExceptions;
     }
+    // For test use only.
+    if (!blockPoolId.isEmpty()) {
+      bpid = blockPoolId;
+    }
+    cacheManager.initCache(bpid);
+  }
+
+  @VisibleForTesting
+  public static void setBlockPoolId(String bpid) {
+    blockPoolId = bpid;
   }
 
   @Override
@@ -3372,8 +3383,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      for (String blockPoolId : volumeMap.getBlockPoolList()) {
-        Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (String bpid : volumeMap.getBlockPoolList()) {
+        Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
         for (ReplicaInfo replicaInfo : replicas) {
           if ((replicaInfo.getState() == ReplicaState.TEMPORARY
               || replicaInfo.getState() == ReplicaState.RBW)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index a00c442..ad1a0b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 
 import java.io.Closeable;
 
@@ -41,4 +42,10 @@ public interface MappableBlock extends Closeable {
    * Return -1 if not applicable.
    */
   long getAddress();
+
+  /**
+   * Get cached block's ExtendedBlockId.
+   * @return cached block's ExtendedBlockId.
+   */
+  ExtendedBlockId getKey();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 5118774..02bea81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.DataChecksum;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -112,6 +113,12 @@ public abstract class MappableBlockLoader {
   abstract boolean isNativeLoader();
 
   /**
+   * Get mappableBlock recovered from persistent memory.
+   */
+  abstract MappableBlock getRecoveredMappableBlock(
+      File cacheFile, String bpid, byte volumeIndex) throws IOException;
+
+  /**
    * Clean up cache, can be used during DataNode shutdown.
    */
   void shutdown() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index f5a9a41..a7853f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
@@ -119,6 +120,12 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public MappableBlock getRecoveredMappableBlock(
+      File cacheFile, String bpid, byte volumeIndex) throws IOException {
+    return null;
+  }
+
+  @Override
   public boolean isNativeLoader() {
     return false;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
index 47dfeae..8fad33a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -22,6 +22,7 @@ import java.nio.MappedByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.io.nativeio.NativeIO;
 
 /**
@@ -50,6 +51,11 @@ public class MemoryMappedBlock implements MappableBlock {
   }
 
   @Override
+  public ExtendedBlockId getKey() {
+    return null;
+  }
+
+  @Override
   public void close() {
     if (mmap != null) {
       NativeIO.POSIX.munmap(mmap);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
index a5af437..55e5dec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -57,8 +58,8 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
    *
    * Map the block and verify its checksum.
    *
-   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
-   * is a persistent memory volume chosen by PmemVolumeManager.
+   * The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
+   * in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -91,7 +92,7 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
 
       assert NativeIO.isAvailable();
       filePath = PmemVolumeManager.getInstance().getCachePath(key);
-      region = POSIX.Pmem.mapBlock(filePath, length);
+      region = POSIX.Pmem.mapBlock(filePath, length, false);
       if (region == null) {
         throw new IOException("Failed to map the block " + blockFileName +
             " to persistent storage.");
@@ -189,4 +190,28 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
   public boolean isNativeLoader() {
     return true;
   }
+
+  @Override
+  public MappableBlock getRecoveredMappableBlock(
+      File cacheFile, String bpid, byte volumeIndex) throws IOException {
+    NativeIO.POSIX.PmemMappedRegion region =
+        NativeIO.POSIX.Pmem.mapBlock(cacheFile.getAbsolutePath(),
+            cacheFile.length(), true);
+    if (region == null) {
+      throw new IOException("Failed to recover the block "
+          + cacheFile.getName() + " in persistent storage.");
+    }
+    ExtendedBlockId key =
+        new ExtendedBlockId(super.getBlockId(cacheFile), bpid);
+    MappableBlock mappableBlock = new NativePmemMappedBlock(
+        region.getAddress(), region.getLength(), key);
+    PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
+
+    String path = PmemVolumeManager.getInstance().getCachePath(key);
+    long addr = mappableBlock.getAddress();
+    long length = mappableBlock.getLength();
+    LOG.info("Recovering persistent memory cache for block {}, " +
+        "path = {}, address = {}, length = {}", key, path, addr, length);
+    return mappableBlock;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
index 92012b2..3d52762 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
@@ -59,11 +59,16 @@ public class NativePmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public ExtendedBlockId getKey() {
+    return key;
+  }
+
+  @Override
   public void close() {
     if (pmemMappedAddress != -1L) {
-      String cacheFilePath =
-          PmemVolumeManager.getInstance().getCachePath(key);
       try {
+        String cacheFilePath =
+            PmemVolumeManager.getInstance().getCachePath(key);
         // Current libpmem will report error when pmem_unmap is called with
         // length not aligned with page size, although the length is returned
         // by pmem_map_file.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 19dcc4b..e8c6ac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -40,12 +41,15 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   private static final Logger LOG =
       LoggerFactory.getLogger(PmemMappableBlockLoader.class);
   private PmemVolumeManager pmemVolumeManager;
+  private boolean cacheRecoveryEnabled;
 
   @Override
   CacheStats initialize(DNConf dnConf) throws IOException {
     LOG.info("Initializing cache loader: " + this.getClass().getName());
-    PmemVolumeManager.init(dnConf.getPmemVolumes());
+    PmemVolumeManager.init(dnConf.getPmemVolumes(),
+        dnConf.getPmemCacheRecoveryEnabled());
     pmemVolumeManager = PmemVolumeManager.getInstance();
+    cacheRecoveryEnabled = dnConf.getPmemCacheRecoveryEnabled();
     // The configuration for max locked memory is shaded.
     LOG.info("Persistent memory is used for caching data instead of " +
         "DRAM. Max locked memory is set to zero to disable DRAM cache");
@@ -59,8 +63,8 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    *
    * Map the block and verify its checksum.
    *
-   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
-   * is a persistent memory volume chosen by PmemVolumeManager.
+   * The block will be mapped to PmemDir/BlockPoolId/subdir#/subdir#/BlockId,
+   * in which PmemDir is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -142,8 +146,31 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public MappableBlock getRecoveredMappableBlock(
+      File cacheFile, String bpid, byte volumeIndex) throws IOException {
+    ExtendedBlockId key = new ExtendedBlockId(getBlockId(cacheFile), bpid);
+    MappableBlock mappableBlock = new PmemMappedBlock(cacheFile.length(), key);
+    PmemVolumeManager.getInstance().recoverBlockKeyToVolume(key, volumeIndex);
+
+    String path = PmemVolumeManager.getInstance().getCachePath(key);
+    long length = mappableBlock.getLength();
+    LOG.info("Recovering persistent memory cache for block {}, " +
+        "path = {}, length = {}", key, path, length);
+    return mappableBlock;
+  }
+
+  /**
+   * Parse the file name and get the BlockId.
+   */
+  public long getBlockId(File file) {
+    return Long.parseLong(file.getName());
+  }
+
+  @Override
   void shutdown() {
-    LOG.info("Clean up cache on persistent memory during shutdown.");
-    PmemVolumeManager.getInstance().cleanup();
+    if (!cacheRecoveryEnabled) {
+      LOG.info("Clean up cache on persistent memory during shutdown.");
+      PmemVolumeManager.getInstance().cleanup();
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index a49626a..4502817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -55,10 +55,16 @@ public class PmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public ExtendedBlockId getKey() {
+    return key;
+  }
+
+  @Override
   public void close() {
-    String cacheFilePath =
-        PmemVolumeManager.getInstance().getCachePath(key);
+    String cacheFilePath = null;
     try {
+      cacheFilePath =
+          PmemVolumeManager.getInstance().getCachePath(key);
       FsDatasetUtil.deleteMappedFile(cacheFilePath);
       LOG.info("Successfully uncached one replica:{} from persistent memory"
           + ", [cached path={}, length={}]", key, cacheFilePath, length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
index 969d18b..51b7681 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -35,6 +36,7 @@ import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -52,7 +54,7 @@ public final class PmemVolumeManager {
    * Counts used bytes for persistent memory.
    */
   private static class UsedBytesCount {
-    private final long maxBytes;
+    private long maxBytes;
     private final AtomicLong usedBytes = new AtomicLong(0);
 
     UsedBytesCount(long maxBytes) {
@@ -102,6 +104,10 @@ public final class PmemVolumeManager {
     long getAvailableBytes() {
       return maxBytes - usedBytes.get();
     }
+
+    void setMaxBytes(long maxBytes) {
+      this.maxBytes = maxBytes;
+    }
   }
 
   private static final Logger LOG =
@@ -113,6 +119,7 @@ public final class PmemVolumeManager {
   private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
       new ConcurrentHashMap<>();
   private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
+  private boolean cacheRecoveryEnabled;
 
   /**
    * The total cache capacity in bytes of persistent memory.
@@ -122,12 +129,14 @@ public final class PmemVolumeManager {
   private int count = 0;
   private byte nextIndex = 0;
 
-  private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
+  private PmemVolumeManager(String[] pmemVolumesConfig,
+                            boolean cacheRecoveryEnabled) throws IOException {
     if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
       throw new IOException("The persistent memory volume, " +
-          DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
+          DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY +
           " is not configured!");
     }
+    this.cacheRecoveryEnabled = cacheRecoveryEnabled;
     this.loadVolumes(pmemVolumesConfig);
     cacheCapacity = 0L;
     for (UsedBytesCount counter : usedBytesCounts) {
@@ -135,10 +144,12 @@ public final class PmemVolumeManager {
     }
   }
 
-  public synchronized static void init(String[] pmemVolumesConfig)
+  public synchronized static void init(
+      String[] pmemVolumesConfig, boolean cacheRecoveryEnabled)
       throws IOException {
     if (pmemVolumeManager == null) {
-      pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
+      pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig,
+          cacheRecoveryEnabled);
     }
   }
 
@@ -151,6 +162,11 @@ public final class PmemVolumeManager {
   }
 
   @VisibleForTesting
+  public static void reset() {
+    pmemVolumeManager = null;
+  }
+
+  @VisibleForTesting
   public static void setMaxBytes(long maxBytes) {
     maxBytesPerPmem = maxBytes;
   }
@@ -218,8 +234,10 @@ public final class PmemVolumeManager {
       try {
         File pmemDir = new File(volumes[n]);
         File realPmemDir = verifyIfValidPmemVolume(pmemDir);
-        // Clean up the cache left before, if any.
-        cleanup(realPmemDir);
+        if (!cacheRecoveryEnabled) {
+          // Clean up the cache left before, if any.
+          cleanup(realPmemDir);
+        }
         this.pmemVolumes.add(realPmemDir.getPath());
         long maxBytes;
         if (maxBytesPerPmem == -1) {
@@ -261,6 +279,41 @@ public final class PmemVolumeManager {
     }
   }
 
+  /**
+   * Recover cache from the cached files in the configured pmem volumes.
+   */
+  public Map<ExtendedBlockId, MappableBlock> recoverCache(
+      String bpid, MappableBlockLoader cacheLoader) throws IOException {
+    final Map<ExtendedBlockId, MappableBlock> keyToMappableBlock
+        = new ConcurrentHashMap<>();
+    for (byte volumeIndex = 0; volumeIndex < pmemVolumes.size();
+         volumeIndex++) {
+      long maxBytes = usedBytesCounts.get(volumeIndex).getMaxBytes();
+      long usedBytes = 0;
+      File cacheDir = new File(pmemVolumes.get(volumeIndex), bpid);
+      Collection<File> cachedFileList = FileUtils.listFiles(cacheDir,
+          TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+      // Scan the cached files in pmem volumes for cache recovery.
+      for (File cachedFile : cachedFileList) {
+        MappableBlock mappableBlock = cacheLoader.
+            getRecoveredMappableBlock(cachedFile, bpid, volumeIndex);
+        ExtendedBlockId key = mappableBlock.getKey();
+        keyToMappableBlock.put(key, mappableBlock);
+        usedBytes += cachedFile.length();
+      }
+      // Update maxBytes and cache capacity according to cache space
+      // used by recovered cached files.
+      usedBytesCounts.get(volumeIndex).setMaxBytes(maxBytes + usedBytes);
+      cacheCapacity += usedBytes;
+      usedBytesCounts.get(volumeIndex).reserve(usedBytes);
+    }
+    return keyToMappableBlock;
+  }
+
+  public void recoverBlockKeyToVolume(ExtendedBlockId key, byte volumeIndex) {
+    blockKeyToVolume.put(key, volumeIndex);
+  }
+
   @VisibleForTesting
   static File verifyIfValidPmemVolume(File pmemDir)
       throws IOException {
@@ -316,6 +369,18 @@ public final class PmemVolumeManager {
     }
   }
 
+  /**
+   * Create the cache subdirectory named by blockPoolId on every pmem volume.
+   */
+  public void createBlockPoolDir(String bpid) throws IOException {
+    for (String volume : pmemVolumes) {
+      File cacheDir = new File(volume, bpid);
+      if (!cacheDir.exists() && !cacheDir.mkdir()) {
+        throw new IOException("Failed to create " + cacheDir.getPath());
+      }
+    }
+  }
+
   public static String getRealPmemDir(String rawPmemDir) {
     return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
   }
@@ -355,19 +420,22 @@ public final class PmemVolumeManager {
     return pmemVolumes.get(index);
   }
 
+  ArrayList<String> getVolumes() {
+    return pmemVolumes;
+  }
+
   /**
-   * The cache file is named as BlockPoolId-BlockId.
-   * So its name can be inferred by BlockPoolId and BlockId.
+   * A cache file is named after the corresponding BlockId.
+   * Thus, the cache file name can be inferred from the BlockId.
    */
-  public String getCacheFileName(ExtendedBlockId key) {
-    return key.getBlockPoolId() + "-" + key.getBlockId();
+  public String idToCacheFileName(ExtendedBlockId key) {
+    return String.valueOf(key.getBlockId());
   }
 
   /**
-   * Considering the pmem volume size is below TB level currently,
-   * it is tolerable to keep cache files under one directory.
-   * The strategy will be optimized, especially if one pmem volume
-   * has huge cache capacity.
+   * Get the path of the cache file for this key on the given volume,
+   * creating its parent directory if it does not exist. A hierarchical
+   * layout is used to avoid keeping all cache files under one directory.
    *
    * @param volumeIndex   The index of pmem volume where a replica will be
    *                      cached to or has been cached to.
@@ -376,19 +444,31 @@ public final class PmemVolumeManager {
    *
    * @return              A path to which the block replica is mapped.
    */
-  public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
-    return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
+  public String idToCacheFilePath(Byte volumeIndex, ExtendedBlockId key)
+      throws IOException {
+    final String cacheSubdirPrefix = "subdir";
+    long blockId = key.getBlockId();
+    String bpid = key.getBlockPoolId();
+    int d1 = (int) ((blockId >> 16) & 0x1F);
+    int d2 = (int) ((blockId >> 8) & 0x1F);
+    String parentDir = pmemVolumes.get(volumeIndex) + "/" + bpid;
+    String subDir = cacheSubdirPrefix + d1 + "/" + cacheSubdirPrefix + d2;
+    File filePath = new File(parentDir, subDir);
+    if (!filePath.exists() && !filePath.mkdirs()) {
+      throw new IOException("Failed to create " + filePath.getPath());
+    }
+    return filePath.getAbsolutePath() + "/" + idToCacheFileName(key);
   }
 
   /**
-   * The cache file path is pmemVolume/BlockPoolId-BlockId.
+   * The cache file path is pmemVolume/BlockPoolId/subdir#/subdir#/BlockId.
    */
-  public String getCachePath(ExtendedBlockId key) {
+  public String getCachePath(ExtendedBlockId key) throws IOException {
     Byte volumeIndex = blockKeyToVolume.get(key);
     if (volumeIndex == null) {
       return  null;
     }
-    return inferCacheFilePath(volumeIndex, key);
+    return idToCacheFilePath(volumeIndex, key);
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index f0ce288..0d804af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2527,12 +2527,21 @@
 </property>
 
 <property>
-  <name>dfs.datanode.cache.pmem.dirs</name>
+  <name>dfs.datanode.pmem.cache.dirs</name>
   <value></value>
   <description>
     This value specifies the persistent memory directory used for caching block
-    replica. It matters only if the value of dfs.datanode.cache.loader.class is
-    PmemMappableBlockLoader. Multiple directories separated by "," are acceptable.
+    replica. Multiple directories separated by "," are acceptable.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.pmem.cache.recovery</name>
+  <value>true</value>
+  <description>
+    This value specifies whether the previous cache on persistent memory will be
+    recovered. This configuration takes effect only if persistent memory cache is
+    enabled by specifying a value for 'dfs.datanode.pmem.cache.dirs'.
   </description>
 </property>
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
index 85cc242..7cad51d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/CentralizedCacheManagement.md
@@ -224,7 +224,7 @@ Be sure to configure one of the following properties for DRAM cache or persisten
 
     This setting is shared with the [Lazy Persist Writes feature](./MemoryStorage.html). The Data Node will ensure that the combined memory used by Lazy Persist Writes and Centralized Cache Management does not exceed the amount configured in `dfs.datanode.max.locked.memory`.
 
-*   dfs.datanode.cache.pmem.dirs
+*   dfs.datanode.pmem.cache.dirs
 
     This property specifies the cache volume of persistent memory. For multiple volumes, they should be separated by “,”, e.g. “/mnt/pmem0, /mnt/pmem1”. The default value is empty. If this property is configured, the volume capacity will be detected. And there is no need to configure `dfs.datanode.max.locked.memory`.
 
@@ -254,6 +254,10 @@ The following properties are not required, but may be specified for tuning:
 
     The percentage of the Java heap which we will allocate to the cached blocks map. The cached blocks map is a hash map which uses chained hashing. Smaller maps may be accessed more slowly if the number of cached blocks is large; larger maps will consume more memory. The default is 0.25 percent.
 
+*   dfs.datanode.pmem.cache.recovery
+
+    This property determines whether the DataNode recovers the status of its previous cache on persistent memory when it starts. If it is enabled (the default), the DataNode recovers the status of previously cached data on persistent memory, so the data does not need to be re-cached. If it is disabled, the DataNode cleans up the previous cache, if any, on persistent memory. This property takes effect only when persistent memory cache is enabled, i.e., `dfs.datanode.pmem.cache.dirs` is configured.
+
 ### OS Limits
 
 If you get the error "Cannot start datanode because the configured max locked memory size... is more than the datanode's available RLIMIT\_MEMLOCK ulimit," that means that the operating system is imposing a lower limit on the amount of memory that you can lock than what you have configured. To fix this, you must adjust the ulimit -l value that the DataNode runs with. Usually, this value is configured in `/etc/security/limits.conf`. However, it will vary depending on what operating system [...]
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
index 3c1b705..ea31612 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -137,7 +137,7 @@ public class TestCacheByPmemMappableBlockLoader {
     new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
     new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
     // Configure two bogus pmem volumes
-    conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
+    conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
     PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
 
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
@@ -259,14 +259,18 @@ public class TestCacheByPmemMappableBlockLoader {
       // The cachePath shouldn't be null if the replica has been cached
       // to pmem.
       assertNotNull(cachePath);
-      String expectFileName =
-          PmemVolumeManager.getInstance().getCacheFileName(key);
+      Path path = new Path(cachePath);
+      String fileName = path.getName();
       if (cachePath.startsWith(PMEM_DIR_0)) {
-        assertTrue(cachePath.equals(PmemVolumeManager
-            .getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
       } else if (cachePath.startsWith(PMEM_DIR_1)) {
-        assertTrue(cachePath.equals(PmemVolumeManager
-            .getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
       } else {
         fail("The cache path is not the expected one: " + cachePath);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestPmemCacheRecovery.java
similarity index 65%
copy from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestPmemCacheRecovery.java
index 3c1b705..c4c5aa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestPmemCacheRecovery.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -65,18 +65,16 @@ import org.slf4j.event.Level;
 import com.google.common.base.Supplier;
 import com.google.common.primitives.Ints;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
-
 /**
  * Tests HDFS persistent memory cache by PmemMappableBlockLoader.
  *
  * Bogus persistent memory volume is used to cache blocks.
  */
-public class TestCacheByPmemMappableBlockLoader {
+public class TestPmemCacheRecovery {
   protected static final org.slf4j.Logger LOG =
       LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
 
-  protected static final long CACHE_CAPACITY = 64 * 1024;
+  protected static final long CACHE_AMOUNT = 64 * 1024;
   protected static final long BLOCK_SIZE = 4 * 1024;
 
   private static Configuration conf;
@@ -84,6 +82,7 @@ public class TestCacheByPmemMappableBlockLoader {
   private static DistributedFileSystem fs;
   private static DataNode dn;
   private static FsDatasetCache cacheManager;
+  private static String blockPoolId = "";
   /**
    * Used to pause DN BPServiceActor threads. BPSA threads acquire the
    * shared read lock. The test acquires the write lock for exclusive access.
@@ -126,19 +125,20 @@ public class TestCacheByPmemMappableBlockLoader {
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
-    conf.setLong(
-        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setBoolean(DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY, true);
+    conf.setLong(DFSConfigKeys.
+        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
     conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
+    conf.setInt(
+        DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
 
     // Configuration for pmem cache
     new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
     new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
     // Configure two bogus pmem volumes
-    conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
-    PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
+    conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
 
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
@@ -165,39 +165,45 @@ public class TestCacheByPmemMappableBlockLoader {
     NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
   }
 
+  protected static void restartCluster() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY, true);
+    conf.setLong(DFSConfigKeys.
+        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(
+        DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
+    // Configure two bogus pmem volumes
+    conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
+
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+
+    FsDatasetImpl.setBlockPoolId(blockPoolId);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    dn = cluster.getDataNodes().get(0);
+    cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
+  }
+
   protected static void shutdownCluster() {
     if (cluster != null) {
       cluster.shutdown();
       cluster = null;
     }
-  }
-
-  @Test
-  public void testPmemVolumeManager() throws IOException {
-    PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
-    assertNotNull(pmemVolumeManager);
-    assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
-    // Test round-robin selection policy
-    long count1 = 0, count2 = 0;
-    for (int i = 0; i < 10; i++) {
-      Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
-      String volume = pmemVolumeManager.getVolumeByIndex(index);
-      if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
-        count1++;
-      } else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
-        count2++;
-      } else {
-        fail("Unexpected persistent storage location:" + volume);
-      }
-    }
-    assertEquals(count1, count2);
+    PmemVolumeManager.reset();
   }
 
   public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
       throws IOException {
     List<ExtendedBlockId> keys = new ArrayList<>();
-    HdfsBlockLocation[] locs =
-        (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
+    HdfsBlockLocation[] locs = (HdfsBlockLocation[]) fs.getFileBlockLocations(
+        filePath, 0, fileLen);
     for (HdfsBlockLocation loc : locs) {
       long bkid = loc.getLocatedBlock().getBlock().getBlockId();
       String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
@@ -207,17 +213,14 @@ public class TestCacheByPmemMappableBlockLoader {
   }
 
   @Test(timeout = 60000)
-  public void testCacheAndUncache() throws Exception {
-    final int maxCacheBlocksNum =
-        Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
+  public void testCacheRecovery() throws Exception {
+    final int cacheBlocksNum =
+        Ints.checkedCast(CACHE_AMOUNT / BLOCK_SIZE);
     BlockReaderTestUtil.enableHdfsCachingTracing();
-    Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
-    assertEquals(CACHE_CAPACITY, cacheManager.getCacheCapacity());
-    // DRAM cache is expected to be disabled.
-    assertEquals(0L, cacheManager.getMemCacheCapacity());
+    Assert.assertEquals(0, CACHE_AMOUNT % BLOCK_SIZE);
 
     final Path testFile = new Path("/testFile");
-    final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
+    final long testFileLen = cacheBlocksNum * BLOCK_SIZE;
     DFSTestUtil.createFile(fs, testFile,
         testFileLen, (short) 1, 0xbeef);
     List<ExtendedBlockId> blockKeys =
@@ -233,72 +236,86 @@ public class TestCacheByPmemMappableBlockLoader {
         MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
         long blocksCached =
             MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
-        if (blocksCached != maxCacheBlocksNum) {
-          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " +
+        if (blocksCached != cacheBlocksNum) {
+          LOG.info("waiting for " + cacheBlocksNum + " blocks to " +
               "be cached. Right now " + blocksCached + " blocks are cached.");
           return false;
         }
-        LOG.info(maxCacheBlocksNum + " blocks are now cached.");
+        LOG.info(cacheBlocksNum + " blocks are now cached.");
         return true;
       }
     }, 1000, 30000);
 
-    // The pmem cache space is expected to have been used up.
-    assertEquals(CACHE_CAPACITY, cacheManager.getCacheUsed());
-    // There should be no cache used on DRAM.
-    assertEquals(0L, cacheManager.getMemCacheUsed());
+    assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
     Map<ExtendedBlockId, Byte> blockKeyToVolume =
         PmemVolumeManager.getInstance().getBlockKeyToVolume();
     // All block keys should be kept in blockKeyToVolume
-    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
     assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
     // Test each replica's cache file path
     for (ExtendedBlockId key : blockKeys) {
+      if (blockPoolId.isEmpty()) {
+        blockPoolId = key.getBlockPoolId();
+      }
       String cachePath = cacheManager.
           getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
       // The cachePath shouldn't be null if the replica has been cached
       // to pmem.
       assertNotNull(cachePath);
-      String expectFileName =
-          PmemVolumeManager.getInstance().getCacheFileName(key);
+      Path path = new Path(cachePath);
+      String fileName = path.getName();
       if (cachePath.startsWith(PMEM_DIR_0)) {
-        assertTrue(cachePath.equals(PmemVolumeManager
-            .getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
       } else if (cachePath.startsWith(PMEM_DIR_1)) {
-        assertTrue(cachePath.equals(PmemVolumeManager
-            .getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
       } else {
         fail("The cache path is not the expected one: " + cachePath);
       }
     }
 
-    // Try to cache another file. Caching this file should fail
-    // due to lack of available cache space.
-    final Path smallTestFile = new Path("/smallTestFile");
-    final long smallTestFileLen =  BLOCK_SIZE;
-    DFSTestUtil.createFile(fs, smallTestFile,
-        smallTestFileLen, (short) 1, 0xbeef);
-    // Try to cache more blocks when no cache space is available.
-    final long smallFileCacheDirectiveId = fs.addCacheDirective(
-        new CacheDirectiveInfo.Builder().setPool("testPool").
-            setPath(smallTestFile).setReplication((short) 1).build());
-
-    // Wait for enough time to verify smallTestFile could not be cached.
-    Thread.sleep(10000);
-    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
-    long blocksCached =
-        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
-    // The cached block num should not be increased.
-    assertTrue(blocksCached == maxCacheBlocksNum);
-    // The blockKeyToVolume should just keep the block keys for the testFile.
-    assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
+    // Trigger cache recovery
+    shutdownCluster();
+    restartCluster();
+
+    assertEquals(CACHE_AMOUNT, cacheManager.getCacheUsed());
+    blockKeyToVolume = PmemVolumeManager.getInstance().getBlockKeyToVolume();
+    // All block keys should be kept in blockKeyToVolume
+    assertEquals(blockKeyToVolume.size(), cacheBlocksNum);
     assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
-    // Stop trying to cache smallTestFile to avoid interfering the
-    // verification for uncache functionality.
-    fs.removeCacheDirective(smallFileCacheDirectiveId);
+    // Test each replica's cache file path
+    for (ExtendedBlockId key : blockKeys) {
+      String cachePath = cacheManager.
+          getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
+      // The cachePath shouldn't be null if the replica has been cached
+      // to pmem.
+      assertNotNull(cachePath);
+      Path path = new Path(cachePath);
+      String fileName = path.getName();
+      if (cachePath.startsWith(PMEM_DIR_0)) {
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
+      } else if (cachePath.startsWith(PMEM_DIR_1)) {
+        String expectPath = PmemVolumeManager.
+            getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
+        assertTrue(path.toString().startsWith(expectPath));
+        assertTrue(key.getBlockId() == Long.parseLong(fileName));
+      } else {
+        fail("The cache path is not the expected one: " + cachePath);
+      }
+    }
 
     // Uncache the test file
-    fs.removeCacheDirective(cacheDirectiveId);
+    for (ExtendedBlockId key : blockKeys) {
+      cacheManager.uncacheBlock(blockPoolId, key.getBlockId());
+    }
     // Wait for uncaching
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
@@ -306,13 +323,13 @@ public class TestCacheByPmemMappableBlockLoader {
         MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
         long blocksUncached =
             MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
-        if (blocksUncached != maxCacheBlocksNum) {
-          LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " +
+        if (blocksUncached != cacheBlocksNum) {
+          LOG.info("waiting for " + cacheBlocksNum + " blocks to be " +
               "uncached. Right now " + blocksUncached +
               " blocks are uncached.");
           return false;
         }
-        LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
+        LOG.info(cacheBlocksNum + " blocks have been uncached.");
         return true;
       }
     }, 1000, 30000);
@@ -322,4 +339,4 @@ public class TestCacheByPmemMappableBlockLoader {
     // No record should be kept by blockKeyToVolume after testFile is uncached.
     assertEquals(blockKeyToVolume.size(), 0);
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org