Posted to commits@lucene.apache.org by kr...@apache.org on 2019/02/01 02:13:39 UTC

[lucene-solr] branch jira/solr-9515 updated: Workaround for BlockPoolSlice ForkJoinPool

This is an automated email from the ASF dual-hosted git repository.

krisden pushed a commit to branch jira/solr-9515
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-9515 by this push:
     new ee75247  Workaround for BlockPoolSlice ForkJoinPool
ee75247 is described below

commit ee752479003930a0ca607cc45801110d7f6b41f3
Author: Kevin Risden <kr...@apache.org>
AuthorDate: Thu Jan 31 20:45:22 2019 -0500

    Workaround for BlockPoolSlice ForkJoinPool
    
    Signed-off-by: Kevin Risden <kr...@apache.org>
---
 .../datanode/fsdataset/impl/BlockPoolSlice.java    | 1057 ++++++++++++++++++++
 1 file changed, 1057 insertions(+)

diff --git a/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
new file mode 100644
index 0000000..2cd1c4b
--- /dev/null
+++ b/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
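+// NOTE: This class appears to be a copy of Hadoop's BlockPoolSlice, placed in
+// Solr's test tree under the same package so that it shadows the version in
+// the Hadoop jar on the test classpath. The substantive change seems to be
+// the custom ForkJoinPool thread factory below (MyForkJoinThreadFactory);
+// everything else follows the upstream class.
+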
+/**
+ * A block pool slice represents a portion of a block pool stored on a volume.
+ * Taken together, all BlockPoolSlices sharing a block pool ID across a
+ * cluster represent a single block pool.
+ *
+ * This class is synchronized by {@link FsVolumeImpl}.
+ */
+class BlockPoolSlice {
+  static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);
+
+  private final String bpid;
+  private final FsVolumeImpl volume; // volume to which this BlockPool belongs
+  private final File currentDir; // StorageDirectory/current/bpid/current
+  // directory where finalized replicas are stored
+  private final File finalizedDir;
+  private final File lazypersistDir;
+  private final File rbwDir; // directory that stores RBW replicas
+  private final File tmpDir; // directory that stores temporary replicas
+  private final int ioFileBufferSize;
+  @VisibleForTesting
+  public static final String DU_CACHE_FILE = "dfsUsed";
+  private final Runnable shutdownHook;
+  private volatile boolean dfsUsedSaved = false;
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  private final boolean deleteDuplicateReplicas;
+  private static final String REPLICA_CACHE_FILE = "replicas";
+  private final long replicaCacheExpiry = 5*60*1000;
+  private AtomicLong numOfBlocks = new AtomicLong();
+  private final long cachedDfsUsedCheckTime;
+  private final Timer timer;
+  private final int maxDataLength;
+  private final FileIoProvider fileIoProvider;
+
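+  // Shared by every BlockPoolSlice in the process: created lazily in
+  // initializeAddReplicaPool() and shut down via the volume shutdown hook.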
+  private static ForkJoinPool addReplicaThreadPool = null;
+  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+      .getRuntime().availableProcessors();
+  private static final Comparator<File> FILE_COMPARATOR =
+      new Comparator<File>() {
+        @Override
+        public int compare(File f1, File f2) {
+          return f1.getName().compareTo(f2.getName());
+        }
+      };
+
+  // TODO:FEDERATION scalability issue - a thread per DU is needed
+  private final GetSpaceUsed dfsUsage;
+
+  /**
+   * Create a block pool slice.
+   * @param bpid Block pool Id
+   * @param volume {@link FsVolumeImpl} to which this BlockPool belongs
+   * @param bpDir directory corresponding to the BlockPool
+   * @param conf configuration
+   * @param timer timer for getting the current time
+   * @throws IOException Error making directories
+   */
+  BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
+                 Configuration conf, Timer timer) throws IOException {
+    this.bpid = bpid;
+    this.volume = volume;
+    this.fileIoProvider = volume.getFileIoProvider();
+    this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+    this.finalizedDir = new File(
+        currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
+    if (!this.finalizedDir.exists()) {
+      if (!this.finalizedDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.finalizedDir);
+      }
+    }
+
+    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
+
+    this.deleteDuplicateReplicas = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
+        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
+
+    this.cachedDfsUsedCheckTime =
+        conf.getLong(
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+
+    this.maxDataLength = conf.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+
+    this.timer = timer;
+
+    // Files that were being written when the datanode was last shutdown
+    // are now moved back to the data directory. It is possible that
+    // in the future, we might want to do some sort of datanode-local
+    // recovery for these blocks. For example, crc validation.
+    //
+    this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
+    if (tmpDir.exists()) {
+      fileIoProvider.fullyDelete(volume, tmpDir);
+    }
+    this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+
+    // create the rbw and tmp directories if they don't exist.
+    fileIoProvider.mkdirs(volume, rbwDir);
+    fileIoProvider.mkdirs(volume, tmpDir);
+
+    // Use the cached value initially if available; otherwise the following
+    // call will block until the initial du command completes.
+    this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
+        .setConf(conf)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+    if (addReplicaThreadPool == null) {
+      // initialize add replica fork join pool
+      initializeAddReplicaPool(conf);
+    }
+    // Make sure the dfs usage is saved during shutdown.
+    shutdownHook = new Runnable() {
+      @Override
+      public void run() {
+        if (!dfsUsedSaved) {
+          saveDfsUsed();
+          addReplicaThreadPool.shutdownNow();
+        }
+      }
+    };
+    ShutdownHookManager.get().addShutdownHook(shutdownHook,
+        SHUTDOWN_HOOK_PRIORITY);
+  }
+
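+  /*
+   * Why the custom factory (the "workaround" in this commit, as far as the
+   * class names suggest): when a SecurityManager is installed -- as it is in
+   * Solr tests -- the JDK's default ForkJoinWorkerThreadFactory can hand back
+   * "innocuous" worker threads that carry no permissions, so Hadoop's file
+   * I/O performed inside the pool fails. A minimal sketch of the failure
+   * mode being avoided (assumed behavior; exact details vary by JDK version):
+   *
+   *   ForkJoinPool pool = new ForkJoinPool(4); // default thread factory
+   *   pool.submit(() -> new File("x").exists()).get();
+   *   // may throw AccessControlException under a restrictive
+   *   // SecurityManager, because the innocuous worker owns no permissions
+   *
+   * Supplying a factory that returns a plain ForkJoinWorkerThread subclass
+   * keeps the workers "not so innocuous", i.e. ordinary threads.
+   */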
+  public class MyForkJoinThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+    @Override
+    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+      return new NotSoInnocuousWorkerThread(pool);
+    }
+  }
+
+  public class NotSoInnocuousWorkerThread extends ForkJoinWorkerThread {
+    protected NotSoInnocuousWorkerThread(ForkJoinPool pool) {
+      super(pool);
+    }
+  }
+
+  private synchronized void initializeAddReplicaPool(Configuration conf) {
+    if (addReplicaThreadPool == null) {
+      FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+      int numberOfBlockPoolSlice = dataset.getVolumeCount()
+          * dataset.getBPServiceCount();
+      int poolsize = Math.max(numberOfBlockPoolSlice,
+          VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+      // The default pool size is the max of (volume count * number of BP
+      // services) and the number of processors.
+      int parallelism = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+          poolsize);
+      ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = new MyForkJoinThreadFactory();
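+      // Constructor args: custom factory (so workers are ordinary threads),
+      // no UncaughtExceptionHandler, asyncMode=false (the default LIFO mode).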
+      addReplicaThreadPool = new ForkJoinPool(parallelism, threadFactory, null, false);
+    }
+  }
+
+  File getDirectory() {
+    return currentDir.getParentFile();
+  }
+
+  File getFinalizedDir() {
+    return finalizedDir;
+  }
+
+  File getLazypersistDir() {
+    return lazypersistDir;
+  }
+
+  File getRbwDir() {
+    return rbwDir;
+  }
+
+  File getTmpDir() {
+    return tmpDir;
+  }
+
+  /** Decrease the cached dfsUsed value. Callers must synchronize externally. */
+  void decDfsUsed(long value) {
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(-value);
+    }
+  }
+
+  long getDfsUsed() throws IOException {
+    return dfsUsage.getUsed();
+  }
+
+  void incDfsUsed(long value) {
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(value);
+    }
+  }
+
+  /**
+   * Read in the cached DU value and return it if the cache file's age is
+   * less than cachedDfsUsedCheckTime, which is set by the
+   * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight
+   * imprecision of dfsUsed is not critical and skipping DU can significantly
+   * shorten startup time. If the cached value is not available or is too
+   * old, -1 is returned.
+   */
+  long loadDfsUsed() {
+    long cachedDfsUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(new File(currentDir, DU_CACHE_FILE), "UTF-8");
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded dfsUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedDfsUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
+        FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
+            cachedDfsUsed);
+        return cachedDfsUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current dfsUsed to the cache file.
+   */
+  void saveDfsUsed() {
+    File outFile = new File(currentDir, DU_CACHE_FILE);
+    if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
+      FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
+          outFile.getParent());
+    }
+
+    try {
+      long used = getDfsUsed();
+      try (Writer out = new OutputStreamWriter(
+          new FileOutputStream(outFile), "UTF-8")) {
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(timer.now()));
+        // This is only called as part of the volume shutdown.
+        // We explicitly avoid flushing through fileIoProvider (which
+        // triggers a volume check on IO exceptions) to avoid cyclic volume
+        // checks.
+        out.flush();
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      FsDatasetImpl.LOG.warn("Failed to write dfsUsed to " + outFile, ioe);
+    }
+  }
+
+  /**
+   * Temporary files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createTmpFile(Block b) throws IOException {
+    File f = new File(tmpDir, b.getBlockName());
+    File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
+    // If an exception occurs during creation, the counter will not have been
+    // incremented, so there is no need to decrement it.
+    incrNumBlocks();
+    return tmpFile;
+  }
+
+  /**
+   * RBW files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createRbwFile(Block b) throws IOException {
+    File f = new File(rbwDir, b.getBlockName());
+    File rbwFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
+    // If an exception occurs during creation, the counter will not have been
+    // incremented, so there is no need to decrement it.
+    incrNumBlocks();
+    return rbwFile;
+  }
+
+  File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
+    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
+    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
+          b.getNumBytes() + metaFile.length());
+    }
+    return blockFile;
+  }
+
+  /**
+   * Move a persisted replica from the lazypersist directory to a
+   * subdirectory under finalized.
+   */
+  ReplicaInfo activateSavedReplica(ReplicaInfo replicaInfo,
+                                   RamDiskReplica replicaState) throws IOException {
+    File metaFile = replicaState.getSavedMetaFile();
+    File blockFile = replicaState.getSavedBlockFile();
+    final long blockId = replicaInfo.getBlockId();
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+    final File targetBlockFile = new File(blockDir, blockFile.getName());
+    final File targetMetaFile = new File(blockDir, metaFile.getName());
+    fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
+    FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
+    fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
+    FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
+
+    ReplicaInfo newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.FINALIZED)
+            .setBlockId(blockId)
+            .setLength(replicaInfo.getBytesOnDisk())
+            .setGenerationStamp(replicaInfo.getGenerationStamp())
+            .setFsVolume(replicaState.getLazyPersistVolume())
+            .setDirectoryToUse(targetBlockFile.getParentFile())
+            .build();
+    return newReplicaInfo;
+  }
+
+  void checkDirs() throws DiskErrorException {
+    DiskChecker.checkDir(finalizedDir);
+    DiskChecker.checkDir(tmpDir);
+    DiskChecker.checkDir(rbwDir);
+  }
+
+  void getVolumeMap(ReplicaMap volumeMap,
+                    final RamDiskReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
+    // Recover lazy persist replicas, they will be added to the volumeMap
+    // when we scan the finalized directory.
+    if (lazypersistDir.exists()) {
+      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
+      FsDatasetImpl.LOG.info(
+          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
+    }
+
+    boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
+    if (!success) {
+      List<IOException> exceptions = Collections
+          .synchronizedList(new ArrayList<IOException>());
+      Queue<RecursiveAction> subTaskQueue =
+          new ConcurrentLinkedQueue<RecursiveAction>();
+
+      // add finalized replicas
+      AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+          finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+      ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
+      // add rbw replicas
+      task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+          false, exceptions, subTaskQueue);
+      ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+      try {
+        finalizedTask.get();
+        rbwTask.get();
+      } catch (InterruptedException | ExecutionException e) {
+        exceptions.add(new IOException(
+            "Failed to start sub tasks to add replicas to the replica map: "
+                + e.getMessage()));
+      }
+
+      // Wait for all the tasks to finish.
+      waitForSubTaskToFinish(subTaskQueue, exceptions);
+    }
+  }
+
+  /**
+   * Wait until all the recursive tasks adding replicas to the volume map
+   * have completed.
+   *
+   * @param subTaskQueue
+   *          queue of {@link AddReplicaProcessor} tasks.
+   * @param exceptions
+   *          exceptions that occurred in the sub tasks.
+   * @throws IOException
+   *           if one or more sub tasks failed.
+   */
+  private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+                                      List<IOException> exceptions) throws IOException {
+    while (!subTaskQueue.isEmpty()) {
+      RecursiveAction task = subTaskQueue.poll();
+      if (task != null) {
+        task.join();
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+
+  /**
+   * Recover an unlinked tmp file on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name and the original name is returned; otherwise the tmp
+   * file is deleted and null is returned.
+   */
+  File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+    File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+    if (blockFile.exists()) {
+      // If the original block file still exists, then no recovery is needed.
+      if (!fileIoProvider.delete(volume, unlinkedTmp)) {
+        throw new IOException("Unable to cleanup unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return null;
+    } else {
+      fileIoProvider.rename(volume, unlinkedTmp, blockFile);
+      return blockFile;
+    }
+  }
+
+  /**
+   * Move replicas in the lazy persist directory to their corresponding locations
+   * in the finalized directory.
+   * @return number of replicas recovered.
+   */
+  private int moveLazyPersistReplicasToFinalized(File source)
+      throws IOException {
+    File[] files = fileIoProvider.listFiles(volume, source);
+    int numRecovered = 0;
+    for (File file : files) {
+      if (file.isDirectory()) {
+        numRecovered += moveLazyPersistReplicasToFinalized(file);
+      }
+
+      if (Block.isMetaFilename(file.getName())) {
+        File metaFile = file;
+        File blockFile = Block.metaToBlockFile(metaFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+
+        if (blockFile.exists()) {
+
+          try {
+            fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
+          } catch(IOException ioe) {
+            LOG.warn("Failed to mkdirs " + targetDir);
+            continue;
+          }
+
+          final File targetMetaFile = new File(targetDir, metaFile.getName());
+          try {
+            fileIoProvider.rename(volume, metaFile, targetMetaFile);
+          } catch (IOException e) {
+            LOG.warn("Failed to move meta file from "
+                + metaFile + " to " + targetMetaFile, e);
+            continue;
+          }
+
+          final File targetBlockFile = new File(targetDir, blockFile.getName());
+          try {
+            fileIoProvider.rename(volume, blockFile, targetBlockFile);
+          } catch (IOException e) {
+            LOG.warn("Failed to move block file from "
+                + blockFile + " to " + targetBlockFile, e);
+            continue;
+          }
+
+          if (targetBlockFile.exists() && targetMetaFile.exists()) {
+            ++numRecovered;
+          } else {
+            // Failure should be rare.
+            LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+          }
+        }
+      }
+    }
+
+    fileIoProvider.fullyDelete(volume, source);
+    return numRecovered;
+  }
+
+  private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized)
+      throws IOException {
+    ReplicaInfo newReplica = null;
+    long blockId = block.getBlockId();
+    long genStamp = block.getGenerationStamp();
+    if (isFinalized) {
+      newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+          .setBlockId(blockId)
+          .setLength(block.getNumBytes())
+          .setGenerationStamp(genStamp)
+          .setFsVolume(volume)
+          .setDirectoryToUse(DatanodeUtil.idToBlockDir(finalizedDir, blockId))
+          .build();
+    } else {
+      File file = new File(rbwDir, block.getBlockName());
+      boolean loadRwr = true;
+      File restartMeta = new File(file.getParent()  +
+          File.pathSeparator + "." + file.getName() + ".restart");
+      Scanner sc = null;
+      try {
+        sc = new Scanner(restartMeta, "UTF-8");
+        // The restart meta file exists
+        if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
+          // It didn't expire. Load the replica as a RBW.
+          // We don't know the expected block length, so just use 0
+          // and don't reserve any more space for writes.
+          newReplica = new ReplicaBuilder(ReplicaState.RBW)
+              .setBlockId(blockId)
+              .setLength(validateIntegrityAndSetLength(file, genStamp))
+              .setGenerationStamp(genStamp)
+              .setFsVolume(volume)
+              .setDirectoryToUse(file.getParentFile())
+              .setWriterThread(null)
+              .setBytesToReserve(0)
+              .build();
+          loadRwr = false;
+        }
+        sc.close();
+        if (!fileIoProvider.delete(volume, restartMeta)) {
+          FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
+              restartMeta.getPath());
+        }
+      } catch (FileNotFoundException fnfe) {
+        // nothing to do here
+      } finally {
+        if (sc != null) {
+          sc.close();
+        }
+      }
+      // The restart meta file doesn't exist or has expired.
+      if (loadRwr) {
+        ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.RWR)
+            .setBlockId(blockId)
+            .setLength(validateIntegrityAndSetLength(file, genStamp))
+            .setGenerationStamp(genStamp)
+            .setFsVolume(volume)
+            .setDirectoryToUse(file.getParentFile());
+        newReplica = builder.build();
+      }
+    }
+
+    ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+    ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+        : tmpReplicaInfo;
+    if (oldReplica != null) {
+      // We have multiple replicas of the same block so decide which one
+      // to keep.
+      newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+    }
+
+    // If we are retaining a replica on transient storage make sure
+    // it is in the lazyWriteReplicaMap so it can be persisted
+    // eventually.
+    if (newReplica.getVolume().isTransientStorage()) {
+      lazyWriteReplicaMap.addReplica(bpid, blockId,
+          (FsVolumeImpl) newReplica.getVolume(), 0);
+    } else {
+      lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
+    }
+    if (oldReplica == null) {
+      incrNumBlocks();
+    }
+  }
+
+  /**
+   * Add replicas under the given directory to the volume map
+   * @param volumeMap the replicas map
+   * @param dir an input directory
+   * @param lazyWriteReplicaMap Map of replicas on transient
+   *                                storage.
+   * @param isFinalized true if the directory has finalized replicas;
+   *                    false if the directory has rbw replicas
+   * @param exceptions list of exceptions to be returned to the parent thread.
+   * @param subTaskQueue queue of sub tasks
+   */
+  void addToReplicasMap(ReplicaMap volumeMap, File dir,
+                        final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+                        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
+      throws IOException {
+    File[] files = fileIoProvider.listFiles(volume, dir);
+    Arrays.sort(files, FILE_COMPARATOR);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
+      if (file.isDirectory()) {
+        // Launch new sub task.
+        AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+            lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+        subTask.fork();
+        subTaskQueue.add(subTask);
+      }
+
+      if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+        file = recoverTempUnlinkedBlock(file);
+        if (file == null) { // the original block still exists, so we cover it
+          // in another iteration and can continue here
+          continue;
+        }
+      }
+      if (!Block.isBlockFilename(file)) {
+        continue;
+      }
+
+      long genStamp = FsDatasetUtil.getGenerationStampFromFile(
+          files, file, i);
+      long blockId = Block.filename2id(file.getName());
+      Block block = new Block(blockId, file.length(), genStamp);
+      addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
+          isFinalized);
+    }
+  }
+
+  /**
+   * This method is invoked during DN startup when volumes are scanned to
+   * build up the volumeMap.
+   *
+   * Given two replicas, decide which one to keep. The preference is as
+   * follows:
+   *   1. Prefer the replica with the higher generation stamp.
+   *   2. If generation stamps are equal, prefer the replica with the
+   *      larger on-disk length.
+   *   3. If on-disk length is the same, prefer the replica on persistent
+   *      storage volume.
+   *   4. All other factors being equal, keep replica1.
+   *
+   * The other replica is removed from the volumeMap and is deleted from
+   * its storage volume.
+   *
+   * @param replica1 first replica
+   * @param replica2 second replica
+   * @param volumeMap volume map to update
+   * @return the replica that is retained.
+   */
+  ReplicaInfo resolveDuplicateReplicas(
+      final ReplicaInfo replica1, final ReplicaInfo replica2,
+      final ReplicaMap volumeMap) {
+
+    if (!deleteDuplicateReplicas) {
+      // Leave both block replicas in place.
+      return replica1;
+    }
+    final ReplicaInfo replicaToDelete =
+        selectReplicaToDelete(replica1, replica2);
+    final ReplicaInfo replicaToKeep =
+        (replicaToDelete != replica1) ? replica1 : replica2;
+    // Update volumeMap and delete the replica
+    volumeMap.add(bpid, replicaToKeep);
+    if (replicaToDelete != null) {
+      deleteReplica(replicaToDelete);
+    }
+    return replicaToKeep;
+  }
+
+  @VisibleForTesting
+  static ReplicaInfo selectReplicaToDelete(final ReplicaInfo replica1,
+                                           final ReplicaInfo replica2) {
+    ReplicaInfo replicaToKeep;
+    ReplicaInfo replicaToDelete;
+
+    // It's the same block, so never delete it even if the GS or size
+    // differs; the caller should keep the one it just discovered on disk.
+    if (replica1.getBlockURI().equals(replica2.getBlockURI())) {
+      return null;
+    }
+    if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
+      replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
+          ? replica1 : replica2;
+    } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
+      replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
+          replica1 : replica2;
+    } else if (replica1.getVolume().isTransientStorage() &&
+        !replica2.getVolume().isTransientStorage()) {
+      replicaToKeep = replica2;
+    } else {
+      replicaToKeep = replica1;
+    }
+
+    replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("resolveDuplicateReplicas decide to keep " + replicaToKeep
+          + ".  Will try to delete " + replicaToDelete);
+    }
+    return replicaToDelete;
+  }
+
+  private void deleteReplica(final ReplicaInfo replicaToDelete) {
+    // Delete the files on disk. Failure here is okay.
+    if (!replicaToDelete.deleteBlockData()) {
+      LOG.warn("Failed to delete block file for replica " + replicaToDelete);
+    }
+    if (!replicaToDelete.deleteMetadata()) {
+      LOG.warn("Failed to delete meta file for replica " + replicaToDelete);
+    }
+  }
+
+  /**
+   * Find out the number of bytes in the block that match its crc.
+   *
+   * This algorithm assumes that data corruption caused by unexpected
+   * datanode shutdown occurs only in the last crc chunk. So it checks
+   * only the last chunk.
+   *
+   * @param blockFile the block file
+   * @param genStamp generation stamp of the block
+   * @return the number of valid bytes
+   */
+  private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
+    try {
+      final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
+      long blockFileLen = blockFile.length();
+      long metaFileLen = metaFile.length();
+      int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+      if (!blockFile.exists() || blockFileLen == 0 ||
+          !metaFile.exists() || metaFileLen < crcHeaderLen) {
+        return 0;
+      }
+      try (DataInputStream checksumIn = new DataInputStream(
+          new BufferedInputStream(
+              fileIoProvider.getFileInputStream(volume, metaFile),
+              ioFileBufferSize))) {
+        // read and handle the common header here. For now just a version
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            checksumIn, metaFile);
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        long numChunks = Math.min(
+            (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
+            (metaFileLen - crcHeaderLen) / checksumSize);
+        if (numChunks == 0) {
+          return 0;
+        }
+        try (InputStream blockIn = fileIoProvider.getFileInputStream(
+            volume, blockFile);
+             ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
+                 checksumIn, volume.obtainReference(), fileIoProvider)) {
+          ris.skipChecksumFully((numChunks - 1) * checksumSize);
+          long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
+          ris.skipDataFully(lastChunkStartPos);
+          int lastChunkSize = (int) Math.min(
+              bytesPerChecksum, blockFileLen - lastChunkStartPos);
+          byte[] buf = new byte[lastChunkSize + checksumSize];
+          ris.readChecksumFully(buf, lastChunkSize, checksumSize);
+          ris.readDataFully(buf, 0, lastChunkSize);
+          checksum.update(buf, 0, lastChunkSize);
+          long validFileLength;
+          if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+            validFileLength = lastChunkStartPos + lastChunkSize;
+          } else { // last chunk is corrupt
+            validFileLength = lastChunkStartPos;
+          }
+          // truncate if extra bytes are present without CRC
+          if (blockFile.length() > validFileLength) {
+            try (RandomAccessFile blockRAF =
+                     fileIoProvider.getRandomAccessFile(
+                         volume, blockFile, "rw")) {
+              // truncate blockFile
+              blockRAF.setLength(validFileLength);
+            }
+          }
+          return validFileLength;
+        }
+      }
+    } catch (IOException e) {
+      FsDatasetImpl.LOG.warn("Getting exception while validating integrity " +
+          "and setting length for blockFile", e);
+      return 0;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return currentDir.getAbsolutePath();
+  }
+
+  void shutdown(BlockListAsLongs blocksListToPersist) {
+    saveReplicas(blocksListToPersist);
+    saveDfsUsed();
+    dfsUsedSaved = true;
+
+    // Remove the shutdown hook to avoid any memory leak
+    if (shutdownHook != null) {
+      ShutdownHookManager.get().removeShutdownHook(shutdownHook);
+    }
+
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      IOUtils.cleanupWithLogger(LOG, ((CachingGetSpaceUsed) dfsUsage));
+    }
+  }
+
+  private boolean readReplicasFromCache(ReplicaMap volumeMap,
+                                        final RamDiskReplicaTracker lazyWriteReplicaMap) {
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
+    // Check whether the file exists or not.
+    if (!replicaFile.exists()) {
+      LOG.info("Replica Cache file: "+  replicaFile.getPath() +
+          " doesn't exist ");
+      return false;
+    }
+    long fileLastModifiedTime = replicaFile.lastModified();
+    if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
+      LOG.info("Replica Cache file: " + replicaFile.getPath() +
+          " has gone stale");
+      // Just to make findbugs happy
+      if (!replicaFile.delete()) {
+        LOG.info("Replica Cache file: " + replicaFile.getPath() +
+            " cannot be deleted");
+      }
+      return false;
+    }
+    FileInputStream inputStream = null;
+    try {
+      inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
+      BlockListAsLongs blocksList =
+          BlockListAsLongs.readFrom(inputStream, maxDataLength);
+      if (blocksList == null) {
+        return false;
+      }
+
+      for (BlockReportReplica replica : blocksList) {
+        switch (replica.getState()) {
+          case FINALIZED:
+            addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
+            break;
+          case RUR:
+          case RBW:
+          case RWR:
+            addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false);
+            break;
+          default:
+            break;
+        }
+      }
+      // Now it is safe to add the replicas into volumeMap. Had any exception
+      // occurred while parsing the cache file, we would have fallen back to
+      // scanning all the files on disk.
+      for (Iterator<ReplicaInfo> iter =
+           tmpReplicaMap.replicas(bpid).iterator(); iter.hasNext(); ) {
+        ReplicaInfo info = iter.next();
+        // We use a lightweight GSet to store replicaInfo, we need to remove
+        // it from one GSet before adding to another.
+        iter.remove();
+        volumeMap.add(bpid, info);
+      }
+      LOG.info("Successfully read replica from cache file : "
+          + replicaFile.getPath());
+      return true;
+    } catch (Exception e) {
+      // On any exception, log the error and return false so that we fall
+      // back to reading the replicas from disk.
+      LOG.info("Exception occurred while reading the replicas cache file: "
+          + replicaFile.getPath(), e);
+      return false;
+    } finally {
+      // close the inputStream
+      IOUtils.closeStream(inputStream);
+
+      if (!fileIoProvider.delete(volume, replicaFile)) {
+        LOG.info("Failed to delete replica cache file: " +
+            replicaFile.getPath());
+      }
+    }
+  }
+
+  private void saveReplicas(BlockListAsLongs blocksListToPersist) {
+    if (blocksListToPersist == null ||
+        blocksListToPersist.getNumberOfBlocks() == 0) {
+      return;
+    }
+    final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+    final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
+        !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
+      return;
+    }
+
+    FileOutputStream out = null;
+    try {
+      out = fileIoProvider.getFileOutputStream(volume, tmpFile);
+      blocksListToPersist.writeTo(out);
+      out.close();
+      // Rename the tmp file to the replica cache file.
+      fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
+    } catch (Exception e) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error, delete both the files (tmp and cache)
+      // and continue.
+      LOG.warn("Failed to write replicas to cache ", e);
+      fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
+    } finally {
+      IOUtils.closeStream(out);
+      fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
+    }
+  }
+
+  void incrNumBlocks() {
+    numOfBlocks.incrementAndGet();
+  }
+
+  void decrNumBlocks() {
+    numOfBlocks.decrementAndGet();
+  }
+
+  public long getNumOfBlocks() {
+    return numOfBlocks.get();
+  }
+
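+  // Sub tasks fork themselves into the shared pool and are also queued in
+  // subTaskQueue rather than joined inline; getVolumeMap() later joins them
+  // all via waitForSubTaskToFinish() and surfaces any queued IOExceptions.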
+  /**
+   * Recursive action for adding replicas to the replica map.
+   */
+  class AddReplicaProcessor extends RecursiveAction {
+
+    private ReplicaMap volumeMap;
+    private File dir;
+    private RamDiskReplicaTracker lazyWriteReplicaMap;
+    private boolean isFinalized;
+    private List<IOException> exceptions;
+    private Queue<RecursiveAction> subTaskQueue;
+
+    /**
+     * @param volumeMap
+     *          the replicas map
+     * @param dir
+     *          an input directory
+     * @param lazyWriteReplicaMap
+     *          Map of replicas on transient storage.
+     * @param isFinalized
+     *          true if the directory has finalized replicas; false if the
+     *          directory has rbw replicas
+     * @param exceptions
+     *          list of exceptions to be returned to the parent thread.
+     * @param subTaskQueue
+     *          queue of sub tasks
+     */
+    AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+                        RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+                        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+      this.volumeMap = volumeMap;
+      this.dir = dir;
+      this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+      this.isFinalized = isFinalized;
+      this.exceptions = exceptions;
+      this.subTaskQueue = subTaskQueue;
+    }
+
+    @Override
+    protected void compute() {
+      try {
+        addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+            exceptions, subTaskQueue);
+      } catch (IOException e) {
+        LOG.warn("Caught exception while adding replicas from " + volume
+            + " in subtask. Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+  }
+
+  /**
+   * Return the size of the fork join pool used for adding replicas to the map.
+   */
+  @VisibleForTesting
+  public static int getAddReplicaForkPoolSize() {
+    return addReplicaThreadPool.getPoolSize();
+  }
+}