Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/27 07:34:40 UTC

[38/50] [abbrv] hadoop git commit: HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.

HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19bc19e8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19bc19e8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19bc19e8

Branch: refs/heads/YARN-2928
Commit: 19bc19e82b31403eccdd9232fba3b94df01eebdc
Parents: fbc4853
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Mar 25 14:42:28 2015 -0500
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu Mar 26 23:29:48 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/protocol/BlockListAsLongs.java  |  37 +++
 .../hadoop/hdfs/server/datanode/DataNode.java   |   7 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java | 268 ++++++++++++++-----
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   3 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   8 +-
 .../datanode/fsdataset/impl/FsVolumeList.java   |   7 +-
 .../apache/hadoop/hdfs/UpgradeUtilities.java    |   5 +-
 .../hdfs/server/datanode/DataNodeTestUtils.java |   7 +
 .../fsdataset/impl/TestWriteToReplica.java      | 152 +++++++++++
 .../namenode/TestListCorruptFileBlocks.java     |   6 +
 .../namenode/TestProcessCorruptBlocks.java      |   3 +
 .../ha/TestPendingCorruptDnMessages.java        |   3 +
 13 files changed, 430 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8f1d5fc..62c2f91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-7713. Implement mkdirs in the HDFS Web UI. (Ravi Prakash via wheat9)
 
+    HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
+    a lot of time if disks are busy (Rushabh S Shah via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 1c89ee4..834546b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +35,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -108,6 +111,40 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     return builder.build();
   }
 
+  public static BlockListAsLongs readFrom(InputStream is) throws IOException {
+    CodedInputStream cis = CodedInputStream.newInstance(is);
+    int numBlocks = -1;
+    ByteString blocksBuf = null;
+    while (!cis.isAtEnd()) {
+      int tag = cis.readTag();
+      int field = WireFormat.getTagFieldNumber(tag);
+      switch(field) {
+        case 0:
+          break;
+        case 1:
+          numBlocks = (int)cis.readInt32();
+          break;
+        case 2:
+          blocksBuf = cis.readBytes();
+          break;
+        default:
+          cis.skipField(tag);
+          break;
+      }
+    }
+    if (numBlocks != -1 && blocksBuf != null) {
+      return decodeBuffer(numBlocks, blocksBuf);
+    }
+    return null;
+  }
+
+  public void writeTo(OutputStream os) throws IOException {
+    CodedOutputStream cos = CodedOutputStream.newInstance(os);
+    cos.writeInt32(1, getNumberOfBlocks());
+    cos.writeBytes(2, getBlocksBuffer());
+    cos.flush();
+  }
+  
   public static Builder builder() {
     return new BlockListAsLongs.Builder();
   }
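
For reference, a minimal round-trip sketch of the new writeTo/readFrom methods (not part of the patch: the helper class, method name and the "report" argument are illustrative; only writeTo, readFrom and getNumberOfBlocks come from BlockListAsLongs itself):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

    class BlockListRoundTrip {
      // Serialize a block report to a byte stream and read it back; the
      // decoded report should carry the same number of blocks.
      static void roundTrip(BlockListAsLongs report) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        report.writeTo(out);  // writes field 1 (count) and field 2 (block buffer)

        BlockListAsLongs decoded = BlockListAsLongs.readFrom(
            new ByteArrayInputStream(out.toByteArray()));
        if (decoded == null
            || decoded.getNumberOfBlocks() != report.getNumberOfBlocks()) {
          throw new IllegalStateException("Block report round trip mismatch");
        }
      }
    }

This is the same pattern BlockPoolSlice uses below: the block list is written to the "replicas" cache file on shutdown and read back on startup.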

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d94375e..3368124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMOR
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
@@ -159,6 +157,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -2500,6 +2499,10 @@ public class DataNode extends ReconfigurableBase
     return blockScanner;
   }
 
+  @VisibleForTesting
+  DirectoryScanner getDirectoryScanner() {
+    return directoryScanner;
+  }
 
   public static void secureMain(String args[], SecureResources resources) {
     int errorCode = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5a69e1e..6daf039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -23,12 +23,12 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.Iterator;
 import java.util.Scanner;
 
 import org.apache.commons.io.FileUtils;
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -55,6 +57,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.io.Files;
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.  
  * Taken together, all BlockPoolSlices sharing a block pool ID across a 
@@ -77,7 +80,9 @@ class BlockPoolSlice {
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
-  
+  private static final String REPLICA_CACHE_FILE = "replicas";
+  private final long replicaCacheExpiry = 5*60*1000;
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
 
@@ -310,11 +315,14 @@ class BlockPoolSlice {
       FsDatasetImpl.LOG.info(
           "Recovered " + numRecovered + " replicas from " + lazypersistDir);
     }
-
-    // add finalized replicas
-    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
-    // add rbw replicas
-    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+    
+    boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
+    if (!success) {
+      // add finalized replicas
+      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+      // add rbw replicas
+      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+    }
   }
 
   /**
@@ -401,6 +409,75 @@ class BlockPoolSlice {
     FileUtil.fullyDelete(source);
     return numRecovered;
   }
+  
+  private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized)
+      throws IOException {
+    ReplicaInfo newReplica = null;
+    long blockId = block.getBlockId();
+    long genStamp = block.getGenerationStamp();
+    if (isFinalized) {
+      newReplica = new FinalizedReplica(blockId, 
+          block.getNumBytes(), genStamp, volume, DatanodeUtil
+          .idToBlockDir(finalizedDir, blockId));
+    } else {
+      File file = new File(rbwDir, block.getBlockName());
+      boolean loadRwr = true;
+      File restartMeta = new File(file.getParent()  +
+          File.pathSeparator + "." + file.getName() + ".restart");
+      Scanner sc = null;
+      try {
+        sc = new Scanner(restartMeta, "UTF-8");
+        // The restart meta file exists
+        if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+          // It didn't expire. Load the replica as a RBW.
+          // We don't know the expected block length, so just use 0
+          // and don't reserve any more space for writes.
+          newReplica = new ReplicaBeingWritten(blockId,
+              validateIntegrityAndSetLength(file, genStamp), 
+              genStamp, volume, file.getParentFile(), null, 0);
+          loadRwr = false;
+        }
+        sc.close();
+        if (!restartMeta.delete()) {
+          FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
+              restartMeta.getPath());
+        }
+      } catch (FileNotFoundException fnfe) {
+        // Restart meta file doesn't exist; nothing to do here.
+      } finally {
+        if (sc != null) {
+          sc.close();
+        }
+      }
+      // Restart meta doesn't exist or expired.
+      if (loadRwr) {
+        newReplica = new ReplicaWaitingToBeRecovered(blockId,
+            validateIntegrityAndSetLength(file, genStamp),
+            genStamp, volume, file.getParentFile());
+      }
+    }
+
+    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+    if (oldReplica == null) {
+      volumeMap.add(bpid, newReplica);
+    } else {
+      // We have multiple replicas of the same block so decide which one
+      // to keep.
+      newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+    }
+
+    // If we are retaining a replica on transient storage make sure
+    // it is in the lazyWriteReplicaMap so it can be persisted
+    // eventually.
+    if (newReplica.getVolume().isTransientStorage()) {
+      lazyWriteReplicaMap.addReplica(bpid, blockId,
+          (FsVolumeImpl) newReplica.getVolume());
+    } else {
+      lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
+    }
+  }
+  
 
   /**
    * Add replicas under the given directory to the volume map
@@ -434,66 +511,9 @@ class BlockPoolSlice {
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
           files, file);
       long blockId = Block.filename2id(file.getName());
-      ReplicaInfo newReplica = null;
-      if (isFinalized) {
-        newReplica = new FinalizedReplica(blockId, 
-            file.length(), genStamp, volume, file.getParentFile());
-      } else {
-
-        boolean loadRwr = true;
-        File restartMeta = new File(file.getParent()  +
-            File.pathSeparator + "." + file.getName() + ".restart");
-        Scanner sc = null;
-        try {
-          sc = new Scanner(restartMeta, "UTF-8");
-          // The restart meta file exists
-          if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
-            // It didn't expire. Load the replica as a RBW.
-            // We don't know the expected block length, so just use 0
-            // and don't reserve any more space for writes.
-            newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(file, genStamp),
-                genStamp, volume, file.getParentFile(), null, 0);
-            loadRwr = false;
-          }
-          sc.close();
-          if (!restartMeta.delete()) {
-            FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
-              restartMeta.getPath());
-          }
-        } catch (FileNotFoundException fnfe) {
-          // nothing to do hereFile dir =
-        } finally {
-          if (sc != null) {
-            sc.close();
-          }
-        }
-        // Restart meta doesn't exist or expired.
-        if (loadRwr) {
-          newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(file, genStamp),
-              genStamp, volume, file.getParentFile());
-        }
-      }
-
-      ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-      if (oldReplica == null) {
-        volumeMap.add(bpid, newReplica);
-      } else {
-        // We have multiple replicas of the same block so decide which one
-        // to keep.
-        newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
-      }
-
-      // If we are retaining a replica on transient storage make sure
-      // it is in the lazyWriteReplicaMap so it can be persisted
-      // eventually.
-      if (newReplica.getVolume().isTransientStorage()) {
-        lazyWriteReplicaMap.addReplica(bpid, blockId,
-                                       (FsVolumeImpl) newReplica.getVolume());
-      } else {
-        lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
-      }
+      Block block = new Block(blockId, file.length(), genStamp); 
+      addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, 
+          isFinalized);
     }
   }
 
@@ -649,9 +669,121 @@ class BlockPoolSlice {
     return currentDir.getAbsolutePath();
   }
   
-  void shutdown() {
+  void shutdown(BlockListAsLongs blocksListToPersist) {
+    saveReplicas(blocksListToPersist);
     saveDfsUsed();
     dfsUsedSaved = true;
     dfsUsage.shutdown();
   }
+
+  private boolean readReplicasFromCache(ReplicaMap volumeMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap) {
+    ReplicaMap tmpReplicaMap = new ReplicaMap(this);
+    File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
+    // Check whether the file exists or not.
+    if (!replicaFile.exists()) {
+      LOG.info("Replica Cache file: "+  replicaFile.getPath() + 
+          " doesn't exist ");
+      return false;
+    }
+    long fileLastModifiedTime = replicaFile.lastModified();
+    if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
+      LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+          " has gone stale");
+      // Just to make findbugs happy
+      if (!replicaFile.delete()) {
+        LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+            " cannot be deleted");
+      }
+      return false;
+    }
+    FileInputStream inputStream = null;
+    try {
+      inputStream = new FileInputStream(replicaFile);
+      BlockListAsLongs blocksList =  BlockListAsLongs.readFrom(inputStream);
+      Iterator<BlockReportReplica> iterator = blocksList.iterator();
+      while (iterator.hasNext()) {
+        BlockReportReplica replica = iterator.next();
+        switch (replica.getState()) {
+        case FINALIZED:
+          addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
+          break;
+        case RUR:
+        case RBW:
+        case RWR:
+          addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false);
+          break;
+        default:
+          break;
+        }
+      }
+      inputStream.close();
+      // Now it is safe to add the replica into volumeMap
+      // In case of any exception during parsing this cache file, fall back
+      // to scan all the files on disk.
+      for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) {
+        volumeMap.add(bpid, info);
+      }
+      LOG.info("Successfully read replica from cache file : " 
+          + replicaFile.getPath());
+      return true;
+    } catch (Exception e) {
+      // On any exception, fall back to reading the replicas from disk:
+      // log the error and return false.
+      LOG.info("Exception occurred while reading the replicas cache file: "
+          + replicaFile.getPath(), e);
+      return false;
+    }
+    finally {
+      if (!replicaFile.delete()) {
+        LOG.info("Failed to delete replica cache file: " +
+            replicaFile.getPath());
+      }
+      // close the inputStream
+      IOUtils.closeStream(inputStream);
+    }
+  } 
+  
+  private void saveReplicas(BlockListAsLongs blocksListToPersist) {
+    if (blocksListToPersist == null || 
+        blocksListToPersist.getNumberOfBlocks() == 0) {
+      return;
+    }
+    File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+    if (tmpFile.exists() && !tmpFile.delete()) {
+      LOG.warn("Failed to delete tmp replicas file in " +
+        tmpFile.getPath());
+      return;
+    }
+    File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+      LOG.warn("Failed to delete replicas file in " +
+          replicaCacheFile.getPath());
+      return;
+    }
+    
+    FileOutputStream out = null;
+    try {
+      out = new FileOutputStream(tmpFile);
+      blocksListToPersist.writeTo(out);
+      out.close();
+      // Renaming the tmp file to replicas
+      Files.move(tmpFile, replicaCacheFile);
+    } catch (Exception e) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error, delete both the files (tmp and cache)
+      // and continue.
+      LOG.warn("Failed to write replicas to cache ", e);
+      if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+        LOG.warn("Failed to delete replicas file: " + 
+            replicaCacheFile.getPath());
+      }
+    } finally {
+      IOUtils.closeStream(out);
+      if (tmpFile.exists() && !tmpFile.delete()) {
+        LOG.warn("Failed to delete tmp file in " +
+            tmpFile.getPath());
+      }
+    }
+  }
 }
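
As a quick summary of the freshness rule readReplicasFromCache applies above: the "replicas" cache file is only trusted if it exists and was written within the last five minutes (replicaCacheExpiry); otherwise the DataNode falls back to scanning the block files on disk. A standalone sketch of that check, with an assumed class and method name:

    import java.io.File;

    class ReplicaCacheFreshness {
      // Mirrors replicaCacheExpiry above: the cache is trusted for 5 minutes.
      private static final long EXPIRY_MS = 5 * 60 * 1000;

      // Usable only if <currentDir>/replicas exists and is recent enough;
      // readReplicasFromCache also deletes the file once it has been read.
      static boolean isUsable(File currentDir) {
        File cache = new File(currentDir, "replicas");
        return cache.exists()
            && System.currentTimeMillis() <= cache.lastModified() + EXPIRY_MS;
      }
    }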

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 05c4871..cf471ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2463,8 +2463,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public synchronized void shutdownBlockPool(String bpid) {
     LOG.info("Removing block pool " + bpid);
+    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =  getBlockReports(bpid);
     volumeMap.cleanUpBlockPool(bpid);
-    volumes.removeBlockPool(bpid);
+    volumes.removeBlockPool(bpid, blocksPerVolume);
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 23efbdf..4dbc7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -65,7 +66,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -805,7 +805,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
-      entry.getValue().shutdown();
+      entry.getValue().shutdown(null);
     }
   }
 
@@ -815,10 +815,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     bpSlices.put(bpid, bp);
   }
   
-  void shutdownBlockPool(String bpid) {
+  void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
     BlockPoolSlice bp = bpSlices.get(bpid);
     if (bp != null) {
-      bp.shutdown();
+      bp.shutdown(blocksListsAsLongs);
     }
     bpSlices.remove(bpid);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a5611c5..4fddfb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -428,9 +430,10 @@ class FsVolumeList {
         bpid + ": " + totalTimeTaken + "ms");
   }
   
-  void removeBlockPool(String bpid) {
+  void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
+      blocksPerVolume) {
     for (FsVolumeImpl v : volumes.get()) {
-      v.shutdownBlockPool(bpid);
+      v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
index 2e5348e..e9891bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
@@ -304,10 +304,11 @@ public class UpgradeUtilities {
         continue;
       }
 
-      // skip VERSION and dfsUsed file for DataNodes
+      // skip VERSION and dfsUsed and replicas file for DataNodes
       if (nodeType == DATA_NODE &&
           (list[i].getName().equals("VERSION") ||
-              list[i].getName().equals("dfsUsed"))) {
+              list[i].getName().equals("dfsUsed") ||
+              list[i].getName().equals("replicas"))) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index f9a2ba1..9dee724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -218,4 +218,11 @@ public class DataNodeTestUtils {
       }
     }
   }
+  
+  public static void runDirectoryScanner(DataNode dn) throws IOException {
+    DirectoryScanner directoryScanner = dn.getDirectoryScanner();
+    if (directoryScanner != null) {
+      directoryScanner.reconcile();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 9325cdc..96a73c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@@ -34,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -501,4 +513,144 @@ public class TestWriteToReplica {
           + "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
     }
   }
+  
+  /**
+   * Checks that the replica map is preserved across a quick datanode
+   * restart (i.e. one within the 5-minute replica cache expiry).
+   * @throws Exception
+   */
+  @Test
+  public void testReplicaMapAfterDatanodeRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+        .build();
+    try {
+      cluster.waitActive();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      assertNotNull("cannot create nn1", nn1);
+      assertNotNull("cannot create nn2", nn2);
+      
+      // check number of volumes in fsdataset
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
+          getFSDataset(dn);
+      ReplicaMap replicaMap = dataSet.volumeMap;
+      
+      List<FsVolumeImpl> volumes = dataSet.getVolumes();
+      // number of volumes should be 2 - [data1, data2]
+      assertEquals("number of volumes is wrong", 2, volumes.size());
+      ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
+          cluster.getNamesystem(0).getBlockPoolId(), 
+          cluster.getNamesystem(1).getBlockPoolId()));
+      
+      Assert.assertTrue("Cluster should have 2 block pools", 
+          bpList.size() == 2);
+      
+      createReplicas(bpList, volumes, replicaMap);
+      ReplicaMap oldReplicaMap = new ReplicaMap(this);
+      oldReplicaMap.addAll(replicaMap);
+      
+      cluster.restartDataNode(0);
+      cluster.waitActive();
+      dn = cluster.getDataNodes().get(0);
+      dataSet = (FsDatasetImpl) dn.getFSDataset();
+      testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Compare the replica map before and after the restart
+   **/
+  private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap 
+      newReplicaMap, List<String> bpidList) {
+    // Traversing through newReplica map and remove the corresponding 
+    // replicaInfo from oldReplicaMap.
+    for (String bpid: bpidList) {
+      for (ReplicaInfo info: newReplicaMap.replicas(bpid)) {
+        assertNotNull("Volume map before restart didn't contain the "
+            + "blockpool: " + bpid, oldReplicaMap.replicas(bpid));
+        
+        ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid, 
+            info.getBlockId());
+        // Every block present after the restart must also be in the old map.
+        assertNotNull("Old Replica Map didn't contain block with blockId: " +
+            info.getBlockId(), oldReplicaInfo);
+        
+        ReplicaState oldState = oldReplicaInfo.getState();
+        // After a restart, all RWR, RBW and RUR replicas get converted
+        // to RWR.
+        if (info.getState() == ReplicaState.RWR) {
+           if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW 
+               || oldState == ReplicaState.RUR) {
+             oldReplicaMap.remove(bpid, oldReplicaInfo);
+           }
+        } else if (info.getState() == ReplicaState.FINALIZED && 
+            oldState == ReplicaState.FINALIZED) {
+          oldReplicaMap.remove(bpid, oldReplicaInfo);
+        }
+      }
+    }
+    
+    // ReplicaInPipeline (TEMPORARY) replicas are never persisted, so any
+    // other replica still left in the old map at this point was not
+    // persisted across the restart, which is a failure.
+    for (String bpid: bpidList) {
+      for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) {
+        if (replicaInfo.getState() != ReplicaState.TEMPORARY) {
+          Assert.fail("After datanode restart we lost the block with blockId: "
+              +  replicaInfo.getBlockId());
+        }
+      }
+    }
+  }
+
+  private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes,
+      ReplicaMap volumeMap) throws IOException {
+    Assert.assertTrue("Volume map can't be null" , volumeMap != null);
+    
+    // Create one replica of each ReplicaInfo type and add it to the
+    // volume map, one set per block pool on each of the corresponding
+    // volumes.
+    long id = 1; // This variable is used as both blockId and genStamp
+    for (String bpId: bpList) {
+      for (FsVolumeImpl volume: volumes) {
+        ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
+            DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
+        volumeMap.add(bpId, finalizedReplica);
+        id++;
+        
+        ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume, 
+            volume.getRbwDir(bpId), null, 100);
+        volumeMap.add(bpId, rbwReplica);
+        id++;
+
+        ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id, 
+            volume, volume.getRbwDir(bpId));
+        volumeMap.add(bpId, rwrReplica);
+        id++;
+        
+        ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume, 
+            volume.getTmpDir(bpId), 0);
+        volumeMap.add(bpId, ripReplica);
+        id++;
+      }
+    }
+    
+    for (String bpId: bpList) {
+      for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
+        File parentFile = replicaInfo.getBlockFile().getParentFile();
+        if (!parentFile.exists()) {
+          if (!parentFile.mkdirs()) {
+            throw new IOException("Failed to mkdirs " + parentFile);
+          }
+        }
+        replicaInfo.getBlockFile().createNewFile();
+        replicaInfo.getMetaFile().createNewFile();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 92ea111..5d319b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -483,6 +485,10 @@ public class TestListCorruptFileBlocks {
         }
       }
 
+      // Run the directory scanner to update the datanode's volumeMap
+      DataNode dn = cluster.getDataNodes().get(0);
+      DataNodeTestUtils.runDirectoryScanner(dn);
+
       // Occasionally the BlockPoolSliceScanner can run before we have removed
       // the blocks. Restart the Datanode to trigger the scanner into running
       // once more.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
index 168ebb9..37abc5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 public class TestProcessCorruptBlocks {
@@ -269,6 +270,8 @@ public class TestProcessCorruptBlocks {
     // But the datadirectory will not change
     assertTrue(cluster.corruptReplica(dnIndex, block));
 
+    // Run directory scanner to update the DN's volume map  
+    DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
     DataNodeProperties dnProps = cluster.stopDataNode(0);
 
     // Each datanode has multiple data dirs, check each

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
index 4d4fed6..443500c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
@@ -69,6 +70,8 @@ public class TestPendingCorruptDnMessages {
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
       assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
       
+      // Run the directory scanner to update the datanode's volumeMap
+      DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
       // Stop the DN so the replica with the changed gen stamp will be reported
       // when this DN starts up.
       DataNodeProperties dnProps = cluster.stopDataNode(0);