Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/14 00:08:04 UTC

[2/3] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
new file mode 100644
index 0000000..781b4d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * VolumeScanner scans a single volume.  Each VolumeScanner has its own thread.<p/>
+ * They are all managed by the DataNode's BlockScanner.
+ */
+public class VolumeScanner extends Thread {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(VolumeScanner.class);
+
+  /**
+   * Number of seconds in a minute.
+   */
+  private final static int SECONDS_PER_MINUTE = 60;
+
+  /**
+   * Number of minutes in an hour.
+   */
+  private final static int MINUTES_PER_HOUR = 60;
+
+  /**
+   * Name of the block iterator used by this scanner.
+   */
+  private final static String BLOCK_ITERATOR_NAME = "scanner";
+
+  /**
+   * The configuration.
+   */
+  private final Conf conf;
+
+  /**
+   * The DataNode this VolumeScanner is associated with.
+   */
+  private final DataNode datanode;
+
+  /**
+   * A reference to the volume that we're scanning.
+   */
+  private final FsVolumeReference ref;
+
+  /**
+   * The volume that we're scanning.
+   */
+  final FsVolumeSpi volume;
+
+  /**
+   * The number of scanned bytes in each minute of the last hour.<p/>
+   *
+   * This array is managed as a circular buffer.  We take the monotonic time and
+   * divide it up into one-minute periods.  Each entry in the array represents
+   * how many bytes were scanned during that period.
+   */
+  private final long scannedBytes[] = new long[MINUTES_PER_HOUR];
+
+  /**
+   * The sum of all the values of scannedBytes.
+   */
+  private long scannedBytesSum = 0;
+
+  /**
+   * The throttler to use with BlockSender objects.
+   */
+  private final DataTransferThrottler throttler = new DataTransferThrottler(1);
+
+  /**
+   * The null output stream to use with BlockSender objects.
+   */
+  private final DataOutputStream nullStream =
+      new DataOutputStream(new IOUtils.NullOutputStream());
+
+  /**
+   * The block iterators associated with this VolumeScanner.<p/>
+   *
+   * Each block pool has its own BlockIterator.
+   */
+  private final List<BlockIterator> blockIters =
+      new LinkedList<BlockIterator>();
+
+  /**
+   * The current block iterator, or null if there is none.
+   */
+  private BlockIterator curBlockIter = null;
+
+  /**
+   * True if the thread is stopping.<p/>
+   * Protected by this object's lock.
+   */
+  private boolean stopping = false;
+
+  /**
+   * The current minute, in monotonic terms.
+   */
+  private long curMinute = 0;
+
+  /**
+   * Handles scan results.
+   */
+  private final ScanResultHandler resultHandler;
+
+  private final Statistics stats = new Statistics();
+
+  static class Statistics {
+    long bytesScannedInPastHour = 0;
+    long blocksScannedInCurrentPeriod = 0;
+    long blocksScannedSinceRestart = 0;
+    long scansSinceRestart = 0;
+    long scanErrorsSinceRestart = 0;
+    long nextBlockPoolScanStartMs = -1;
+    long blockPoolPeriodEndsMs = -1;
+    ExtendedBlock lastBlockScanned = null;
+    boolean eof = false;
+
+    Statistics() {
+    }
+
+    Statistics(Statistics other) {
+      this.bytesScannedInPastHour = other.bytesScannedInPastHour;
+      this.blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod;
+      this.blocksScannedSinceRestart = other.blocksScannedSinceRestart;
+      this.scansSinceRestart = other.scansSinceRestart;
+      this.scanErrorsSinceRestart = other.scanErrorsSinceRestart;
+      this.nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs;
+      this.blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs;
+      this.lastBlockScanned = other.lastBlockScanned;
+      this.eof = other.eof;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().
+          append("Statistics{").
+          append("bytesScannedInPastHour=").append(bytesScannedInPastHour).
+          append(", blocksScannedInCurrentPeriod=").
+              append(blocksScannedInCurrentPeriod).
+          append(", blocksScannedSinceRestart=").
+              append(blocksScannedSinceRestart).
+          append(", scansSinceRestart=").append(scansSinceRestart).
+          append(", scanErrorsSinceRestart=").append(scanErrorsSinceRestart).
+          append(", nextBlockPoolScanStartMs=").append(nextBlockPoolScanStartMs).
+          append(", blockPoolPeriodEndsMs=").append(blockPoolPeriodEndsMs).
+          append(", lastBlockScanned=").append(lastBlockScanned).
+          append(", eof=").append(eof).
+          append("}").toString();
+    }
+  }
+
+  private static double positiveMsToHours(long ms) {
+    if (ms <= 0) {
+      return 0;
+    } else {
+      return ((double) ms) / TimeUnit.HOURS.toMillis(1);
+    }
+  }
+
+  public void printStats(StringBuilder p) {
+    p.append("Block scanner information for volume " +
+        volume.getStorageID() + " with base path " + volume.getBasePath() +
+        "%n");
+    synchronized (stats) {
+      p.append(String.format("Bytes verified in last hour       : %57d%n",
+          stats.bytesScannedInPastHour));
+      p.append(String.format("Blocks scanned in current period  : %57d%n",
+          stats.blocksScannedInCurrentPeriod));
+      p.append(String.format("Blocks scanned since restart      : %57d%n",
+          stats.blocksScannedSinceRestart));
+      p.append(String.format("Block pool scans since restart    : %57d%n",
+          stats.scansSinceRestart));
+      p.append(String.format("Block scan errors since restart   : %57d%n",
+          stats.scanErrorsSinceRestart));
+      if (stats.nextBlockPoolScanStartMs > 0) {
+        p.append(String.format("Hours until next block pool scan  : %57.3f%n",
+            positiveMsToHours(stats.nextBlockPoolScanStartMs -
+                Time.monotonicNow())));
+      }
+      if (stats.blockPoolPeriodEndsMs > 0) {
+        p.append(String.format("Hours until possible pool rescan  : %57.3f%n",
+            positiveMsToHours(stats.blockPoolPeriodEndsMs -
+                Time.now())));
+      }
+      p.append(String.format("Last block scanned                : %57s%n",
+          ((stats.lastBlockScanned == null) ? "none" :
+          stats.lastBlockScanned.toString())));
+      p.append(String.format("More blocks to scan in period     : %57s%n",
+          !stats.eof));
+      p.append("%n");
+    }
+  }
+
+  static class ScanResultHandler {
+    private VolumeScanner scanner;
+
+    public void setup(VolumeScanner scanner) {
+      LOG.trace("Starting VolumeScanner {}",
+          scanner.volume.getBasePath());
+      this.scanner = scanner;
+    }
+
+    public void handle(ExtendedBlock block, IOException e) {
+      FsVolumeSpi volume = scanner.volume;
+      if (e == null) {
+        LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+        return;
+      }
+      // If the block does not exist anymore, then it's not an error.
+      if (!volume.getDataset().contains(block)) {
+        LOG.debug("Volume {}: block {} is no longer in the dataset.",
+            volume.getBasePath(), block);
+        return;
+      }
+      // If the block does exist, the exception may be due to a race with a
+      // writer: the BlockSender read the old rbw path from the VolumeMap
+      // while the BlockReceiver was moving the block from rbw to finalized,
+      // and tried to open that file before the VolumeMap was updated.  The
+      // block state may change again, so ignore this error here.  A block
+      // really deleted by mistake will be caught by the DirectoryScanner.
+      if (e instanceof FileNotFoundException) {
+        LOG.info("Volume {}: verification failed for {} because of " +
+                "FileNotFoundException.  This may be due to a race with write.",
+            volume.getBasePath(), block);
+        return;
+      }
+      LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
+      try {
+        scanner.datanode.reportBadBlocks(block);
+      } catch (IOException ie) {
+        // This is bad, but not bad enough to shut down the scanner.
+        LOG.warn("Cannot report bad " + block.getBlockId(), e);
+      }
+    }
+  }
+
+  VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) {
+    this.conf = conf;
+    this.datanode = datanode;
+    this.ref = ref;
+    this.volume = ref.getVolume();
+    ScanResultHandler handler;
+    try {
+      handler = conf.resultHandler.newInstance();
+    } catch (Throwable e) {
+      LOG.error("unable to instantiate {}", conf.resultHandler, e);
+      handler = new ScanResultHandler();
+    }
+    this.resultHandler = handler;
+    setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+    setDaemon(true);
+  }
+
+  private void saveBlockIterator(BlockIterator iter) {
+    try {
+      iter.save();
+    } catch (IOException e) {
+      LOG.warn("{}: error saving {}.", this, iter, e);
+    }
+  }
+
+  private void expireOldScannedBytesRecords(long monotonicMs) {
+    long newMinute =
+        TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
+    newMinute = newMinute % MINUTES_PER_HOUR;
+    if (curMinute == newMinute) {
+      return;
+    }
+    // If a minute or more has gone past since we last updated the scannedBytes
+    // array, zero out the slots corresponding to those minutes.
+    for (long m = curMinute + 1; m <= newMinute; m++) {
+      LOG.trace("{}: updateScannedBytes is zeroing out slot {}.  " +
+              "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR,
+          curMinute, newMinute);
+      scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)];
+      scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0;
+    }
+    curMinute = newMinute;
+  }
+
+  /**
+   * Find a usable block iterator.<p/>
+   *
+   * We will consider available block iterators in order.  This property is
+   * important so that we don't keep rescanning the same block pool id over
+   * and over, while other block pools stay unscanned.<p/>
+   *
+   * A block pool is always ready to scan if the iterator is not at EOF.  If
+   * the iterator is at EOF, the block pool will be ready to scan when
+   * conf.scanPeriodMs milliseconds have elapsed since the iterator was last
+   * rewound.<p/>
+   *
+   * @return                     0 if we found a usable block iterator;
+   *                               otherwise, the length of time to delay
+   *                               before checking again.
+   */
+  private synchronized long findNextUsableBlockIter() {
+    int numBlockIters = blockIters.size();
+    if (numBlockIters == 0) {
+      LOG.debug("{}: no block pools are registered.", this);
+      return Long.MAX_VALUE;
+    }
+    int curIdx;
+    if (curBlockIter == null) {
+      curIdx = 0;
+    } else {
+      curIdx = blockIters.indexOf(curBlockIter);
+      Preconditions.checkState(curIdx >= 0);
+    }
+    // Note that this has to be wall-clock time, not monotonic time.  This is
+    // because the time saved in the cursor file is a wall-clock time.  We do
+    // not want to save a monotonic time in the cursor file, because it resets
+    // every time the machine reboots (on most platforms).
+    long nowMs = Time.now();
+    long minTimeoutMs = Long.MAX_VALUE;
+    for (int i = 0; i < numBlockIters; i++) {
+      int idx = (curIdx + i + 1) % numBlockIters;
+      BlockIterator iter = blockIters.get(idx);
+      if (!iter.atEnd()) {
+        LOG.info("Now scanning bpid {} on volume {}",
+            iter.getBlockPoolId(), volume.getBasePath());
+        curBlockIter = iter;
+        return 0L;
+      }
+      long iterStartMs = iter.getIterStartMs();
+      long waitMs = (iterStartMs + conf.scanPeriodMs) - nowMs;
+      if (waitMs <= 0) {
+        iter.rewind();
+        LOG.info("Now rescanning bpid {} on volume {}, after more than " +
+            "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+            TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
+        curBlockIter = iter;
+        return 0L;
+      }
+      minTimeoutMs = Math.min(minTimeoutMs, waitMs);
+    }
+    LOG.info("{}: no suitable block pools found to scan.  Waiting {} ms.",
+        this, minTimeoutMs);
+    return minTimeoutMs;
+  }
+
+  /**
+   * Scan a block.
+   *
+   * @param cblock               The block to scan.
+   * @param bytesPerSec          The bytes per second to scan at.
+   *
+   * @return                     The length of the block that was scanned, or
+   *                               -1 if the block could not be scanned.
+   */
+  private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
+    // 'cblock' has a valid blockId and block pool id, but we don't yet know the
+    // genstamp the block is supposed to have.  Ask the FsDatasetImpl for this
+    // information.
+    ExtendedBlock block = null;
+    try {
+      Block b = volume.getDataset().getStoredBlock(
+          cblock.getBlockPoolId(), cblock.getBlockId());
+      if (b == null) {
+        LOG.info("FileNotFound while finding block {} on volume {}",
+            cblock, volume.getBasePath());
+      } else {
+        block = new ExtendedBlock(cblock.getBlockPoolId(), b);
+      }
+    } catch (FileNotFoundException e) {
+      LOG.info("FileNotFoundException while finding block {} on volume {}",
+          cblock, volume.getBasePath());
+    } catch (IOException e) {
+      LOG.warn("I/O error while finding block {} on volume {}",
+            cblock, volume.getBasePath());
+    }
+    if (block == null) {
+      return -1; // block not found.
+    }
+    BlockSender blockSender = null;
+    try {
+      blockSender = new BlockSender(block, 0, -1,
+          false, true, true, datanode, null,
+          CachingStrategy.newDropBehind());
+      throttler.setBandwidth(bytesPerSec);
+      long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
+      resultHandler.handle(block, null);
+      return bytesRead;
+    } catch (IOException e) {
+      resultHandler.handle(block, e);
+    } finally {
+      IOUtils.cleanup(null, blockSender);
+    }
+    return -1;
+  }
+
+  @VisibleForTesting
+  static boolean calculateShouldScan(long targetBytesPerSec,
+                                     long scannedBytesSum) {
+    long effectiveBytesPerSec =
+        scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
+    boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec;
+    LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " +
+        "targetBytesPerSec = {}.  shouldScan = {}",
+        effectiveBytesPerSec, targetBytesPerSec, shouldScan);
+    return shouldScan;
+  }
+
+  /**
+   * Run an iteration of the VolumeScanner loop.
+   *
+   * @return     The number of milliseconds to delay before running the loop
+   *               again, or 0 to re-run the loop immediately.
+   */
+  private long runLoop() {
+    long bytesScanned = -1;
+    boolean scanError = false;
+    ExtendedBlock block = null;
+    try {
+      long monotonicMs = Time.monotonicNow();
+      expireOldScannedBytesRecords(monotonicMs);
+
+      if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) {
+        // The effective scan rate over the past hour already exceeds the
+        // target, so wait 30 seconds for old scannedBytes records to expire.
+        return 30000L;
+      }
+
+      // Find a usable block pool to scan.
+      if ((curBlockIter == null) || curBlockIter.atEnd()) {
+        long timeout = findNextUsableBlockIter();
+        if (timeout > 0) {
+          LOG.trace("{}: no block pools are ready to scan yet.  Waiting " +
+              "{} ms.", this, timeout);
+          synchronized (stats) {
+            stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+          }
+          return timeout;
+        }
+        synchronized (stats) {
+          stats.scansSinceRestart++;
+          stats.blocksScannedInCurrentPeriod = 0;
+          stats.nextBlockPoolScanStartMs = -1;
+        }
+        return 0L;
+      }
+
+      try {
+        block = curBlockIter.nextBlock();
+      } catch (IOException e) {
+        // There was an error listing the next block in the volume.  This is a
+        // serious issue.
+        LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
+        // On the next loop iteration, curBlockIter#eof will be set to true, and
+        // we will pick a different block iterator.
+        return 0L;
+      }
+      if (block == null) {
+        // The BlockIterator is at EOF.
+        LOG.info("{}: finished scanning block pool {}",
+            this, curBlockIter.getBlockPoolId());
+        saveBlockIterator(curBlockIter);
+        return 0;
+      }
+      long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
+      if (saveDelta >= conf.cursorSaveMs) {
+        LOG.debug("{}: saving block iterator {} after {} ms.",
+            this, curBlockIter, saveDelta);
+        saveBlockIterator(curBlockIter);
+      }
+      bytesScanned = scanBlock(block, conf.targetBytesPerSec);
+      if (bytesScanned >= 0) {
+        scannedBytesSum += bytesScanned;
+        scannedBytes[(int)(curMinute % MINUTES_PER_HOUR)] += bytesScanned;
+      } else {
+        scanError = true;
+      }
+      return 0L;
+    } finally {
+      synchronized (stats) {
+        stats.bytesScannedInPastHour = scannedBytesSum;
+        if (bytesScanned >= 0) {
+          stats.blocksScannedInCurrentPeriod++;
+          stats.blocksScannedSinceRestart++;
+        }
+        if (scanError) {
+          stats.scanErrorsSinceRestart++;
+        }
+        if (block != null) {
+          stats.lastBlockScanned = block;
+        }
+        if (curBlockIter == null) {
+          stats.eof = true;
+          stats.blockPoolPeriodEndsMs = -1;
+        } else {
+          stats.eof = curBlockIter.atEnd();
+          stats.blockPoolPeriodEndsMs =
+              curBlockIter.getIterStartMs() + conf.scanPeriodMs;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      LOG.trace("{}: thread starting.", this);
+      resultHandler.setup(this);
+      try {
+        long timeout = 0;
+        while (true) {
+          // Take the lock to check if we should stop.
+          synchronized (this) {
+            if (stopping) {
+              break;
+            }
+            if (timeout > 0) {
+              wait(timeout);
+              if (stopping) {
+                break;
+              }
+            }
+          }
+          timeout = runLoop();
+        }
+      } catch (InterruptedException e) {
+        // We are exiting because of an InterruptedException,
+        // probably sent by VolumeScanner#shutdown.
+        LOG.trace("{} exiting because of InterruptedException.", this);
+      } catch (Throwable e) {
+        LOG.error("{} exiting because of exception ", this, e);
+      }
+      LOG.info("{} exiting.", this);
+      // Save the current position of all block iterators and close them.
+      for (BlockIterator iter : blockIters) {
+        saveBlockIterator(iter);
+        IOUtils.cleanup(null, iter);
+      }
+    } finally {
+      // When the VolumeScanner exits, release the reference we were holding
+      // on the volume.  This will allow the volume to be removed later.
+      IOUtils.cleanup(null, ref);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "VolumeScanner(" + volume.getBasePath() +
+        ", " + volume.getStorageID() + ")";
+  }
+
+  /**
+   * Shut down this scanner.
+   */
+  public synchronized void shutdown() {
+    stopping = true;
+    notify();
+    this.interrupt();
+  }
+
+  /**
+   * Allow the scanner to scan the given block pool.
+   *
+   * @param bpid       The block pool id.
+   */
+  public synchronized void enableBlockPoolId(String bpid) {
+    for (BlockIterator iter : blockIters) {
+      if (iter.getBlockPoolId().equals(bpid)) {
+        LOG.warn("{}: already enabled scanning on block pool {}", this, bpid);
+        return;
+      }
+    }
+    BlockIterator iter = null;
+    try {
+      // Load a block iterator for the next block pool on the volume.
+      iter = volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+      LOG.trace("{}: loaded block iterator for {}.", this, bpid);
+    } catch (FileNotFoundException e) {
+      LOG.debug("{}: failed to load block iterator: " + e.getMessage(), this);
+    } catch (IOException e) {
+      LOG.warn("{}: failed to load block iterator.", this, e);
+    }
+    if (iter == null) {
+      iter = volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+      LOG.trace("{}: created new block iterator for {}.", this, bpid);
+    }
+    iter.setMaxStalenessMs(conf.maxStalenessMs);
+    blockIters.add(iter);
+    notify();
+  }
+
+  /**
+   * Disallow the scanner from scanning the given block pool.
+   *
+   * @param bpid       The block pool id.
+   */
+  public synchronized void disableBlockPoolId(String bpid) {
+    Iterator<BlockIterator> i = blockIters.iterator();
+    while (i.hasNext()) {
+      BlockIterator iter = i.next();
+      if (iter.getBlockPoolId().equals(bpid)) {
+        LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
+        i.remove();
+        IOUtils.cleanup(null, iter);
+        if (curBlockIter == iter) {
+          curBlockIter = null;
+        }
+        notify();
+        return;
+      }
+    }
+    LOG.warn("{}: can't remove block pool {}, because it was never " +
+        "added.", this, bpid);
+  }
+
+  @VisibleForTesting
+  Statistics getStatistics() {
+    synchronized (stats) {
+      return new Statistics(stats);
+    }
+  }
+}
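
For illustration only (not part of the patch): a minimal, self-contained sketch of the circular-buffer accounting that VolumeScanner uses above to keep one hour of per-minute byte counts in O(1) memory. The class and method names here are made up for the example; only the arithmetic mirrors expireOldScannedBytesRecords() and calculateShouldScan().

import java.util.concurrent.TimeUnit;

class ScanRateWindow {
  private static final int MINUTES_PER_HOUR = 60;
  private static final int SECONDS_PER_MINUTE = 60;

  // One slot per minute of the trailing hour, plus a running sum.
  private final long[] scannedBytes = new long[MINUTES_PER_HOUR];
  private long scannedBytesSum = 0;
  private long curMinute = 0;

  // Zero out the slots for any minutes that elapsed since the last update.
  void advanceTo(long monotonicMs) {
    long newMinute = TimeUnit.MILLISECONDS.toMinutes(monotonicMs);
    for (long m = curMinute + 1; m <= newMinute; m++) {
      int slot = (int) (m % MINUTES_PER_HOUR);
      scannedBytesSum -= scannedBytes[slot];
      scannedBytes[slot] = 0;
    }
    curMinute = Math.max(curMinute, newMinute);
  }

  // Record bytes scanned during the current minute.
  void add(long bytes) {
    scannedBytes[(int) (curMinute % MINUTES_PER_HOUR)] += bytes;
    scannedBytesSum += bytes;
  }

  // True while the average rate over the past hour is at or below the
  // target, which is the same test calculateShouldScan() performs.
  boolean shouldScan(long targetBytesPerSec) {
    long effectiveBytesPerSec =
        scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
    return effectiveBytesPerSec <= targetBytesPerSec;
  }
}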

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index cc7aec5..c554bc39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -90,24 +90,30 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     }
   }
 
-  /**
-   * Create rolling logs.
-   *
-   * @param prefix the prefix of the log names.
-   * @return rolling logs
-   */
-  public RollingLogs createRollingLogs(String bpid, String prefix
-      ) throws IOException;
-
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
-  /** Add an array of StorageLocation to FsDataset. */
+  /**
+   * Add a new volume to the FsDataset.<p/>
+   *
+   * If the FSDataset supports block scanning, this function registers
+   * the new volume with the block scanner.
+   *
+   * @param location      The storage location for the new volume.
+   * @param nsInfos       Namespace information for the new volume.
+   */
   public void addVolume(
       final StorageLocation location,
       final List<NamespaceInfo> nsInfos) throws IOException;
 
-  /** Removes a collection of volumes from FsDataset. */
+  /**
+   * Removes a collection of volumes from FsDataset.
+   *
+   * If the FSDataset supports block scanning, this function removes
+   * the volumes from the block scanner.
+   *
+   * @param volumes      The storage locations of the volumes to remove.
+   */
   public void removeVolumes(Collection<StorageLocation> volumes);
 
   /** @return a storage with the given storage ID */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index d9c37cb..9b28e67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * This is an interface for the underlying volume.
@@ -69,4 +71,112 @@ public interface FsVolumeSpi {
 
   /** Returns true if the volume is NOT backed by persistent storage. */
   public boolean isTransientStorage();
+
+  /**
+   * BlockIterator will return ExtendedBlock entries from a block pool in
+   * this volume.  The entries will be returned in sorted order.<p/>
+   *
+   * BlockIterator objects themselves do not always have internal
+   * synchronization, so they can only safely be used by a single thread at a
+   * time.<p/>
+   *
+   * Closing the iterator does not save it.  Call save() to persist it.
+   */
+  public interface BlockIterator extends Closeable {
+    /**
+     * Get the next block.<p/>
+     *
+     * Note that this block may be removed in between the time we list it,
+     * and the time the caller tries to use it, or it may represent a stale
+     * entry.  Callers should handle the case where the returned block no
+     * longer exists.
+     *
+     * @return               The next block, or null if there are no
+     *                         more blocks.  Null if there was an error
+     *                         determining the next block.
+     *
+     * @throws IOException   If there was an error getting the next block in
+     *                         this volume.  In this case, EOF will be set on
+     *                         the iterator.
+     */
+    public ExtendedBlock nextBlock() throws IOException;
+
+    /**
+     * Returns true if we got to the end of the block pool.
+     */
+    public boolean atEnd();
+
+    /**
+     * Repositions the iterator at the beginning of the block pool.
+     */
+    public void rewind();
+
+    /**
+     * Save this block iterator to the underlying volume.
+     * Any existing saved block iterator with this name will be overwritten.
+     * maxStalenessMs will not be saved.
+     *
+     * @throws IOException   If there was an error when saving the block
+     *                         iterator.
+     */
+    public void save() throws IOException;
+
+    /**
+     * Set the maximum staleness of entries that we will return.<p/>
+     *
+     * A maximum staleness of 0 means we will never return stale entries; a
+     * larger value will allow us to reduce resource consumption in exchange
+     * for returning more potentially stale entries.  Even with staleness set
+     * to 0, consumers of this API must handle race conditions where blocks
+     * disappear before they can be processed.
+     */
+    public void setMaxStalenessMs(long maxStalenessMs);
+
+    /**
+     * Get the wall-clock time, measured in milliseconds since the Epoch,
+     * when this iterator was created.
+     */
+    public long getIterStartMs();
+
+    /**
+     * Get the wall-clock time, measured in milliseconds since the Epoch,
+     * when this iterator was last saved.  Returns iterStartMs if the
+     * iterator was never saved.
+     */
+    public long getLastSavedMs();
+
+    /**
+     * Get the id of the block pool which this iterator traverses.
+     */
+    public String getBlockPoolId();
+  }
+
+  /**
+   * Create a new block iterator.  It will start at the beginning of the
+   * block set.
+   *
+   * @param bpid             The block pool id to iterate over.
+   * @param name             The name of the block iterator to create.
+   *
+   * @return                 The new block iterator.
+   */
+  public BlockIterator newBlockIterator(String bpid, String name);
+
+  /**
+   * Load a saved block iterator.
+   *
+   * @param bpid             The block pool id to iterate over.
+   * @param name             The name of the block iterator to load.
+   *
+   * @return                 The saved block iterator.
+   * @throws IOException     If there was an IO error loading the saved
+   *                           block iterator.
+   */
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException;
+
+  /**
+   * Get the FsDatasetSpi which this volume is a part of.
+   */
+  public FsDatasetSpi getDataset();
 }
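
For illustration only (not part of the patch): a sketch of how a caller might drive the new BlockIterator API — resume a saved cursor if one exists, otherwise start a fresh iteration, then walk the pool and persist progress. The scanPool() and process() names are assumptions for this example; "scanner" is the iterator name the patch itself uses.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;

class BlockIteratorUsage {
  static void scanPool(FsVolumeSpi volume, String bpid) throws IOException {
    BlockIterator iter;
    try {
      // Resume from a previously saved cursor, if the volume has one.
      iter = volume.loadBlockIterator(bpid, "scanner");
    } catch (IOException e) {
      // No usable saved cursor (missing or unreadable); start fresh.
      iter = volume.newBlockIterator(bpid, "scanner");
    }
    try {
      ExtendedBlock block;
      while ((block = iter.nextBlock()) != null) {
        // The entry may be stale: the block can disappear between listing
        // and use, so real callers must tolerate FileNotFoundException.
        process(block);
      }
      // Closing does not save; persist the cursor explicitly.  (VolumeScanner
      // does this periodically, based on conf.cursorSaveMs.)
      iter.save();
    } finally {
      iter.close();
    }
  }

  private static void process(ExtendedBlock block) {
    // Placeholder for the real per-block work (e.g. checksum verification).
  }
}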

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
deleted file mode 100644
index 5d54770..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Rolling logs consist of a current log and a set of previous logs.
- *
- * The implementation should support a single appender and multiple readers.
- */
-public interface RollingLogs {
-  /**
-   * To iterate the lines of the logs.
-   */
-  public interface LineIterator extends Iterator<String>, Closeable {
-    /** Is the iterator iterating the previous? */
-    public boolean isPrevious();
-
-    /**
-     * Is the last read entry from previous? This should be called after
-     * reading.
-     */
-    public boolean isLastReadFromPrevious();
-  }
-
-  /**
-   * To append text to the logs.
-   */
-  public interface Appender extends Appendable, Closeable {
-  }
-
-  /**
-   * Create an iterator to iterate the lines in the logs.
-   * 
-   * @param skipPrevious Should it skip reading the previous log? 
-   * @return a new iterator.
-   */
-  public LineIterator iterator(boolean skipPrevious) throws IOException;
-
-  /**
-   * @return the only appender to append text to the logs.
-   *   The same object is returned if it is invoked multiple times.
-   */
-  public Appender appender();
-
-  /**
-   * Roll current to previous.
-   *
-   * @return true if the rolling succeeded.
-   *   When it returns false, it is not equivalent to an error. 
-   *   It means that the rolling cannot be performed at the moment,
-   *   e.g. the logs are being read.
-   */
-  public boolean roll() throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index cefb206..082fb9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -92,7 +92,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -294,7 +293,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
-    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+    volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
+        blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
 
@@ -326,6 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // storageMap and asyncDiskService, consistent.
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
+    FsVolumeReference ref = fsVolume.obtainReference();
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
@@ -336,7 +337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               DatanodeStorage.State.NORMAL,
               storageType));
       asyncDiskService.addVolume(sd.getCurrentDir());
-      volumes.addVolume(fsVolume);
+      volumes.addVolume(ref);
     }
 
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
@@ -375,6 +376,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw MultipleIOException.createIOException(exceptions);
     }
 
+    final FsVolumeReference ref = fsVolume.obtainReference();
     setupAsyncLazyPersistThread(fsVolume);
 
     builder.build();
@@ -385,7 +387,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               DatanodeStorage.State.NORMAL,
               storageType));
       asyncDiskService.addVolume(sd.getCurrentDir());
-      volumes.addVolume(fsVolume);
+      volumes.addVolume(ref);
     }
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
@@ -429,9 +431,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               it.remove();
             }
           }
-          // Delete blocks from the block scanner in batch.
-          datanode.getBlockScanner().deleteBlocks(bpid,
-              blocks.toArray(new Block[blocks.size()]));
         }
 
         storageMap.remove(sd.getStorageUuid());
@@ -790,7 +789,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     // Replace the old block if any to reschedule the scanning.
-    datanode.getBlockScanner().addBlock(block, false);
     return replicaInfo;
   }
 
@@ -2025,10 +2023,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
           volumeMap.remove(bpid, blockId);
-          final DataBlockScanner blockScanner = datanode.getBlockScanner();
-          if (blockScanner != null) {
-            blockScanner.deleteBlock(bpid, new Block(blockId));
-          }
           if (vol.isTransientStorage()) {
             ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
           }
@@ -2051,12 +2045,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
-        final DataBlockScanner blockScanner = datanode.getBlockScanner();
-        if (!vol.isTransientStorage()) {
-          if (blockScanner != null) {
-            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
-          }
-        } else {
+        if (vol.isTransientStorage()) {
           ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
         }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
@@ -2557,23 +2546,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     dataStorage.clearRollingUpgradeMarker(bpid);
   }
 
-  @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix
-      ) throws IOException {
-    String dir = null;
-    final List<FsVolumeImpl> volumes = getVolumes();
-    for (FsVolumeImpl vol : volumes) {
-      String bpDir = vol.getPath(bpid);
-      if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
-        dir = bpDir;
-        break;
-      }
-    }
-    if (dir == null) {
-      dir = volumes.get(0).getPath(bpid);
-    }
-    return new RollingLogsImpl(dir, prefix);
-  }
 
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index a2d4f2e..e4877be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -17,9 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
+import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -41,15 +50,24 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.CloseableReferenceCount;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The underlying volume used to store replica.
@@ -59,6 +77,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 @InterfaceAudience.Private
 @VisibleForTesting
 public class FsVolumeImpl implements FsVolumeSpi {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FsVolumeImpl.class);
+
   private final FsDatasetImpl dataset;
   private final String storageID;
   private final StorageType storageType;
@@ -395,6 +416,332 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
+  private enum SubdirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith("subdir");
+    }
+  }
+
+  private enum BlockFileFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return !name.endsWith(".meta") && name.startsWith("blk_");
+    }
+  }
+
+  @VisibleForTesting
+  public static String nextSorted(List<String> arr, String prev) {
+    int res = 0;
+    if (prev != null) {
+      res = Collections.binarySearch(arr, prev);
+      if (res < 0) {
+        res = -1 - res;
+      } else {
+        res++;
+      }
+    }
+    if (res >= arr.size()) {
+      return null;
+    }
+    return arr.get(res);
+  }
+
+  private static class BlockIteratorState {
+    BlockIteratorState() {
+      lastSavedMs = iterStartMs = Time.now();
+      curFinalizedDir = null;
+      curFinalizedSubDir = null;
+      curEntry = null;
+      atEnd = false;
+    }
+
+    // The wall-clock ms since the epoch at which this iterator was last saved.
+    @JsonProperty
+    private long lastSavedMs;
+
+    // The wall-clock ms since the epoch at which this iterator was created.
+    @JsonProperty
+    private long iterStartMs;
+
+    @JsonProperty
+    private String curFinalizedDir;
+
+    @JsonProperty
+    private String curFinalizedSubDir;
+
+    @JsonProperty
+    private String curEntry;
+
+    @JsonProperty
+    private boolean atEnd;
+  }
+
+  /**
+   * A BlockIterator implementation for FsVolumeImpl.
+   */
+  private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
+    private final File bpidDir;
+    private final String name;
+    private final String bpid;
+    private long maxStalenessMs = 0;
+
+    private List<String> cache;
+    private long cacheMs;
+
+    private BlockIteratorState state;
+
+    BlockIteratorImpl(String bpid, String name) {
+      this.bpidDir = new File(currentDir, bpid);
+      this.name = name;
+      this.bpid = bpid;
+      rewind();
+    }
+
+    /**
+     * Get the next subdirectory within the block pool slice.
+     *
+     * @return         The next subdirectory within the block pool slice, or
+     *                   null if there are no more.
+     */
+    private String getNextSubDir(String prev, File dir)
+          throws IOException {
+      List<String> children =
+          IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+      cache = null;
+      cacheMs = 0;
+      if (children.size() == 0) {
+        LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+        return null;
+      }
+      Collections.sort(children);
+      String nextSubDir = nextSorted(children, prev);
+      if (nextSubDir == null) {
+        LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+      } else {
+        LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
+            "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
+      }
+      return nextSubDir;
+    }
+
+    private String getNextFinalizedDir() throws IOException {
+      File dir = Paths.get(
+          bpidDir.getAbsolutePath(), "current", "finalized").toFile();
+      return getNextSubDir(state.curFinalizedDir, dir);
+    }
+
+    private String getNextFinalizedSubDir() throws IOException {
+      if (state.curFinalizedDir == null) {
+        return null;
+      }
+      File dir = Paths.get(
+          bpidDir.getAbsolutePath(), "current", "finalized",
+              state.curFinalizedDir).toFile();
+      return getNextSubDir(state.curFinalizedSubDir, dir);
+    }
+
+    private List<String> getSubdirEntries() throws IOException {
+      if (state.curFinalizedSubDir == null) {
+        return null; // There are no entries in the null subdir.
+      }
+      long now = Time.monotonicNow();
+      if (cache != null) {
+        long delta = now - cacheMs;
+        if (delta < maxStalenessMs) {
+          return cache;
+        } else {
+          LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
+            "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
+          cache = null;
+        }
+      }
+      File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
+                    state.curFinalizedDir, state.curFinalizedSubDir).toFile();
+      List<String> entries =
+          IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+      if (entries.size() == 0) {
+        entries = null;
+      } else {
+        Collections.sort(entries);
+      }
+      if (entries == null) {
+        LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+      } else {
+        LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", 
+            storageID, bpid, entries.size(), dir.getAbsolutePath());
+      }
+      cache = entries;
+      cacheMs = now;
+      return cache;
+    }
+
+    /**
+     * Get the next block.<p/>
+     *
+     * Each volume has a hierarchical structure.<p/>
+     *
+     * <code>
+     * BPID B0
+     *   finalized/
+     *     subdir0
+     *       subdir0
+     *         blk_000
+     *         blk_001
+     *       ...
+     *     subdir1
+     *       subdir0
+     *         ...
+     *   rbw/
+     * </code>
+     *
+     * When we run out of entries at one level of the structure, we search
+     * progressively higher levels.  For example, when we run out of blk_
+     * entries in a subdirectory, we search for the next subdirectory.
+     * And so on.
+     */
+    @Override
+    public ExtendedBlock nextBlock() throws IOException {
+      if (state.atEnd) {
+        return null;
+      }
+      try {
+        while (true) {
+          List<String> entries = getSubdirEntries();
+          if (entries != null) {
+            state.curEntry = nextSorted(entries, state.curEntry);
+            if (state.curEntry == null) {
+              LOG.trace("nextBlock({}, {}): advancing from {} to next " +
+                  "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
+            } else {
+              ExtendedBlock block =
+                  new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
+              LOG.trace("nextBlock({}, {}): advancing to {}",
+                  storageID, bpid, block);
+              return block;
+            }
+          }
+          state.curFinalizedSubDir = getNextFinalizedSubDir();
+          if (state.curFinalizedSubDir == null) {
+            state.curFinalizedDir = getNextFinalizedDir();
+            if (state.curFinalizedDir == null) {
+              state.atEnd = true;
+              return null;
+            }
+          }
+        }
+      } catch (IOException e) {
+        state.atEnd = true;
+        LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
+        throw e;
+      }
+    }
+
+    @Override
+    public boolean atEnd() {
+      return state.atEnd;
+    }
+
+    @Override
+    public void rewind() {
+      cache = null;
+      cacheMs = 0;
+      state = new BlockIteratorState();
+    }
+
+    @Override
+    public void save() throws IOException {
+      state.lastSavedMs = Time.now();
+      boolean success = false;
+      ObjectMapper mapper = new ObjectMapper();
+      try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+                new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+        mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state);
+        success = true;
+      } finally {
+        if (!success) {
+          if (!getTempSaveFile().delete()) {
+            LOG.debug("save({}, {}): failed to delete temporary file.",
+                storageID, bpid);
+          }
+        }
+      }
+      Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("save({}, {}): saved {}", storageID, bpid,
+            mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+      }
+    }
+
+    public void load() throws IOException {
+      ObjectMapper mapper = new ObjectMapper();
+      File file = getSaveFile();
+      this.state = mapper.reader(BlockIteratorState.class).readValue(file);
+      LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
+          bpid, name, file.getAbsoluteFile(),
+          mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+    }
+
+    File getSaveFile() {
+      return new File(bpidDir, name + ".cursor");
+    }
+
+    File getTempSaveFile() {
+      return new File(bpidDir, name + ".cursor.tmp");
+    }
+
+    @Override
+    public void setMaxStalenessMs(long maxStalenessMs) {
+      this.maxStalenessMs = maxStalenessMs;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // No action needed for this volume implementation.
+    }
+
+    @Override
+    public long getIterStartMs() {
+      return state.iterStartMs;
+    }
+
+    @Override
+    public long getLastSavedMs() {
+      return state.lastSavedMs;
+    }
+
+    @Override
+    public String getBlockPoolId() {
+      return bpid;
+    }
+  }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return new BlockIteratorImpl(bpid, name);
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
+    iter.load();
+    return iter;
+  }
+
+  @Override
+  public FsDatasetSpi getDataset() {
+    return dataset;
+  }
+
   /**
    * RBW files. They get moved to the finalized block directory when
    * the block is finalized.
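
For illustration only (not part of the patch): the resume semantics of the nextSorted() helper added above. Because it uses binarySearch's insertion point, iteration continues correctly even when the previously returned entry has since been deleted from the directory.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;

class NextSortedExample {
  public static void main(String[] args) {
    List<String> entries = Arrays.asList("blk_1001", "blk_1003", "blk_1007");

    // null means "start from the beginning".
    System.out.println(FsVolumeImpl.nextSorted(entries, null));       // blk_1001
    // Normal advance: the first entry strictly after the previous one.
    System.out.println(FsVolumeImpl.nextSorted(entries, "blk_1001")); // blk_1003
    // The previous entry was deleted in the meantime: the insertion point
    // still lands on the next surviving entry.
    System.out.println(FsVolumeImpl.nextSorted(entries, "blk_1002")); // blk_1003
    // Past the last entry: null signals "no more entries in this directory".
    System.out.println(FsVolumeImpl.nextSorted(entries, "blk_1007")); // null
  }
}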

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index c837593..ae2f5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -42,11 +43,13 @@ class FsVolumeList {
   private Object checkDirsMutex = new Object();
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+  private final BlockScanner blockScanner;
   private volatile int numFailedVolumes;
 
-  FsVolumeList(int failedVols,
+  FsVolumeList(int failedVols, BlockScanner blockScanner,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
     this.blockChooser = blockChooser;
+    this.blockScanner = blockScanner;
     this.numFailedVolumes = failedVols;
   }
   
@@ -260,13 +263,14 @@ class FsVolumeList {
 
   /**
    * Dynamically add new volumes to the existing volumes that this DN manages.
-   * @param newVolume the instance of new FsVolumeImpl.
+   *
+   * @param ref       a reference to the new FsVolumeImpl instance.
    */
-  void addVolume(FsVolumeImpl newVolume) {
+  void addVolume(FsVolumeReference ref) {
     while (true) {
       final FsVolumeImpl[] curVolumes = volumes.get();
       final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
-      volumeList.add(newVolume);
+      volumeList.add((FsVolumeImpl)ref.getVolume());
       if (volumes.compareAndSet(curVolumes,
           volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
         break;
@@ -274,12 +278,15 @@ class FsVolumeList {
         if (FsDatasetImpl.LOG.isDebugEnabled()) {
           FsDatasetImpl.LOG.debug(
               "The volume list has been changed concurrently, " +
-                  "retry to remove volume: " + newVolume);
+                  "retry to remove volume: " + ref.getVolume().getStorageID());
         }
       }
     }
-
-    FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+    if (blockScanner != null) {
+      blockScanner.addVolumeScanner(ref);
+    }
+    FsDatasetImpl.LOG.info("Added new volume: " +
+        ref.getVolume().getStorageID());
   }
 
   /**
@@ -293,6 +300,9 @@ class FsVolumeList {
       if (volumeList.remove(target)) {
         if (volumes.compareAndSet(curVolumes,
             volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+          if (blockScanner != null) {
+            blockScanner.removeVolumeScanner(target);
+          }
           try {
             target.closeAndWait();
           } catch (IOException e) {

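The addVolume() change above keeps the volume array behind an AtomicReference and publishes a fresh copy on every update, retrying when another thread wins the compareAndSet race; the volume scanner is attached only after the new array is visible. A stand-alone sketch of that copy-on-write idiom, with plain strings standing in for FsVolumeImpl (the class below is illustrative, not Hadoop code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative copy-on-write array, mirroring the retry loop in FsVolumeList#addVolume. */
class CopyOnWriteVolumes {
  private final AtomicReference<String[]> volumes =
      new AtomicReference<String[]>(new String[0]);

  void add(String storageId) {
    while (true) {
      String[] cur = volumes.get();                    // current immutable snapshot
      List<String> next = new ArrayList<String>(Arrays.asList(cur));
      next.add(storageId);
      if (volumes.compareAndSet(cur, next.toArray(new String[0]))) {
        return;                                        // new snapshot published atomically
      }
      // Another thread swapped in a different array first; retry on the fresh snapshot.
    }
  }

  String[] snapshot() {
    return volumes.get();                              // readers never take a lock
  }
}

Readers holding the old array keep using it unchanged, which is why the scanner is registered only once the swap has succeeded.
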
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
deleted file mode 100644
index 121127d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-import com.google.common.base.Charsets;
-
-class RollingLogsImpl implements RollingLogs {
-  private static final String CURR_SUFFIX = ".curr";
-  private static final String PREV_SUFFIX = ".prev";
-
-  static boolean isFilePresent(String dir, String filePrefix) {
-    return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
-           new File(dir, filePrefix + PREV_SUFFIX).exists();
-  }
-
-  private final File curr;
-  private final File prev;
-  private PrintWriter out; //require synchronized access
-
-  private final Appender appender = new Appender() {
-    @Override
-    public Appendable append(CharSequence csq) {
-      synchronized(RollingLogsImpl.this) {
-        if (out == null) {
-          throw new IllegalStateException(RollingLogsImpl.this
-              + " is not yet opened.");
-        }
-        out.print(csq);
-        out.flush();
-      }
-      return this;
-    }
-
-    @Override
-    public Appendable append(char c) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Appendable append(CharSequence csq, int start, int end) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-      synchronized(RollingLogsImpl.this) {
-        if (out != null) {
-          out.close();
-          out = null;
-        }
-      }
-    }
-  };
-
-
-  private final AtomicInteger numReaders = new AtomicInteger();
-
-  RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
-    curr = new File(dir, filePrefix + CURR_SUFFIX);
-    prev = new File(dir, filePrefix + PREV_SUFFIX);
-    out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
-        curr, true), Charsets.UTF_8));
-  }
-
-  @Override
-  public Reader iterator(boolean skipPrevFile) throws IOException {
-    numReaders.incrementAndGet(); 
-    return new Reader(skipPrevFile);
-  }
-
-  @Override
-  public Appender appender() {
-    return appender;
-  }
-
-  @Override
-  public boolean roll() throws IOException {
-    if (numReaders.get() > 0) {
-      return false;
-    }
-    if (!prev.delete() && prev.exists()) {
-      throw new IOException("Failed to delete " + prev);
-    }
-
-    synchronized(this) {
-      appender.close();
-      final boolean renamed = curr.renameTo(prev);
-      out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
-          curr, true), Charsets.UTF_8));
-      if (!renamed) {
-        throw new IOException("Failed to rename " + curr + " to " + prev);
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return curr.toString();
-  }
-  
-  /**
-   * This is used to read the lines in order.
-   * If the data is not read completely (i.e, untill hasNext() returns
-   * false), it needs to be explicitly 
-   */
-  private class Reader implements RollingLogs.LineIterator {
-    private File file;
-    private File lastReadFile;
-    private BufferedReader reader;
-    private String line;
-    private boolean closed = false;
-    
-    private Reader(boolean skipPrevFile) throws IOException {
-      reader = null;
-      file = skipPrevFile? curr : prev;
-      readNext();        
-    }
-
-    @Override
-    public boolean isPrevious() {
-      return file == prev;
-    }
-
-    @Override
-    public boolean isLastReadFromPrevious() {
-      return lastReadFile == prev;
-    }
-
-    private boolean openFile() throws IOException {
-
-      for(int i=0; i<2; i++) {
-        if (reader != null || i > 0) {
-          // move to next file
-          file = isPrevious()? curr : null;
-        }
-        if (file == null) {
-          return false;
-        }
-        if (file.exists()) {
-          break;
-        }
-      }
-      
-      if (reader != null ) {
-        reader.close();
-        reader = null;
-      }
-      
-      reader = new BufferedReader(new InputStreamReader(new FileInputStream(
-          file), Charsets.UTF_8));
-      return true;
-    }
-    
-    // read next line if possible.
-    private void readNext() throws IOException {
-      line = null;
-      try {
-        if (reader != null && (line = reader.readLine()) != null) {
-          return;
-        }
-        // move to the next file.
-        if (openFile()) {
-          readNext();
-        }
-      } finally {
-        if (!hasNext()) {
-          close();
-        }
-      }
-    }
-    
-    @Override
-    public boolean hasNext() {
-      return line != null;
-    }
-
-    @Override
-    public String next() {
-      String curLine = line;
-      try {
-        lastReadFile = file;
-        readNext();
-      } catch (IOException e) {
-        DataBlockScanner.LOG.warn("Failed to read next line.", e);
-      }
-      return curLine;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!closed) {
-        try {
-          if (reader != null) {
-            reader.close();
-          }
-        } finally {
-          file = null;
-          reader = null;
-          closed = true;
-          final int n = numReaders.decrementAndGet();
-          assert(n >= 0);
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bfaa33b..bb28f01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -987,6 +987,26 @@
 </property>
 
 <property>
+  <name>dfs.datanode.scan.period.hours</name>
+  <value>0</value>
+  <description>
+        If this is 0 or negative, the DataNode's block scanner will be
+        disabled.  If this is positive, the DataNode will not scan any
+        individual block more than once in the specified scan period.
+  </description>
+</property>
+
+<property>
+  <name>dfs.block.scanner.volume.bytes.per.second</name>
+  <value>1048576</value>
+  <description>
+        If this is 0, the DataNode's block scanner will be disabled.  If this
+        is positive, this is the number of bytes per second that the DataNode's
+        block scanner will try to scan from each volume.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4193404</value>
   <description>

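The two properties above control whether the per-volume scanners run and how fast they read. A minimal sketch of tuning them programmatically, for example before starting a test cluster; the key names come from the patch, while the class and the chosen values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

/** Illustrative scanner tuning; the values are examples, not recommendations. */
class ScannerConfExample {
  static Configuration scannerConf() {
    Configuration conf = new HdfsConfiguration();
    // Scan each block at most once every 504 hours (three weeks);
    // zero or a negative value disables the scanner.
    conf.setLong("dfs.datanode.scan.period.hours", 504L);
    // Throttle each volume scanner to roughly 1 MiB/s; zero also disables scanning.
    conf.setLong("dfs.block.scanner.volume.bytes.per.second", 1048576L);
    return conf;
  }
}
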
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a14c84c..086f9ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1697,4 +1697,20 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
+
+  /**
+   * Change the length of a block replica at datanode dnIndex.
+   */
+  public static boolean changeReplicaLength(MiniDFSCluster cluster,
+      ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
+    File blockFile = cluster.getBlockFile(dnIndex, blk);
+    if (blockFile != null && blockFile.exists()) {
+      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+      raFile.setLength(raFile.length()+lenDelta);
+      raFile.close();
+      return true;
+    }
+    LOG.info("failed to change length of block " + blk);
+    return false;
+  }
 }
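The new changeReplicaLength() helper is mainly useful for corrupting a single replica inside a MiniDFSCluster test so that a later scan or read notices the damage. A hedged fragment of such a test is sketched below; cluster startup and the file write are assumed to happen elsewhere in the test class, and only changeReplicaLength itself comes from this patch.

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

/** Hypothetical test fragment; not part of the patch. */
class ChangeReplicaLengthExample {
  static void corruptOneReplica(MiniDFSCluster cluster, Path path) throws Exception {
    FileSystem fs = cluster.getFileSystem();
    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
    // Shrink the replica stored on datanode 0 by one byte; the helper returns
    // false when the block file cannot be found on that datanode.
    assertTrue(DFSTestUtil.changeReplicaLength(cluster, block, 0, -1));
  }
}
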