Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/27 19:40:34 UTC

[16/50] [abbrv] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e62a1a6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e62a1a6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e62a1a6

Branch: refs/heads/YARN-2928
Commit: 6e62a1a6728b1f782f64065424f92b292c3f163a
Parents: a003f71
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Dec 17 11:27:48 2014 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jan 21 19:00:53 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../hdfs/server/datanode/BPOfferService.java    |   3 -
 .../hdfs/server/datanode/BPServiceActor.java    |   6 -
 .../server/datanode/BlockPoolSliceScanner.java  | 872 -------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   8 -
 .../hdfs/server/datanode/BlockScanner.java      | 308 +++++++
 .../hdfs/server/datanode/BlockSender.java       |   3 -
 .../hdfs/server/datanode/DataBlockScanner.java  | 339 -------
 .../hadoop/hdfs/server/datanode/DataNode.java   |  73 +-
 .../hdfs/server/datanode/VolumeScanner.java     | 652 ++++++++++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java |  32 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  | 110 +++
 .../server/datanode/fsdataset/RollingLogs.java  |  73 --
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  44 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 347 ++++++++
 .../datanode/fsdataset/impl/FsVolumeList.java   |  24 +-
 .../fsdataset/impl/RollingLogsImpl.java         | 241 -----
 .../src/main/resources/hdfs-default.xml         |  20 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  16 +
 .../hadoop/hdfs/TestDatanodeBlockScanner.java   | 551 ------------
 .../org/apache/hadoop/hdfs/TestReplication.java |   3 +-
 .../TestOverReplicatedBlocks.java               |  13 +-
 .../server/datanode/BlockReportTestBase.java    |   7 +-
 .../hdfs/server/datanode/DataNodeTestUtils.java |  24 -
 .../server/datanode/SimulatedFSDataset.java     |  22 +-
 .../hdfs/server/datanode/TestBlockScanner.java  | 680 +++++++++++++++
 .../server/datanode/TestDirectoryScanner.java   |  16 +
 .../TestMultipleNNDataBlockScanner.java         | 245 ------
 .../extdataset/ExternalDatasetImpl.java         |   7 -
 .../extdataset/ExternalRollingLogs.java         |  92 --
 .../datanode/extdataset/ExternalVolumeImpl.java |  17 +
 .../extdataset/TestExternalDataset.java         |   9 -
 .../fsdataset/impl/FsVolumeListTest.java        |  17 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  30 +-
 .../impl/TestInterDatanodeProtocol.java         |   4 +-
 .../namenode/snapshot/SnapshotTestHelper.java   |   4 +-
 37 files changed, 2288 insertions(+), 2629 deletions(-)
----------------------------------------------------------------------


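For orientation before the per-file hunks: the deleted design (DataBlockScanner plus one BlockPoolSliceScanner per block pool, removed below) kept a BlockScanInfo object in memory for every block replica, while the new design (BlockScanner plus VolumeScanner, added below) keeps only one scanner thread per volume, keyed by storage ID. A minimal sketch of that threading model, simplified from the new BlockScanner.java in this diff; this is illustrative code, not code from the commit:

import java.util.TreeMap;

class PerVolumeScannerSketch {
  // Mirrors BlockScanner#scanners: storage ID -> scanner thread.
  // Memory is proportional to the number of volumes, not the number of blocks.
  private final TreeMap<String, Thread> scanners = new TreeMap<String, Thread>();

  synchronized void addVolume(String storageId, Runnable scanLoop) {
    if (!scanners.containsKey(storageId)) {
      Thread t = new Thread(scanLoop, "VolumeScanner(" + storageId + ")");
      scanners.put(storageId, t);
      t.start();
    }
  }
}
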
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 25ad33b..866b765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -140,6 +140,9 @@ Trunk (Unreleased)
     class and constructor to public; and fix FsDatasetSpi to use generic type
     instead of FsVolumeImpl.  (David Powell and Joe Pallas via szetszwo)
 
+    HDFS-7430. Rewrite the BlockScanner to use O(1) memory and use multiple
+    threads (cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fb958f1..60581b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -441,6 +441,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
+  public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
+  public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";

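The new dfs.block.scanner.volume.bytes.per.second key caps how fast each volume scanner may read, with a default of 1048576 (1 MiB/s). A hedged sketch of setting the two scan keys programmatically; per the isEnabled() check in the new BlockScanner.java further down, scanning only runs when both the scan period and the byte rate are positive:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Let each volume scanner read up to 4 MiB/s instead of the 1 MiB/s default.
conf.setLong("dfs.block.scanner.volume.bytes.per.second", 4L * 1024 * 1024);
// Re-scan each block roughly every three weeks; 0 (the default above) disables scanning.
conf.setInt("dfs.datanode.scan.period.hours", 21 * 24);
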
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 4a54bed..dfeacde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -656,9 +656,6 @@ class BPOfferService {
       //
       Block toDelete[] = bcmd.getBlocks();
       try {
-        if (dn.blockScanner != null) {
-          dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
-        }
         // using global fsdataset
         dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
       } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e6409ab..e396727 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -736,12 +736,6 @@ class BPServiceActor implements Runnable {
         DatanodeCommand cmd = cacheReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
-        // Now safe to start scanning the block pool.
-        // If it has already been started, this is a no-op.
-        if (dn.blockScanner != null) {
-          dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
-        }
-
         //
        // There is no work to do; sleep until heartbeat timer elapses,
         // or work arrives, and then iterate again.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
deleted file mode 100644
index f36fea1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ /dev/null
@@ -1,872 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.util.GSet;
-import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Scans the block files under a block pool and verifies that the
- * files are not corrupt.
- * This keeps track of blocks and their last verification times.
- * Currently it does not modify the metadata for the block.
- */
-
-class BlockPoolSliceScanner {
-  
-  public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
-  
-  private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
-
-  private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
-  private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
-  private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
-
-  private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
-
-  private final String blockPoolId;
-  private final long scanPeriod;
-  private final AtomicLong lastScanTime = new AtomicLong();
-
-  private final DataNode datanode;
-  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-  
-  private final SortedSet<BlockScanInfo> blockInfoSet
-      = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
-  private final SortedSet<BlockScanInfo> newBlockInfoSet =
-      new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
-  private final GSet<Block, BlockScanInfo> blockMap
-      = new LightWeightGSet<Block, BlockScanInfo>(
-          LightWeightGSet.computeCapacity(0.5, "BlockMap"));
-  
-  // processedBlocks keeps track of which blocks are scanned
-  // since the last run.
-  private volatile HashMap<Long, Integer> processedBlocks;
-  
-  private long totalScans = 0;
-  private long totalScanErrors = 0;
-  private long totalTransientErrors = 0;
-  private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
-  
-  private long currentPeriodStart = Time.monotonicNow();
-  private long bytesLeft = 0; // Bytes to scan in this period
-  private long totalBytesToScan = 0;
-  private boolean isNewPeriod = true;
-  private int lastScanTimeDifference = 5*60*1000;
-  
-  private final LogFileHandler verificationLog;
-  
-  private final DataTransferThrottler throttler = new DataTransferThrottler(
-       200, MAX_SCAN_RATE);
-  
-  private static enum ScanType {
-    IMMEDIATE_SCAN,  
-    VERIFICATION_SCAN,     // scanned as part of periodic verification
-    NONE,
-  }
-
-  // Extend Block because in the DN process there's a 1-to-1 correspondence of
-  // BlockScanInfo to Block instances, so by extending rather than containing
-  // Block, we can save a bit of Object overhead (about 24 bytes per block
-  // replica.)
-  static class BlockScanInfo extends Block
-      implements LightWeightGSet.LinkedElement {
-
-    /** Compare the info by the last scan time. */
-    static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
-        = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {
-
-      @Override
-      public int compare(BlockScanInfo left, BlockScanInfo right) {
-        final ScanType leftNextScanType = left.nextScanType;
-        final ScanType rightNextScanType = right.nextScanType;
-        final long l = left.lastScanTime;
-        final long r = right.lastScanTime;
-        // Compare by nextScanType; if they are the same, compare by
-        // lastScanTime.
-        // If the scan times are also equal, compare the blocks themselves,
-        // because TreeMap uses the comparator (when available) to check for
-        // the existence of an object.
-        int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
-        return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1:  l < r? -1: l > r? 1: left.compareTo(right); 
-      }
-    };
-
-    long lastScanTime = 0;
-    ScanType lastScanType = ScanType.NONE; 
-    boolean lastScanOk = true;
-    private LinkedElement next;
-    ScanType nextScanType = ScanType.VERIFICATION_SCAN;
-    
-    BlockScanInfo(Block block) {
-      super(block);
-    }
-    
-    @Override
-    public int hashCode() {
-      return super.hashCode();
-    }
-    
-    @Override
-    public boolean equals(Object that) {
-      if (this == that) {
-        return true;
-      }
-      return super.equals(that);
-    }
-    
-    long getLastScanTime() {
-      return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
-    }
-
-    @Override
-    public void setNext(LinkedElement next) {
-      this.next = next;
-    }
-
-    @Override
-    public LinkedElement getNext() {
-      return next;
-    }
-  }
-  
-  BlockPoolSliceScanner(String bpid, DataNode datanode,
-      FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
-    this.datanode = datanode;
-    this.dataset = dataset;
-    this.blockPoolId  = bpid;
-    
-    long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 
-                             DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
-    if (hours <= 0) {
-      hours = DEFAULT_SCAN_PERIOD_HOURS;
-    }
-    this.scanPeriod = hours * 3600 * 1000;
-    LOG.info("Periodic Block Verification Scanner initialized with interval "
-        + hours + " hours for block pool " + bpid);
-
-    // get the list of blocks and arrange them in random order
-    List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
-    Collections.shuffle(arr);
-    
-    long scanTime = -1;
-    for (Block block : arr) {
-      BlockScanInfo info = new BlockScanInfo( block );
-      info.lastScanTime = scanTime--; 
-      // still keep 'info.lastScanType' as NONE.
-      addBlockInfo(info, false);
-    }
-
-    RollingLogs rollingLogs = null;
-    try {
-       rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
-    } catch (IOException e) {
-      LOG.warn("Could not open verification log. " +
-               "Verification times are not stored.");
-    }
-    verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
-  }
-  
-  String getBlockPoolId() {
-    return blockPoolId;
-  }
-  
-  private void updateBytesToScan(long len, long lastScanTime) {
-    // len could be negative when a block is deleted.
-    totalBytesToScan += len;
-    if ( lastScanTime < currentPeriodStart ) {
-      bytesLeft += len;
-    }
-    // Should we change throttler bandwidth every time bytesLeft changes?
-    // not really required.
-  }
-
-  /**
-   * Add the BlockScanInfo to the sorted set of BlockScanInfo objects.
-   * @param info BlockScanInfo to be added
-   * @param isNewBlock true if the block is a new block, false if the
-   *          BlockScanInfo is being updated with a new scanTime
-   */
-  private synchronized void addBlockInfo(BlockScanInfo info,
-      boolean isNewBlock) {
-    boolean added = false;
-    if (isNewBlock) {
-      // check whether the block is already present
-      boolean exists = blockInfoSet.contains(info);
-      added = !exists && newBlockInfoSet.add(info);
-    } else {
-      added = blockInfoSet.add(info);
-    }
-    blockMap.put(info);
-    
-    if (added) {
-      updateBytesToScan(info.getNumBytes(), info.lastScanTime);
-    }
-  }
-
-  private synchronized void delBlockInfo(BlockScanInfo info) {
-    boolean exists = blockInfoSet.remove(info);
-    if (!exists){
-      exists = newBlockInfoSet.remove(info);
-    }
-    blockMap.remove(info);
-
-    if (exists) {
-      updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
-    }
-  }
-
-  /** Update blockMap by the given LogEntry */
-  private synchronized void updateBlockInfo(LogEntry e) {
-    BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
-    
-    if (info != null && e.verificationTime > 0 && 
-        info.lastScanTime < e.verificationTime) {
-      delBlockInfo(info);
-      if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
-        info.lastScanTime = e.verificationTime;
-      }
-      info.lastScanType = ScanType.VERIFICATION_SCAN;
-      addBlockInfo(info, false);
-    }
-  }
-
-  private synchronized long getNewBlockScanTime() {
-    /* If there are a lot of blocks, this returns a random time within
-     * the scan period. Otherwise something sooner.
-     */
-    long period = Math.min(scanPeriod, 
-                           Math.max(blockMap.size(),1) * 600 * 1000L);
-    int periodInt = Math.abs((int)period);
-    return Time.monotonicNow() - scanPeriod +
-        DFSUtil.getRandom().nextInt(periodInt);
-  }
-
-  /** Adds block to list of blocks 
-   * @param scanNow - true if we want to make that particular block a high 
-   * priority one to scan immediately
-   **/
-  synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
-    BlockScanInfo info = blockMap.get(block.getLocalBlock());
-    long lastScanTime = 0;
-    if (info != null) {
-      lastScanTime = info.lastScanTime;
-    }
-    // If the particular block was scanned in the last 5 minutes, there is
-    // no need to verify that block again
-    if (scanNow && Time.monotonicNow() - lastScanTime < 
-        lastScanTimeDifference) {
-      return;
-    }
-    
-    if ( info != null ) {
-      LOG.warn("Adding an already existing block " + block);
-      delBlockInfo(info);
-    }
-    
-    info = new BlockScanInfo(block.getLocalBlock());    
-    info.lastScanTime = getNewBlockScanTime();
-    if (scanNow) {
-      // Create a new BlockScanInfo object and set the lastScanTime to 0
-      // which will make it the high priority block
-      LOG.info("Adding block for immediate verification " + block);
-      info.nextScanType = ScanType.IMMEDIATE_SCAN;
-    }
-    
-    addBlockInfo(info, true);
-    adjustThrottler();
-  }
-  
-  /** Deletes the block from internal structures */
-  synchronized void deleteBlock(Block block) {
-    BlockScanInfo info = blockMap.get(block);
-    if (info != null) {
-      delBlockInfo(info);
-    }
-  }
-
-  @VisibleForTesting
-  long getTotalScans() {
-    return totalScans;
-  }
-
-  /** @return the last scan time for the block pool. */
-  long getLastScanTime() {
-    return lastScanTime.get();
-  }
-
-  /** @return the last scan time for the given block. */
-  synchronized long getLastScanTime(Block block) {
-    BlockScanInfo info = blockMap.get(block);
-    return info == null? 0: info.lastScanTime;
-  }
-
-  /** Deletes blocks from internal structures */
-  void deleteBlocks(Block[] blocks) {
-    for ( Block b : blocks ) {
-      deleteBlock(b);
-    }
-  }
-  
-  private synchronized void updateScanStatus(BlockScanInfo info,
-                                             ScanType type,
-                                             boolean scanOk) {
-    delBlockInfo(info);
-
-    long now = Time.monotonicNow();
-    info.lastScanType = type;
-    info.lastScanTime = now;
-    info.lastScanOk = scanOk;
-    info.nextScanType = ScanType.VERIFICATION_SCAN;
-    addBlockInfo(info, false);
-        
-    // Don't update metadata if the verification failed.
-    if (!scanOk) {
-      return;
-    }
-    
-    if (verificationLog != null) {
-      verificationLog.append(now, info.getGenerationStamp(),
-          info.getBlockId());
-    }
-  }
-  
-  private void handleScanFailure(ExtendedBlock block) {
-    LOG.info("Reporting bad " + block);
-    try {
-      datanode.reportBadBlocks(block);
-    } catch (IOException ie) {
-      // it is bad, but not bad enough to shut down the scanner
-      LOG.warn("Cannot report bad " + block.getBlockId());
-    }
-  }
-  
-  @VisibleForTesting
-  synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
-    this.lastScanTimeDifference = lastScanTimeDifference;
-  }
-  
-  static private class LogEntry {
-
-    long blockId = -1;
-    long verificationTime = -1;
-    long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-    
-    /**
-     * The format consists of a single line with multiple entries, each
-     * entry in the form name="value".
-     * This is simple text, easily extendable, and easily parseable with
-     * a regex.
-     */
-    private static final Pattern entryPattern =
-      Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
-    
-    static String toString(long verificationTime, long genStamp, long blockId,
-        DateFormat dateFormat) {
-      return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
-          + "\"\t time=\"" + verificationTime
-          + "\"\t genstamp=\"" + genStamp
-          + "\"\t id=\"" + blockId + "\"";
-    }
-
-    static LogEntry parseEntry(String line) {
-      LogEntry entry = new LogEntry();
-      
-      Matcher matcher = entryPattern.matcher(line);
-      while (matcher.find()) {
-        String name = matcher.group(1);
-        String value = matcher.group(2);
-        
-        try {
-          if (name.equals("id")) {
-            entry.blockId = Long.parseLong(value);
-          } else if (name.equals("time")) {
-            entry.verificationTime = Long.parseLong(value);
-          } else if (name.equals("genstamp")) {
-            entry.genStamp = Long.parseLong(value);
-          }
-        } catch(NumberFormatException nfe) {
-          LOG.warn("Cannot parse line: " + line, nfe);
-          return null;
-        }
-      }
-      
-      return entry;
-    }
-  }
-  
-  private synchronized void adjustThrottler() {
-    long timeLeft = Math.max(1L,
-        currentPeriodStart + scanPeriod - Time.monotonicNow());
-    long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
-    throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
-  }
-  
-  @VisibleForTesting
-  void verifyBlock(ExtendedBlock block) {
-    BlockSender blockSender = null;
-
-    /* In case of failure, attempt to read a second time to reduce
-     * transient errors. How do we flush block data from kernel 
-     * buffers before the second read? 
-     */
-    for (int i=0; i<2; i++) {
-      boolean second = (i > 0);
-      
-      try {
-        adjustThrottler();
-        
-        blockSender = new BlockSender(block, 0, -1, false, true, true, 
-            datanode, null, CachingStrategy.newDropBehind());
-
-        DataOutputStream out = 
-                new DataOutputStream(new IOUtils.NullOutputStream());
-        
-        blockSender.sendBlock(out, null, throttler);
-
-        LOG.info((second ? "Second " : "") +
-                 "Verification succeeded for " + block);
-        
-        if ( second ) {
-          totalTransientErrors++;
-        }
-        
-        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
-            ScanType.VERIFICATION_SCAN, true);
-
-        return;
-      } catch (IOException e) {
-        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
-            ScanType.VERIFICATION_SCAN, false);
-
-        // If the block does not exist anymore, then it's not an error
-        if (!dataset.contains(block)) {
-          LOG.info(block + " is no longer in the dataset");
-          deleteBlock(block.getLocalBlock());
-          return;
-        }
-
-        // If the block exists, the exception may be due to a race with write:
-        // The BlockSender got an old block path in rbw. BlockReceiver moved
-        // the block from rbw to finalized, but BlockSender tried to open the
-        // file before BlockReceiver updated the VolumeMap. The state of the
-        // block can be changed again now, so ignore this error here. If there
-        // is a block really deleted by mistake, DirectoryScan should catch it.
-        if (e instanceof FileNotFoundException ) {
-          LOG.info("Verification failed for " + block +
-              " - may be due to race with write");
-          deleteBlock(block.getLocalBlock());
-          return;
-        }
-
-        LOG.warn((second ? "Second " : "First ") + "Verification failed for "
-            + block, e);
-        
-        if (second) {
-          totalScanErrors++;
-          datanode.getMetrics().incrBlockVerificationFailures();
-          handleScanFailure(block);
-          return;
-        } 
-      } finally {
-        IOUtils.closeStream(blockSender);
-        datanode.getMetrics().incrBlocksVerified();
-        totalScans++;
-      }
-    }
-  }
-  
-  private synchronized long getEarliestScanTime() {
-    if (!blockInfoSet.isEmpty()) {
-      return blockInfoSet.first().lastScanTime;
-    }
-    return Long.MAX_VALUE; 
-  }
-  
-  private synchronized boolean isFirstBlockProcessed() {
-    if (!blockInfoSet.isEmpty()) {
-      if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
-        return false;
-      }
-      long blockId = blockInfoSet.first().getBlockId();
-      if ((processedBlocks.get(blockId) != null)
-          && (processedBlocks.get(blockId) == 1)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  // Picks one block and verifies it
-  private void verifyFirstBlock() {
-    BlockScanInfo block = null;
-    synchronized (this) {
-      if (!blockInfoSet.isEmpty()) {
-        block = blockInfoSet.first();
-      }
-    }
-    if ( block != null ) {
-      verifyBlock(new ExtendedBlock(blockPoolId, block));
-      processedBlocks.put(block.getBlockId(), 1);
-    }
-  }
-  
-  // Used for tests only
-  int getBlocksScannedInLastRun() {
-    return totalBlocksScannedInLastRun.get();
-  }
-
-  /**
-   * Reads the current and previous log files (if any) and marks the blocks
-   * processed if they were processed within the last scan period. Copies the
-   * log records of recently scanned blocks from the previous to the current file.
-   * Returns false if the process was interrupted because the thread is marked 
-   * to exit.
-   */
-  private boolean assignInitialVerificationTimes() {
-    //First updates the last verification times from the log file.
-    if (verificationLog != null) {
-      long now = Time.monotonicNow();
-      RollingLogs.LineIterator logIterator = null;
-      try {
-        logIterator = verificationLog.logs.iterator(false);
-        // update verification times from the verificationLog.
-        while (logIterator.hasNext()) {
-          if (!datanode.shouldRun
-              || datanode.blockScanner.blockScannerThread.isInterrupted()) {
-            return false;
-          }
-          LogEntry entry = LogEntry.parseEntry(logIterator.next());
-          if (entry != null) {
-            updateBlockInfo(entry);
-            if (now - entry.verificationTime < scanPeriod) {
-              BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0,
-                  entry.genStamp));
-              if (info != null) {
-                if (processedBlocks.get(entry.blockId) == null) {
-                  if (isNewPeriod) {
-                    updateBytesLeft(-info.getNumBytes());
-                  }
-                  processedBlocks.put(entry.blockId, 1);
-                }
-                if (logIterator.isLastReadFromPrevious()) {
-                  // write the log entry to current file
-                  // so that the entry is preserved for later runs.
-                  verificationLog.append(entry.verificationTime, entry.genStamp,
-                      entry.blockId);
-                }
-              }
-            }
-          }
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to read previous verification times.", e);
-      } finally {
-        IOUtils.closeStream(logIterator);
-      }
-      isNewPeriod = false;
-    }
-    
-    
-    /* Before this loop, entries in blockInfoSet that were not
-     * updated above have a lastScanTime of <= 0. Loop until the first
-     * entry has lastScanTime > 0.
-     */
-    synchronized (this) {
-      final int numBlocks = Math.max(blockMap.size(), 1);
-      // Initially spread the block reads over half of scan period
-      // so that we don't keep scanning the blocks too quickly when restarted.
-      long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
-      long lastScanTime = Time.monotonicNow() - scanPeriod;
-
-      if (!blockInfoSet.isEmpty()) {
-        BlockScanInfo info;
-        while ((info =  blockInfoSet.first()).lastScanTime < 0) {
-          delBlockInfo(info);        
-          info.lastScanTime = lastScanTime;
-          lastScanTime += verifyInterval;
-          addBlockInfo(info, false);
-        }
-      }
-    }
-    
-    return true;
-  }
-  
-  private synchronized void updateBytesLeft(long len) {
-    bytesLeft += len;
-  }
-  
-  private synchronized void startNewPeriod() {
-    LOG.info("Starting a new period : work left in prev period : "
-        + String.format("%.2f%%", totalBytesToScan == 0 ? 0
-            : (bytesLeft * 100.0) / totalBytesToScan));
-
-    // reset the byte counts :
-    bytesLeft = totalBytesToScan;
-    currentPeriodStart = Time.monotonicNow();
-    isNewPeriod = true;
-  }
-  
-  private synchronized boolean workRemainingInCurrentPeriod() {
-    if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
-                  currentPeriodStart + ", period=" + scanPeriod + ", now=" +
-                  Time.monotonicNow() + " " + blockPoolId);
-      }
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  void scanBlockPoolSlice() {
-    if (!workRemainingInCurrentPeriod()) {
-      return;
-    }
-
-    // Create a new processedBlocks structure
-    processedBlocks = new HashMap<Long, Integer>();
-    if (!assignInitialVerificationTimes()) {
-      return;
-    }
-    // Start scanning
-    try {
-      scan();
-    } finally {
-      totalBlocksScannedInLastRun.set(processedBlocks.size());
-      lastScanTime.set(Time.monotonicNow());
-    }
-  }
-
-  /**
-   * Shuts down this BlockPoolSliceScanner and releases any internal resources.
-   */
-  void shutdown() {
-    if (verificationLog != null) {
-      verificationLog.close();
-    }
-  }
-  
-  private void scan() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting to scan blockpool: " + blockPoolId);
-    }
-    try {
-      adjustThrottler();
-        
-      while (datanode.shouldRun
-          && !datanode.blockScanner.blockScannerThread.isInterrupted()
-          && datanode.isBPServiceAlive(blockPoolId)) {
-        long now = Time.monotonicNow();
-        synchronized (this) {
-          if ( now >= (currentPeriodStart + scanPeriod)) {
-            startNewPeriod();
-          }
-        }
-        if (((now - getEarliestScanTime()) >= scanPeriod)
-            || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
-          verifyFirstBlock();
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("All remaining blocks were processed recently, "
-                + "so this run is complete");
-          }
-          break;
-        }
-      }
-    } catch (RuntimeException e) {
-      LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
-      throw e;
-    } finally {
-      rollVerificationLogs();
-      rollNewBlocksInfo();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Done scanning block pool: " + blockPoolId);
-      }
-    }
-  }
-
-  // add new blocks to scan in the next iteration
-  private synchronized void rollNewBlocksInfo() {
-    for (BlockScanInfo newBlock : newBlockInfoSet) {
-      blockInfoSet.add(newBlock);
-    }
-    newBlockInfoSet.clear();
-  }
-
-  private synchronized void rollVerificationLogs() {
-    if (verificationLog != null) {
-      try {
-        verificationLog.logs.roll();
-      } catch (IOException ex) {
-        LOG.warn("Received exception: ", ex);
-        verificationLog.close();
-      }
-    }
-  }
-
-  
-  synchronized void printBlockReport(StringBuilder buffer, 
-                                     boolean summaryOnly) {
-    long oneHour = 3600*1000;
-    long oneDay = 24*oneHour;
-    long oneWeek = 7*oneDay;
-    long fourWeeks = 4*oneWeek;
-    
-    int inOneHour = 0;
-    int inOneDay = 0;
-    int inOneWeek = 0;
-    int inFourWeeks = 0;
-    int inScanPeriod = 0;
-    int neverScanned = 0;
-    
-    DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-    
-    int total = blockInfoSet.size();
-    
-    long now = Time.monotonicNow();
-    
-    Date date = new Date();
-    
-    for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
-      BlockScanInfo info = it.next();
-      
-      long scanTime = info.getLastScanTime();
-      long diff = now - scanTime;
-      
-      if (diff <= oneHour) inOneHour++;
-      if (diff <= oneDay) inOneDay++;
-      if (diff <= oneWeek) inOneWeek++;
-      if (diff <= fourWeeks) inFourWeeks++;
-      if (diff <= scanPeriod) inScanPeriod++;      
-      if (scanTime <= 0) neverScanned++;
-      
-      if (!summaryOnly) {
-        date.setTime(scanTime);
-        String scanType = 
-          (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none"; 
-        buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
-                                    " scan time : " +
-                                    "%-15d %s%n", info,
-                                    (info.lastScanOk ? "ok" : "failed"),
-                                    scanType, scanTime,
-                                    (scanTime <= 0) ? "not yet verified" : 
-                                      dateFormat.format(date)));
-      }
-    }
-    
-    double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
-                           *100.0/scanPeriod;
-    double pctProgress = (totalBytesToScan == 0) ? 100 :
-                         (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
-                         
-    buffer.append(String.format("%nTotal Blocks                 : %6d" +
-                                "%nVerified in last hour        : %6d" +
-                                "%nVerified in last day         : %6d" +
-                                "%nVerified in last week        : %6d" +
-                                "%nVerified in last four weeks  : %6d" +
-                                "%nVerified in SCAN_PERIOD      : %6d" +
-                                "%nNot yet verified             : %6d" +
-                                "%nVerified since restart       : %6d" +
-                                "%nScans since restart          : %6d" +
-                                "%nScan errors since restart    : %6d" +
-                                "%nTransient scan errors        : %6d" +
-                                "%nCurrent scan rate limit KBps : %6d" +
-                                "%nProgress this period         : %6.0f%%" +
-                                "%nTime left in cur period      : %6.2f%%" +
-                                "%n", 
-                                total, inOneHour, inOneDay, inOneWeek,
-                                inFourWeeks, inScanPeriod, neverScanned,
-                                totalScans, totalScans, 
-                                totalScanErrors, totalTransientErrors, 
-                                Math.round(throttler.getBandwidth()/1024.0),
-                                pctProgress, pctPeriodLeft));
-  }
-  
-  /**
-   * This class takes care of the log file used to store the last verification
-   * times of the blocks.
-   */
-  private static class LogFileHandler {
-    private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
-    private final RollingLogs logs;
-
-    private LogFileHandler(RollingLogs logs)  {
-      this.logs = logs;
-    }
-
-    void append(long verificationTime, long genStamp, long blockId) {
-      final String m = LogEntry.toString(verificationTime, genStamp, blockId,
-          dateFormat);
-      try {
-        logs.appender().append(m);
-      } catch (IOException e) {
-        LOG.warn("Failed to append to " + logs + ", m=" + m, e);
-      }
-    }
-
-    void close() {
-      try {
-        logs.appender().close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close the appender of " + logs, e);
-      }
-    }
-  }
-}

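For reference, the verification log that the deleted scanner maintained stored one set of name="value" entries per line, parsed by the entryPattern regex above. A small self-contained demo of that parsing; the sample values are invented for illustration:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogEntryParseDemo {
  public static void main(String[] args) {
    // Same pattern as the deleted BlockPoolSliceScanner.LogEntry.entryPattern.
    Pattern entryPattern =
        Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
    String line = "date=\"2015-01-21 19:00:53,123\"\t time=\"1421891253123\""
        + "\t genstamp=\"1001\"\t id=\"1073741825\"";
    Matcher matcher = entryPattern.matcher(line);
    while (matcher.find()) {
      // Prints the date, time, genstamp and id pairs consumed by parseEntry().
      System.out.println(matcher.group(1) + " = " + matcher.group(2));
    }
  }
}
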
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index df8dd5c..12041a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -193,20 +193,12 @@ class BlockReceiver implements Closeable {
           break;
         case PIPELINE_SETUP_APPEND:
           replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
-          if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
-                block.getLocalBlock());
-          }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
-          if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
-                block.getLocalBlock());
-          }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
new file mode 100644
index 0000000..7429fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@InterfaceAudience.Private
+public class BlockScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockScanner.class);
+
+  /**
+   * The DataNode that this scanner is associated with.
+   */
+  private final DataNode datanode;
+
+  /**
+   * Maps Storage IDs to VolumeScanner objects.
+   */
+  private final TreeMap<String, VolumeScanner> scanners =
+      new TreeMap<String, VolumeScanner>();
+
+  /**
+   * The scanner configuration.
+   */
+  private final Conf conf;
+
+  /**
+   * The cached scanner configuration.
+   */
+  static class Conf {
+    // These are a few internal configuration keys used for unit tests.
+    // They can't be set unless the static boolean allowUnitTestSettings has
+    // been set to true.
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS =
+        "internal.dfs.datanode.scan.period.ms.key";
+
+    @VisibleForTesting
+    static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER =
+        "internal.volume.scanner.scan.result.handler";
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS =
+        "internal.dfs.block.scanner.max_staleness.ms";
+
+    @VisibleForTesting
+    static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT =
+        TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS =
+        "dfs.block.scanner.cursor.save.interval.ms";
+
+    @VisibleForTesting
+    static final long
+        INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT =
+            TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+    static boolean allowUnitTestSettings = false;
+    final long targetBytesPerSec;
+    final long maxStalenessMs;
+    final long scanPeriodMs;
+    final long cursorSaveMs;
+    final Class<? extends ScanResultHandler> resultHandler;
+
+    private static long getUnitTestLong(Configuration conf, String key,
+                                        long defVal) {
+      if (allowUnitTestSettings) {
+        return conf.getLong(key, defVal);
+      } else {
+        return defVal;
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    Conf(Configuration conf) {
+      this.targetBytesPerSec = Math.max(0L, conf.getLong(
+          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
+      this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf,
+          INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS,
+          INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT));
+      this.scanPeriodMs = Math.max(0L,
+          getUnitTestLong(conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS,
+              TimeUnit.MILLISECONDS.convert(conf.getLong(
+                  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+                  DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS)));
+      this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
+          INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
+          INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
+      if (allowUnitTestSettings) {
+        this.resultHandler = (Class<? extends ScanResultHandler>)
+            conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+                          ScanResultHandler.class);
+      } else {
+        this.resultHandler = ScanResultHandler.class;
+      }
+    }
+  }
+
+  public BlockScanner(DataNode datanode, Configuration conf) {
+    this.datanode = datanode;
+    this.conf = new Conf(conf);
+    if (isEnabled()) {
+      LOG.info("Initialized block scanner with targetBytesPerSec {}",
+          this.conf.targetBytesPerSec);
+    } else {
+      LOG.info("Disabled block scanner.");
+    }
+  }
+
+  /**
+   * Returns true if the block scanner is enabled.<p/>
+   *
+   * If the block scanner is disabled, no volume scanners will be created, and
+   * no threads will start.
+   */
+  public boolean isEnabled() {
+    return (conf.scanPeriodMs) > 0 && (conf.targetBytesPerSec > 0);
+  }
+
+ /**
+  * Set up a scanner for the given block pool and volume.
+  *
+  * @param ref              A reference to the volume.
+  */
+  public synchronized void addVolumeScanner(FsVolumeReference ref) {
+    boolean success = false;
+    try {
+      FsVolumeSpi volume = ref.getVolume();
+      if (!isEnabled()) {
+        LOG.debug("Not adding volume scanner for {}, because the block " +
+            "scanner is disabled.", volume.getBasePath());
+        return;
+      }
+      VolumeScanner scanner = scanners.get(volume.getStorageID());
+      if (scanner != null) {
+        LOG.error("Already have a scanner for volume {}.",
+            volume.getBasePath());
+        return;
+      }
+      LOG.debug("Adding scanner for volume {} (StorageID {})",
+          volume.getBasePath(), volume.getStorageID());
+      scanner = new VolumeScanner(conf, datanode, ref);
+      scanner.start();
+      scanners.put(volume.getStorageID(), scanner);
+      success = true;
+    } finally {
+      if (!success) {
+        // If we didn't create a new VolumeScanner object, we don't
+        // need this reference to the volume.
+        IOUtils.cleanup(null, ref);
+      }
+    }
+  }
+
+  /**
+   * Stops and removes a volume scanner.<p/>
+   *
+   * This function will block until the volume scanner has stopped.
+   *
+   * @param volume           The volume to remove.
+   */
+  public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
+    if (!isEnabled()) {
+      LOG.debug("Not removing volume scanner for {}, because the block " +
+          "scanner is disabled.", volume.getStorageID());
+      return;
+    }
+    VolumeScanner scanner = scanners.get(volume.getStorageID());
+    if (scanner == null) {
+      LOG.warn("No scanner found to remove for volumeId {}",
+          volume.getStorageID());
+      return;
+    }
+    LOG.info("Removing scanner for volume {} (StorageID {})",
+        volume.getBasePath(), volume.getStorageID());
+    scanner.shutdown();
+    scanners.remove(volume.getStorageID());
+    Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
+  }
+
+  /**
+   * Stops and removes all volume scanners.<p/>
+   *
+   * This function will block until all the volume scanners have stopped.
+   */
+  public synchronized void removeAllVolumeScanners() {
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().shutdown();
+    }
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      Uninterruptibles.joinUninterruptibly(entry.getValue(),
+          5, TimeUnit.MINUTES);
+    }
+    scanners.clear();
+  }
+
+  /**
+   * Enable scanning a given block pool id.
+   *
+   * @param bpid        The block pool id to enable scanning for.
+   */
+  synchronized void enableBlockPoolId(String bpid) {
+    Preconditions.checkNotNull(bpid);
+    for (VolumeScanner scanner : scanners.values()) {
+      scanner.enableBlockPoolId(bpid);
+    }
+  }
+
+  /**
+   * Disable scanning a given block pool id.
+   *
+   * @param bpid        The block pool id to disable scanning for.
+   */
+  synchronized void disableBlockPoolId(String bpid) {
+    Preconditions.checkNotNull(bpid);
+    for (VolumeScanner scanner : scanners.values()) {
+      scanner.disableBlockPoolId(bpid);
+    }
+  }
+
+  @VisibleForTesting
+  synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) {
+    VolumeScanner scanner = scanners.get(volumeId);
+    if (scanner == null) {
+      return null;
+    }
+    return scanner.getStatistics();
+  }
+
+  synchronized void printStats(StringBuilder p) {
+    // print out all bpids that we're scanning ?
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().printStats(p);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static class Servlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void doGet(HttpServletRequest request,
+                      HttpServletResponse response) throws IOException {
+      response.setContentType("text/plain");
+
+      DataNode datanode = (DataNode)
+          getServletContext().getAttribute("datanode");
+      BlockScanner blockScanner = datanode.getBlockScanner();
+
+      StringBuilder buffer = new StringBuilder(8 * 1024);
+      if (!blockScanner.isEnabled()) {
+        LOG.warn("Periodic block scanner is not running");
+        buffer.append("Periodic block scanner is not running. " +
+            "Please check the datanode log if this is unexpected.");
+      } else {
+        buffer.append("Block Scanner Statistics\n\n");
+        blockScanner.printStats(buffer);
+      }
+      String resp = buffer.toString();
+      LOG.trace("Returned Servlet info {}", resp);
+      response.getWriter().write(resp);
+    }
+  }
+}

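A hedged sketch of how a caller in the same package might drive the new class, using only the methods defined above; the real call sites are in the DataNode.java changes listed in the diffstat, and the variables datanode, conf, and volumeRef are assumed to be in scope:

// Sketch only, not code from this commit.
BlockScanner blockScanner = new BlockScanner(datanode, conf);
if (blockScanner.isEnabled()) {
  // One VolumeScanner thread is started per volume; the reference is
  // released internally if no scanner is created.
  blockScanner.addVolumeScanner(volumeRef);
  // Package-private: tells every VolumeScanner to start scanning this pool.
  blockScanner.enableBlockPoolId("BP-0-127.0.0.1-0");  // hypothetical bpid
}
// On shutdown, blocks until all VolumeScanner threads have exited.
blockScanner.removeAllVolumeScanners();
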
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 2d312d7..182b366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,9 +600,6 @@ class BlockSender implements java.io.Closeable {
         String ioem = e.getMessage();
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
           LOG.error("BlockSender.sendChunks() exception: ", e);
-          //Something might be wrong with the block. Make this block the high 
-          //priority block for verification.
-          datanode.blockScanner.addBlock(block, true);
         }
       }
       throw ioeToSocketException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
deleted file mode 100644
index 450c2b1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * DataBlockScanner manages block scanning for all the block pools. For each
- * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
- * thread to scan the blocks for that block pool. When a {@link BPOfferService}
- * becomes alive or dies, blockPoolScannerMap in this class is updated.
- */
-@InterfaceAudience.Private
-public class DataBlockScanner implements Runnable {
-  public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
-  private final DataNode datanode;
-  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-  private final Configuration conf;
-  
-  static final int SLEEP_PERIOD_MS = 5 * 1000;
-
-  /**
-   * Map to find the BlockPoolScanner for a given block pool id. This is updated
-   * when a BPOfferService becomes alive or dies.
-   */
-  private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = 
-    new TreeMap<String, BlockPoolSliceScanner>();
-  Thread blockScannerThread = null;
-  
-  DataBlockScanner(DataNode datanode,
-      FsDatasetSpi<? extends FsVolumeSpi> dataset,
-      Configuration conf) {
-    this.datanode = datanode;
-    this.dataset = dataset;
-    this.conf = conf;
-  }
-  
-  @Override
-  public void run() {
-    String currentBpId = "";
-    boolean firstRun = true;
-    while (datanode.shouldRun && !Thread.interrupted()) {
-      //Sleep every time except in the first iteration.
-      if (!firstRun) {
-        try {
-          Thread.sleep(SLEEP_PERIOD_MS);
-        } catch (InterruptedException ex) {
-          // Interrupt itself again to set the interrupt status
-          blockScannerThread.interrupt();
-          continue;
-        }
-      } else {
-        firstRun = false;
-      }
-      
-      BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
-      if (bpScanner == null) {
-        // Possible if thread is interrupted
-        continue;
-      }
-      currentBpId = bpScanner.getBlockPoolId();
-      // If BPOfferService for this pool is not alive, don't process it
-      if (!datanode.isBPServiceAlive(currentBpId)) {
-        LOG.warn("Block Pool " + currentBpId + " is not alive");
-        // Remove in case BP service died abruptly without proper shutdown
-        removeBlockPool(currentBpId);
-        continue;
-      }
-      bpScanner.scanBlockPoolSlice();
-    }
-
-    // Call shutdown for each allocated BlockPoolSliceScanner.
-    for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
-      bpss.shutdown();
-    }
-  }
-
-  // Wait for at least one block pool to be up
-  private void waitForInit() {
-    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
-        || (getBlockPoolSetSize() < 1)) {
-      try {
-        Thread.sleep(SLEEP_PERIOD_MS);
-      } catch (InterruptedException e) {
-        blockScannerThread.interrupt();
-        return;
-      }
-    }
-  }
-  
-  /**
-   * Find the next block pool id to scan. There should be only one current
-   * verification log file; the block pool containing it is used as the
-   * starting block pool id. If no current file is found, start with the
-   * first block pool in the blockPoolSet. If more than one current file is
-   * found, the one with the latest modification time is used to find the
-   * next block pool id.
-   */
-  private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
-    
-    String nextBpId = null;
-    while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
-      waitForInit();
-      synchronized (this) {
-        if (getBlockPoolSetSize() > 0) {          
-          // Find nextBpId by the minimum of the last scan time
-          long lastScanTime = 0;
-          for (String bpid : blockPoolScannerMap.keySet()) {
-            final long t = getBPScanner(bpid).getLastScanTime();
-            if (t != 0L) {
-            if (nextBpId == null || t < lastScanTime) {
-              lastScanTime = t;
-                nextBpId = bpid;
-              }
-            }
-          }
-          
-          // nextBpId can still be null if no current log is found,
-          // find nextBpId sequentially.
-          if (nextBpId == null) {
-            nextBpId = blockPoolScannerMap.higherKey(currentBpId);
-            if (nextBpId == null) {
-              nextBpId = blockPoolScannerMap.firstKey();
-            }
-          }
-          if (nextBpId != null) {
-            return getBPScanner(nextBpId);
-          }
-        }
-      }
-      LOG.warn("No block pool is up, going to wait");
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException ex) {
-        LOG.warn("Received exception: " + ex);
-        blockScannerThread.interrupt();
-        return null;
-      }
-    }
-    return null;
-  }
-
-  private synchronized int getBlockPoolSetSize() {
-    return blockPoolScannerMap.size();
-  }
-  
-  @VisibleForTesting
-  synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
-    return blockPoolScannerMap.get(bpid);
-  }
-  
-  private synchronized String[] getBpIdList() {
-    return blockPoolScannerMap.keySet().toArray(
-        new String[blockPoolScannerMap.keySet().size()]);
-  }
-  
-  public void addBlock(ExtendedBlock block, boolean scanNow) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
-    if (bpScanner != null) {
-      bpScanner.addBlock(block, scanNow);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + block.getBlockPoolId());
-    }
-  }
-  
-  boolean isInitialized(String bpid) {
-    return getBPScanner(bpid) != null;
-  }
-
-  public synchronized void printBlockReport(StringBuilder buffer,
-      boolean summary) {
-    String[] bpIdList = getBpIdList();
-    if (bpIdList == null || bpIdList.length == 0) {
-      buffer.append("Periodic block scanner is not yet initialized. "
-          + "Please check back again after some time.");
-      return;
-    }
-    for (String bpid : bpIdList) {
-      BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-      buffer.append("\n\nBlock report for block pool: "+bpid + "\n");
-      bpScanner.printBlockReport(buffer, summary);
-      buffer.append("\n");
-    }
-  }
-  
-  public void deleteBlock(String poolId, Block toDelete) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
-    if (bpScanner != null) {
-      bpScanner.deleteBlock(toDelete);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + poolId);
-    }
-  }
-
-  public void deleteBlocks(String poolId, Block[] toDelete) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
-    if (bpScanner != null) {
-      bpScanner.deleteBlocks(toDelete);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + poolId);
-    }
-  }
-  
-  public void shutdown() {
-    synchronized (this) {
-      if (blockScannerThread != null) {
-        blockScannerThread.interrupt();
-      }
-    }
-
-    // We cannot join within the synchronized block because it could deadlock:
-    // blockScannerThread calls other synchronized methods on this object.
-    if (blockScannerThread != null) {
-      try {
-        blockScannerThread.join();
-      } catch (InterruptedException e) {
-        // shutting down anyway
-      }
-    }
-  }
-
-  public synchronized void addBlockPool(String blockPoolId) {
-    if (blockPoolScannerMap.get(blockPoolId) != null) {
-      return;
-    }
-    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
-        datanode, dataset, conf);
-    blockPoolScannerMap.put(blockPoolId, bpScanner);
-    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
-        + blockPoolScannerMap.size());
-  }
-  
-  public synchronized void removeBlockPool(String blockPoolId) {
-    BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
-    if (bpss != null) {
-      bpss.shutdown();
-    }
-    LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
-  }
-  
-  @VisibleForTesting
-  long getBlocksScannedInLastRun(String bpid) throws IOException {
-    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-    if (bpScanner == null) {
-      throw new IOException("Block Pool: "+bpid+" is not running");
-    } else {
-      return bpScanner.getBlocksScannedInLastRun();
-    }
-  }
-
-  @VisibleForTesting
-  long getTotalScans(String bpid) throws IOException {
-    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-    if (bpScanner == null) {
-      throw new IOException("Block Pool: "+bpid+" is not running");
-    } else {
-      return bpScanner.getTotalScans();
-    }
-  }
-
-  @VisibleForTesting
-  public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
-    if (bpScanner != null) {
-      bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + block.getBlockPoolId());
-    }
-  }
-  
-  public void start() {
-    blockScannerThread = new Thread(this);
-    blockScannerThread.setDaemon(true);
-    blockScannerThread.start();
-  }
-  
-  @InterfaceAudience.Private
-  public static class Servlet extends HttpServlet {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void doGet(HttpServletRequest request, 
-                      HttpServletResponse response) throws IOException {
-      response.setContentType("text/plain");
-      
-      DataNode datanode = (DataNode) getServletContext().getAttribute("datanode");
-      DataBlockScanner blockScanner = datanode.blockScanner;
-      
-      boolean summary = (request.getParameter("listblocks") == null);
-      
-      StringBuilder buffer = new StringBuilder(8*1024);
-      if (blockScanner == null) {
-        LOG.warn("Periodic block scanner is not running");
-        buffer.append("Periodic block scanner is not running. " +
-                      "Please check the datanode log if this is unexpected.");
-      } else {
-        blockScanner.printBlockReport(buffer, summary);
-      }
-      response.getWriter().write(buffer.toString()); // extra copy!
-    }
-  }
-
-}
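
The class deleted above kept a TreeMap of per-block-pool scanners, and
each BlockPoolSliceScanner in turn tracked verification times for every
block it had seen, so memory grew with the number of blocks. The
replacement runs one scanner thread per volume and remembers only a
cursor into that volume's blocks, which is where the O(1) memory in the
commit summary comes from. A minimal sketch of the idea (names here are
illustrative; BlockCursor is a hypothetical stand-in, not the iterator
API this commit actually adds):

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /**
     * Illustrative sketch of the cursor-based design. One thread owns one
     * volume and keeps only its position in a pass over that volume's blocks.
     */
    class VolumeScannerSketch implements Runnable {
      private final FsVolumeSpi volume;   // the single volume this thread scans
      private final BlockCursor cursor;   // hypothetical resumable iterator

      VolumeScannerSketch(FsVolumeSpi volume, BlockCursor cursor) {
        this.volume = volume;
        this.cursor = cursor;
      }

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          ExtendedBlock block = cursor.nextBlock(); // O(1) state: the cursor
          if (block == null) {
            cursor.rewind();    // finished a pass over the volume; start over
            continue;
          }
          verifyChecksums(block);   // read the replica and verify, throttled
        }
      }

      private void verifyChecksums(ExtendedBlock block) { /* elided */ }

      /** Hypothetical cursor abstraction. */
      interface BlockCursor {
        ExtendedBlock nextBlock();  // next block, or null at end of a pass
        void rewind();              // restart from the start of the volume
      }
    }

The trade-off is visible in this diff: the old design could bump an
individual block to the front of the queue (addBlock with scanNow,
above), while a pure cursor has no per-block queue to promote into.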

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 12df9d6..c77bc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -316,7 +316,7 @@ public class DataNode extends ReconfigurableBase
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
   
-  volatile DataBlockScanner blockScanner = null;
+  private final BlockScanner blockScanner;
   private DirectoryScanner directoryScanner = null;
   
   /** Activated plug-ins. */
@@ -365,6 +365,7 @@ public class DataNode extends ReconfigurableBase
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
+    this.blockScanner = new BlockScanner(this, conf);
   }
 
   /**
@@ -375,6 +376,7 @@ public class DataNode extends ReconfigurableBase
            final List<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
+    this.blockScanner = new BlockScanner(this, conf);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -671,7 +673,8 @@ public class DataNode extends ReconfigurableBase
     this.infoServer.setAttribute("datanode", this);
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport",
-                               DataBlockScanner.Servlet.class);
+                               BlockScanner.Servlet.class);
+
     this.infoServer.start();
     InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
 
@@ -772,56 +775,12 @@ public class DataNode extends ReconfigurableBase
     // Not a superuser.
     throw new AccessControlException();
   }
-  
-/**
- * Initialize the datanode's periodic scanners:
- *     {@link DataBlockScanner}
- *     {@link DirectoryScanner}
- * They report results on a per-blockpool basis but do their scanning 
- * on a per-Volume basis to minimize competition for disk iops.
- * 
- * @param conf - Configuration has the run intervals and other 
- *               parameters for these periodic scanners
- */
-  private void initPeriodicScanners(Configuration conf) {
-    initDataBlockScanner(conf);
-    initDirectoryScanner(conf);
-  }
-  
+
   private void shutdownPeriodicScanners() {
     shutdownDirectoryScanner();
-    shutdownDataBlockScanner();
-  }
-  
-  /**
-   * See {@link DataBlockScanner}
-   */
-  private synchronized void initDataBlockScanner(Configuration conf) {
-    if (blockScanner != null) {
-      return;
-    }
-    String reason = null;
-    assert data != null;
-    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
-                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
-      reason = "verification is turned off by configuration";
-    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
-      reason = "verification is not supported by SimulatedFSDataset";
-    } 
-    if (reason == null) {
-      blockScanner = new DataBlockScanner(this, data, conf);
-      blockScanner.start();
-    } else {
-      LOG.info("Periodic Block Verification scan disabled because " + reason);
-    }
+    blockScanner.removeAllVolumeScanners();
   }
-  
-  private void shutdownDataBlockScanner() {
-    if (blockScanner != null) {
-      blockScanner.shutdown();
-    }
-  }
-  
+
   /**
    * See {@link DirectoryScanner}
    */
@@ -1250,9 +1209,8 @@ public class DataNode extends ReconfigurableBase
       // registering anywhere. If that's the case, we wouldn't have
       // a block pool id
       String bpId = bpos.getBlockPoolId();
-      if (blockScanner != null) {
-        blockScanner.removeBlockPool(bpId);
-      }
+
+      blockScanner.disableBlockPoolId(bpId);
 
       if (data != null) {
         data.shutdownBlockPool(bpId);
@@ -1296,9 +1254,9 @@ public class DataNode extends ReconfigurableBase
     // failures.
     checkDiskError();
 
-    initPeriodicScanners(conf);
-    
+    initDirectoryScanner(conf);
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
   }
 
   BPOfferService[] getAllBpOs() {
@@ -2168,10 +2126,6 @@ public class DataNode extends ReconfigurableBase
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    FsVolumeSpi volume = getFSDataset().getVolume(block);
-    if (blockScanner != null && !volume.isTransientStorage()) {
-      blockScanner.addBlock(block, false);
-    }
   }
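
With the removal above, a freshly received block is never registered
with the scanner. Under a cursor-per-volume design no registration is
needed:

    // Removed registration (from the hunk above):
    FsVolumeSpi volume = getFSDataset().getVolume(block);
    if (blockScanner != null && !volume.isTransientStorage()) {
      blockScanner.addBlock(block, false);
    }
    // Nothing replaces it: a per-volume cursor visits newly written blocks
    // on its next full pass, so received blocks need no explicit enqueue.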
 
   /** Start a single datanode daemon and wait for it to finish.
@@ -2445,8 +2399,9 @@ public class DataNode extends ReconfigurableBase
     return data;
   }
 
   /** @return the block scanner. */
+  @VisibleForTesting
-  public DataBlockScanner getBlockScanner() {
+  public BlockScanner getBlockScanner() {
     return blockScanner;
   }
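
Taken together, the DataNode hunks above trade lazy, nullable scanner
initialization for a scanner that is always constructed and then
toggled per block pool. Pieced together from the calls visible in this
diff (contexts abbreviated, bodies elided):

    // Construction: created unconditionally with the DataNode, never null,
    // which is why the "blockScanner != null" guards above disappear.
    this.blockScanner = new BlockScanner(this, conf);

    // A block pool finishes initializing on this datanode:
    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());

    // A block pool's service dies or is removed:
    blockScanner.disableBlockPoolId(bpId);

    // Datanode shutdown:
    blockScanner.removeAllVolumeScanners();

Whether scanning runs at all is presumably still governed by the
dfs.datanode.scan.period.hours setting checked in the deleted
initDataBlockScanner() above, with that check now living inside
BlockScanner rather than in DataNode.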