You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/01/22 04:01:12 UTC

[1/4] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk a003f71ca -> 6e62a1a67


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
index cffb930..4c703ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -89,7 +89,7 @@ public class SnapshotTestHelper {
     GenericTestUtils.disableLog(LogFactory.getLog(DirectoryScanner.class));
     GenericTestUtils.disableLog(LogFactory.getLog(MetricsSystemImpl.class));
     
-    GenericTestUtils.disableLog(DataBlockScanner.LOG);
+    GenericTestUtils.disableLog(BlockScanner.LOG);
     GenericTestUtils.disableLog(HttpServer2.LOG);
     GenericTestUtils.disableLog(DataNode.LOG);
     GenericTestUtils.disableLog(BlockPoolSliceStorage.LOG);


[2/4] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

Posted by cm...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
deleted file mode 100644
index 9e78c10..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.log4j.Level;
-import org.junit.Test;
-
-/**
- * This test verifies that block verification occurs on the datanode
- */
-public class TestDatanodeBlockScanner {
-  
-  private static final Log LOG = 
-                 LogFactory.getLog(TestDatanodeBlockScanner.class);
-  
-  private static final long TIMEOUT = 20000; // 20 sec.
-  
-  private static final Pattern pattern =
-             Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
-  
-  private static final Pattern pattern_blockVerify =
-             Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
-  
-  static {
-    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
-  }
-  /**
-   * This connects to datanode and fetches block verification data.
-   * It repeats this until the given block has a verification time > newTime.
-   * @param newTime - validation timestamps before newTime are "old", the
-   *            result of previous validations.  This method waits until a "new"
-   *            validation timestamp is obtained.  If no validator runs soon
-   *            enough, the method will time out.
-   * @return - the new validation timestamp
-   * @throws IOException
-   * @throws TimeoutException
-   */
-  private static long waitForVerification(int infoPort, FileSystem fs, 
-                          Path file, int blocksValidated, 
-                          long newTime, long timeout) 
-  throws IOException, TimeoutException {
-    URL url = new URL("http://localhost:" + infoPort +
-                      "/blockScannerReport?listblocks");
-    long lastWarnTime = Time.monotonicNow();
-    if (newTime <= 0) newTime = 1L;
-    long verificationTime = 0;
-    
-    String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-    long failtime = (timeout <= 0) ? Long.MAX_VALUE 
-        : Time.monotonicNow() + timeout;
-    while (verificationTime < newTime) {
-      if (failtime < Time.monotonicNow()) {
-        throw new TimeoutException("failed to achieve block verification after "
-            + timeout + " msec.  Current verification timestamp = "
-            + verificationTime + ", requested verification time > " 
-            + newTime);
-      }
-      String response = DFSTestUtil.urlGet(url);
-      if(blocksValidated >= 0) {
-        for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) {
-          if (block.equals(matcher.group(1))) {
-            assertEquals(1, blocksValidated);
-            break;
-          }
-        }
-      }
-      for(Matcher matcher = pattern.matcher(response); matcher.find();) {
-        if (block.equals(matcher.group(1))) {
-          verificationTime = Long.parseLong(matcher.group(2));
-          break;
-        }
-      }
-      
-      if (verificationTime < newTime) {
-        long now = Time.monotonicNow();
-        if ((now - lastWarnTime) >= 5*1000) {
-          LOG.info("Waiting for verification of " + block);
-          lastWarnTime = now; 
-        }
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException ignored) {}
-      }
-    }
-    
-    return verificationTime;
-  }
-
-  @Test
-  public void testDatanodeBlockScanner() throws IOException, TimeoutException {
-    long startTime = Time.monotonicNow();
-    
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
-    
-    FileSystem fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockVerification/file1");
-    Path file2 = new Path("/tmp/testBlockVerification/file2");
-    
-    /*
-     * Write the first file and restart the cluster.
-     */
-    DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
-    cluster.shutdown();
-
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(1)
-                                .format(false).build();
-    cluster.waitActive();
-    
-    DFSClient dfsClient =  new DFSClient(new InetSocketAddress("localhost", 
-                                         cluster.getNameNodePort()), conf);
-    fs = cluster.getFileSystem();
-    DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
-    
-    /*
-     * The cluster restarted. The block should be verified by now.
-     */
-    assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime,
-        TIMEOUT) >= startTime);
-    
-    /*
-     * Create a new file and read the block. The block should be marked 
-     * verified since the client reads the block and verifies checksum. 
-     */
-    DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
-    IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
-                      conf, true); 
-    assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime,
-        TIMEOUT) >= startTime);
-    
-    cluster.shutdown();
-  }
-
-  @Test
-  public void testBlockCorruptionPolicy() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
-    Random random = new Random();
-    FileSystem fs = null;
-    int rand = random.nextInt(3);
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockVerification/file1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
-    
-    DFSTestUtil.waitReplication(fs, file1, (short)3);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
-    // Corrupt random replica of block 
-    assertTrue(cluster.corruptReplica(rand, block));
-
-    // Restart the datanode, hoping the corrupt block will be reported
-    cluster.restartDataNode(rand);
-
-    // We have 2 good replicas and block is not corrupt
-    DFSTestUtil.waitReplication(fs, file1, (short)2);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-  
-    // Corrupt all replicas. Now, block should be marked as corrupt
-    // and we should get all the replicas 
-    assertTrue(cluster.corruptReplica(0, block));
-    assertTrue(cluster.corruptReplica(1, block));
-    assertTrue(cluster.corruptReplica(2, block));
-
-    // Trigger each of the DNs to scan this block immediately.
-    // The block pool scanner doesn't run frequently enough on its own
-    // to notice these, and due to HDFS-1371, the client won't report
-    // bad blocks to the NN when all replicas are bad.
-    for (DataNode dn : cluster.getDataNodes()) {
-      DataNodeTestUtils.runBlockScannerForBlock(dn, block);
-    }
-
-    // We now have the blocks to be marked as corrupt and we get back all
-    // its replicas
-    DFSTestUtil.waitReplication(fs, file1, (short)3);
-    assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-    cluster.shutdown();
-  }
-  
-  /**
-   * testBlockCorruptionRecoveryPolicy.
-   * This tests recovery of corrupt replicas, first for one corrupt replica
-   * then for two. The test invokes blockCorruptionRecoveryPolicy which
-   * 1. Creates a block with desired number of replicas
-   * 2. Corrupts the desired number of replicas and restarts the datanodes
-   *    containing the corrupt replica. Additionally we also read the block
-   *    in case restarting does not report corrupt replicas.
-   *    Restarting or reading from the datanode would trigger reportBadBlocks 
-   *    to namenode.
-   *    NameNode adds it to corruptReplicasMap and neededReplication
-   * 3. Test waits until all corrupt replicas are reported, meanwhile
-   *    Re-replication brings the block back to healthy state
-   * 4. Test again waits until the block is reported with expected number
-   *    of good replicas.
-   */
-  @Test
-  public void testBlockCorruptionRecoveryPolicy1() throws Exception {
-    // Test recovery of 1 corrupt replica
-    LOG.info("Testing corrupt replica recovery for one corrupt replica");
-    blockCorruptionRecoveryPolicy(4, (short)3, 1);
-  }
-
-  @Test
-  public void testBlockCorruptionRecoveryPolicy2() throws Exception {
-    // Test recovery of 2 corrupt replicas
-    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
-    blockCorruptionRecoveryPolicy(5, (short)3, 2);
-  }
-  
-  private void blockCorruptionRecoveryPolicy(int numDataNodes, 
-                                             short numReplicas,
-                                             int numCorruptReplicas) 
-                                             throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
-
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
-    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
-    final int ITERATIONS = 10;
-
-    // Wait until block is replicated to numReplicas
-    DFSTestUtil.waitReplication(fs, file1, numReplicas);
-
-    for (int k = 0; ; k++) {
-      // Corrupt numCorruptReplicas replicas of block 
-      int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
-      for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-        if (cluster.corruptReplica(i, block)) {
-          corruptReplicasDNIDs[j++] = i;
-          LOG.info("successfully corrupted block " + block + " on node " 
-                   + i + " " + cluster.getDataNodes().get(i).getDisplayName());
-        }
-      }
-      
-      // Restart the datanodes containing corrupt replicas 
-      // so they would be reported to namenode and re-replicated
-      // They MUST be restarted in reverse order from highest to lowest index,
-      // because the act of restarting them removes them from the ArrayList
-      // and causes the indexes of all nodes above them in the list to change.
-      for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
-        LOG.info("restarting node with corrupt replica: position " 
-            + i + " node " + corruptReplicasDNIDs[i] + " " 
-            + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
-        cluster.restartDataNode(corruptReplicasDNIDs[i]);
-      }
-
-      // Loop until all corrupt replicas are reported
-      try {
-        DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-                                        block, numCorruptReplicas);
-      } catch(TimeoutException e) {
-        if (k > ITERATIONS) {
-          throw e;
-        }
-        LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
-        continue;
-      }
-      break;
-    }
-    
-    // Loop until the block recovers after replication
-    DFSTestUtil.waitReplication(fs, file1, numReplicas);
-    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
-    // Make sure the corrupt replica is invalidated and removed from
-    // corruptReplicasMap
-    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
-        block, 0);
-    cluster.shutdown();
-  }
-  
-  /** Test if NameNode handles truncated blocks in block report */
-  @Test
-  public void testTruncatedBlockReport() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    final short REPLICATION_FACTOR = (short)2;
-    final Path fileName = new Path("/file1");
-
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-
-    long startTime = Time.monotonicNow();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-                                               .numDataNodes(REPLICATION_FACTOR)
-                                               .build();
-    cluster.waitActive();
-    
-    ExtendedBlock block;
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
-      DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
-      block = DFSTestUtil.getFirstBlock(fs, fileName);
-    } finally {
-      cluster.shutdown();
-    }
-
-    // Restart cluster and confirm block is verified on datanode 0,
-    // then truncate it on datanode 0.
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(REPLICATION_FACTOR)
-                                .format(false)
-                                .build();
-    cluster.waitActive();
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      int infoPort = cluster.getDataNodes().get(0).getInfoPort();
-      assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
-      
-      // Truncate replica of block
-      if (!changeReplicaLength(cluster, block, 0, -1)) {
-        throw new IOException(
-            "failed to find or change length of replica on node 0 "
-            + cluster.getDataNodes().get(0).getDisplayName());
-      }      
-    } finally {
-      cluster.shutdown();
-    }
-
-    // Restart the cluster, add a node, and check that the truncated block is 
-    // handled correctly
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(REPLICATION_FACTOR)
-                                .format(false)
-                                .build();
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();  // now we have 3 datanodes
-
-    // Assure the cluster has left safe mode.
-    cluster.waitClusterUp();
-    assertFalse("failed to leave safe mode", 
-        cluster.getNameNode().isInSafeMode());
-
-    try {
-      // wait for truncated block to be detected by block scanner,
-      // and the block to be replicated
-      DFSTestUtil.waitReplication(
-          cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
-      
-      // Make sure that truncated block will be deleted
-      waitForBlockDeleted(cluster, block, 0, TIMEOUT);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  /**
-   * Change the length of a block at datanode dnIndex
-   */
-  static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk,
-      int dnIndex, int lenDelta) throws IOException {
-    File blockFile = cluster.getBlockFile(dnIndex, blk);
-    if (blockFile != null && blockFile.exists()) {
-      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-      raFile.setLength(raFile.length()+lenDelta);
-      raFile.close();
-      return true;
-    }
-    LOG.info("failed to change length of block " + blk);
-    return false;
-  }
-  
-  private static void waitForBlockDeleted(MiniDFSCluster cluster,
-      ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException,
-      InterruptedException {
-    File blockFile = cluster.getBlockFile(dnIndex, blk);
-    long failtime = Time.monotonicNow()
-                    + ((timeout > 0) ? timeout : Long.MAX_VALUE);
-    while (blockFile != null && blockFile.exists()) {
-      if (failtime < Time.monotonicNow()) {
-        throw new TimeoutException("waited too long for blocks to be deleted: "
-            + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
-      }
-      Thread.sleep(100);
-      blockFile = cluster.getBlockFile(dnIndex, blk);
-    }
-  }
-  
-  private static final String BASE_PATH = (new File("/data/current/finalized"))
-      .getAbsolutePath();
-  
-  @Test
-  public void testReplicaInfoParsing() throws Exception {
-    testReplicaInfoParsingSingle(BASE_PATH);
-    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1");
-    testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3");
-  }
-  
-  private static void testReplicaInfoParsingSingle(String subDirPath) {
-    File testFile = new File(subDirPath);
-    assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath);
-  }
-
-  @Test
-  public void testDuplicateScans() throws Exception {
-    long startTime = Time.monotonicNow();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
-        .numDataNodes(1).build();
-    FileSystem fs = null;
-    try {
-      fs = cluster.getFileSystem();
-      DataNode dataNode = cluster.getDataNodes().get(0);
-      int infoPort = dataNode.getInfoPort();
-      long scanTimeBefore = 0, scanTimeAfter = 0;
-      for (int i = 1; i < 10; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-        waitForVerification(infoPort, fs, fileName, i, startTime, TIMEOUT);
-        if (i > 1) {
-          scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
-              DFSTestUtil.getFirstBlock(fs, new Path("/test" + (i - 1))));
-          assertFalse("scan time shoud not be 0", scanTimeAfter == 0);
-          assertEquals("There should not be duplicate scan", scanTimeBefore,
-              scanTimeAfter);
-        }
-
-        scanTimeBefore = DataNodeTestUtils.getLatestScanTime(dataNode,
-            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
-      }
-      cluster.restartDataNode(0);
-      Thread.sleep(10000);
-      dataNode = cluster.getDataNodes().get(0);
-      scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
-          DFSTestUtil.getFirstBlock(fs, new Path("/test" + (9))));
-      assertEquals("There should not be duplicate scan", scanTimeBefore,
-          scanTimeAfter);
-    } finally {
-      IOUtils.closeStream(fs);
-      cluster.shutdown();
-    }
-  }
-  
-/**
- * This test verifies whether block is added to the first location of 
- * BlockPoolSliceScanner#blockInfoSet
- */
-  @Test
-  public void testAddBlockInfoToFirstLocation() throws Exception {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
-        .numDataNodes(1).build();
-    FileSystem fs = null;
-    try {
-      fs = cluster.getFileSystem();
-      DataNode dataNode = cluster.getDataNodes().get(0);
-      // Creating a bunch of blocks
-      for (int i = 1; i < 10; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-      } 
-      // Get block of the first file created (file1)
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
-      dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
-      // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
-      // scan the first set of blocks
-      Thread.sleep(10000);
-      Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
-          dataNode, block);
-      // Create another set of blocks
-      for (int i = 10; i < 20; i++) {
-        Path fileName = new Path("/test" + i);
-        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
-      }
-      dataNode.getBlockScanner().addBlock(block, true);
-      // Sleep so that BlockPoolSliceScanner can scan the second set of blocks
-      // and one block which we scheduled to rescan
-      Thread.sleep(10000);
-      // Get the lastScanTime of all of the second set of blocks
-      Set<Long> lastScanTimeSet = new HashSet<Long>();
-      for (int i = 10; i < 20; i++) {
-        long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
-            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
-        lastScanTimeSet.add(lastScanTime);
-      }
-      Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
-          dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
-      Long minimumLastScanTime = Collections.min(lastScanTimeSet);
-      assertTrue("The second scanTime for test1 block should be greater than "
-         + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
-      assertTrue("The second scanTime for test1 block should be less than or"
-         + " equal to minimum of the lastScanTime of second set of blocks",
-          scanTime2Fortest1Block <= minimumLastScanTime);
-    } finally {
-      IOUtils.closeStream(fs);
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index b88b5c2..d116f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -444,8 +444,7 @@ public class TestReplication {
 
     // Change the length of a replica
     for (int i=0; i<cluster.getDataNodes().size(); i++) {
-      if (TestDatanodeBlockScanner.changeReplicaLength(cluster, block, i,
-          lenDelta)) {
+      if (DFSTestUtil.changeReplicaLength(cluster, block, i, lenDelta)) {
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index f8f476d..2942d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -54,6 +53,7 @@ public class TestOverReplicatedBlocks {
   @Test
   public void testProcesOverReplicateBlock() throws Exception {
     Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     conf.set(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -71,13 +71,14 @@ public class TestOverReplicatedBlocks {
       assertTrue(cluster.corruptReplica(0, block));
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
-      File scanLog = new File(MiniDFSCluster.getFinalizedDir(
+      File scanCursor = new File(new File(MiniDFSCluster.getFinalizedDir(
           cluster.getInstanceStorageDir(0, 0),
-          cluster.getNamesystem().getBlockPoolId()).getParent().toString()
-          + "/../dncp_block_verification.log.prev");
+          cluster.getNamesystem().getBlockPoolId()).getParent()).getParent(),
+          "scanner.cursor");
       //wait for one minute for deletion to succeed;
-      for(int i=0; !scanLog.delete(); i++) {
-        assertTrue("Could not delete log file in one minute", i < 60);
+      for(int i = 0; !scanCursor.delete(); i++) {
+        assertTrue("Could not delete " + scanCursor.getAbsolutePath() +
+            " in one minute", i < 60);
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ignored) {}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index e9557da..68c66a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -82,8 +83,8 @@ public abstract class BlockReportTestBase {
 
   private static short REPL_FACTOR = 1;
   private static final int RAND_LIMIT = 2000;
-  private static final long DN_RESCAN_INTERVAL = 5000;
-  private static final long DN_RESCAN_EXTRA_WAIT = 2 * DN_RESCAN_INTERVAL;
+  private static final long DN_RESCAN_INTERVAL = 1;
+  private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL;
   private static final int DN_N0 = 0;
   private static final int FILE_START = 0;
 
@@ -294,7 +295,7 @@ public abstract class BlockReportTestBase {
       }
     }
 
-    waitTil(DN_RESCAN_EXTRA_WAIT);
+    waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT));
 
     // all blocks belong to the same file, hence same BP
     String poolId = cluster.getNamesystem().getBlockPoolId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index f50afd4..fd51e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -113,30 +113,6 @@ public class DataNodeTestUtils {
     return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
         dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
   }
-  
-  public static void runBlockScannerForBlock(DataNode dn, ExtendedBlock b) {
-    BlockPoolSliceScanner bpScanner = getBlockPoolScanner(dn, b);
-    bpScanner.verifyBlock(new ExtendedBlock(b.getBlockPoolId(),
-        new BlockPoolSliceScanner.BlockScanInfo(b.getLocalBlock())));
-  }
-
-  private static BlockPoolSliceScanner getBlockPoolScanner(DataNode dn,
-      ExtendedBlock b) {
-    DataBlockScanner scanner = dn.getBlockScanner();
-    BlockPoolSliceScanner bpScanner = scanner.getBPScanner(b.getBlockPoolId());
-    return bpScanner;
-  }
-
-  public static long getLatestScanTime(DataNode dn, ExtendedBlock b) {
-    BlockPoolSliceScanner scanner = getBlockPoolScanner(dn, b);
-    return scanner.getLastScanTime(b.getLocalBlock());
-  }
-
-  public static void shutdownBlockScanner(DataNode dn) {
-    if (dn.blockScanner != null) {
-      dn.blockScanner.shutdown();
-    }
-  }
 
   /**
    * This method is used for testing. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 16b6350..0610b94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -484,6 +483,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public void releaseReservedSpace(long bytesToRelease) {
     }
+
+    @Override
+    public BlockIterator newBlockIterator(String bpid, String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BlockIterator loadBlockIterator(String bpid, String name)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FsDatasetSpi getDataset() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private final Map<String, Map<Block, BInfo>> blockMap
@@ -1238,11 +1253,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public FsVolumeSpi getVolume(ExtendedBlock b) {
     return volume;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
new file mode 100644
index 0000000..7eaa2bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.Statistics;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlockScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockScanner.class);
+
+  @Before
+  public void before() {
+    BlockScanner.Conf.allowUnitTestSettings = true;
+    GenericTestUtils.setLogLevel(BlockScanner.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(VolumeScanner.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(FsVolumeImpl.LOG, Level.ALL);
+  }
+
+  private static void disableBlockScanner(Configuration conf) {
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 0L);
+  }
+
+  private static class TestContext implements Closeable {
+    final int numNameServices;
+    final MiniDFSCluster cluster;
+    final DistributedFileSystem[] dfs;
+    final String[] bpids;
+    final DataNode datanode;
+    final BlockScanner blockScanner;
+    final FsDatasetSpi<? extends FsVolumeSpi> data;
+    final List<? extends FsVolumeSpi> volumes;
+
+    TestContext(Configuration conf, int numNameServices) throws Exception {
+      this.numNameServices = numNameServices;
+      MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf).
+          numDataNodes(1).
+          storagesPerDatanode(1);
+      if (numNameServices > 1) {
+        bld.nnTopology(MiniDFSNNTopology.
+              simpleFederatedTopology(numNameServices));
+      }
+      cluster = bld.build();
+      cluster.waitActive();
+      dfs = new DistributedFileSystem[numNameServices];
+      for (int i = 0; i < numNameServices; i++) {
+        dfs[i] = cluster.getFileSystem(i);
+      }
+      bpids = new String[numNameServices];
+      for (int i = 0; i < numNameServices; i++) {
+        bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
+      }
+      datanode = cluster.getDataNodes().get(0);
+      blockScanner = datanode.getBlockScanner();
+      for (int i = 0; i < numNameServices; i++) {
+        dfs[i].mkdirs(new Path("/test"));
+      }
+      data = datanode.getFSDataset();
+      volumes = data.getVolumes();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (cluster != null) {
+        for (int i = 0; i < numNameServices; i++) {
+          dfs[i].delete(new Path("/test"), true);
+        }
+        cluster.shutdown();
+      }
+    }
+
+    public void createFiles(int nsIdx, int numFiles, int length)
+          throws Exception {
+      for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+        DFSTestUtil.createFile(dfs[nsIdx], getPath(blockIdx), length,
+            (short)1, 123L);
+      }
+    }
+
+    public Path getPath(int fileIdx) {
+      return new Path("/test/" + fileIdx);
+    }
+
+    public ExtendedBlock getFileBlock(int nsIdx, int fileIdx)
+          throws Exception {
+      return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
+    }
+  }
+
+  /**
+   * Test iterating through a bunch of blocks in a volume using a volume
+   * iterator.<p/>
+   *
+   * We will rewind the iterator when about halfway through the blocks.
+   *
+   * @param numFiles        The number of files to create.
+   * @param maxStaleness    The maximum staleness to allow with the iterator.
+   * @throws Exception
+   */
+  private void testVolumeIteratorImpl(int numFiles,
+              long maxStaleness) throws Exception {
+    Configuration conf = new Configuration();
+    disableBlockScanner(conf);
+    TestContext ctx = new TestContext(conf, 1);
+    ctx.createFiles(0, numFiles, 1);
+    assertEquals(1, ctx.volumes.size());
+    FsVolumeSpi volume = ctx.volumes.get(0);
+    ExtendedBlock savedBlock = null, loadedBlock = null;
+    boolean testedRewind = false, testedSave = false, testedLoad = false;
+    int blocksProcessed = 0, savedBlocksProcessed = 0;
+    try {
+      BPOfferService bpos[] = ctx.datanode.getAllBpOs();
+      assertEquals(1, bpos.length);
+      BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test");
+      assertEquals(ctx.bpids[0], iter.getBlockPoolId());
+      iter.setMaxStalenessMs(maxStaleness);
+      while (true) {
+        HashSet<ExtendedBlock> blocks = new HashSet<ExtendedBlock>();
+        for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+          blocks.add(ctx.getFileBlock(0, blockIdx));
+        }
+        while (true) {
+          ExtendedBlock block = iter.nextBlock();
+          if (block == null) {
+            break;
+          }
+          blocksProcessed++;
+          LOG.info("BlockIterator for {} found block {}, blocksProcessed = {}",
+              volume, block, blocksProcessed);
+          if (testedSave && (savedBlock == null)) {
+            savedBlock = block;
+          }
+          if (testedLoad && (loadedBlock == null)) {
+            loadedBlock = block;
+            // The block that we get back right after loading the iterator
+            // should be the same block we got back right after saving
+            // the iterator.
+            assertEquals(savedBlock, loadedBlock);
+          }
+          boolean blockRemoved = blocks.remove(block);
+          assertTrue("Found unknown block " + block, blockRemoved);
+          if (blocksProcessed > (numFiles / 3)) {
+            if (!testedSave) {
+              LOG.info("Processed {} blocks out of {}.  Saving iterator.",
+                  blocksProcessed, numFiles);
+              iter.save();
+              testedSave = true;
+              savedBlocksProcessed = blocksProcessed;
+            }
+          }
+          if (blocksProcessed > (numFiles / 2)) {
+            if (!testedRewind) {
+              LOG.info("Processed {} blocks out of {}.  Rewinding iterator.",
+                  blocksProcessed, numFiles);
+              iter.rewind();
+              break;
+            }
+          }
+          if (blocksProcessed > ((2 * numFiles) / 3)) {
+            if (!testedLoad) {
+              LOG.info("Processed {} blocks out of {}.  Loading iterator.",
+                  blocksProcessed, numFiles);
+              iter = volume.loadBlockIterator(ctx.bpids[0], "test");
+              iter.setMaxStalenessMs(maxStaleness);
+              break;
+            }
+          }
+        }
+        if (!testedRewind) {
+          testedRewind = true;
+          blocksProcessed = 0;
+          LOG.info("Starting again at the beginning...");
+          continue;
+        }
+        if (!testedLoad) {
+          testedLoad = true;
+          blocksProcessed = savedBlocksProcessed;
+          LOG.info("Starting again at the load point...");
+          continue;
+        }
+        assertEquals(numFiles, blocksProcessed);
+        break;
+      }
+    } finally {
+      ctx.close();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testVolumeIteratorWithoutCaching() throws Exception {
+    testVolumeIteratorImpl(5, 0);
+  }
+
+  @Test(timeout=60000)
+  public void testVolumeIteratorWithCaching() throws Exception {
+    testVolumeIteratorImpl(600, 100);
+  }
+
+  @Test(timeout=60000)
+  public void testDisableVolumeScanner() throws Exception {
+    Configuration conf = new Configuration();
+    disableBlockScanner(conf);
+    TestContext ctx = new TestContext(conf, 1);
+    try {
+      Assert.assertFalse(ctx.datanode.getBlockScanner().isEnabled());
+    } finally {
+      ctx.close();
+    }
+  }
+
+  public static class TestScanResultHandler extends ScanResultHandler {
+    static class Info {
+      boolean shouldRun = false;
+      final Set<ExtendedBlock> badBlocks = new HashSet<ExtendedBlock>();
+      final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
+      long blocksScanned = 0;
+      Semaphore sem = null;
+    }
+
+    private VolumeScanner scanner;
+
+    final static ConcurrentHashMap<String, Info> infos =
+        new ConcurrentHashMap<String, Info>();
+
+    static Info getInfo(FsVolumeSpi volume) {
+      Info newInfo = new Info();
+      Info prevInfo = infos.
+          putIfAbsent(volume.getStorageID(), newInfo);
+      return prevInfo == null ? newInfo : prevInfo;
+    }
+
+    @Override
+    public void setup(VolumeScanner scanner) {
+      this.scanner = scanner;
+      Info info = getInfo(scanner.volume);
+      LOG.info("about to start scanning.");
+      synchronized (info) {
+        while (!info.shouldRun) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+      LOG.info("starting scanning.");
+    }
+
+    @Override
+    public void handle(ExtendedBlock block, IOException e) {
+      LOG.info("handling block {} (exception {})", block, e);
+      Info info = getInfo(scanner.volume);
+      Semaphore sem;
+      synchronized (info) {
+        sem = info.sem;
+      }
+      if (sem != null) {
+        try {
+          sem.acquire();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException("interrupted");
+        }
+      }
+      synchronized (info) {
+        if (!info.shouldRun) {
+          throw new RuntimeException("stopping volumescanner thread.");
+        }
+        if (e == null) {
+          info.goodBlocks.add(block);
+        } else {
+          info.badBlocks.add(block);
+        }
+        info.blocksScanned++;
+      }
+    }
+  }
+
+  private void testScanAllBlocksImpl(final boolean rescan) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L);
+    if (rescan) {
+      conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 100L);
+    } else {
+      conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    }
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 10;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+    final Set<ExtendedBlock> expectedBlocks = new HashSet<ExtendedBlock>();
+    for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+      expectedBlocks.add(ctx.getFileBlock(0, i));
+    }
+    TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>(){
+      @Override
+      public Boolean get() {
+        TestScanResultHandler.Info info =
+            TestScanResultHandler.getInfo(ctx.volumes.get(0));
+        int numFoundBlocks = 0;
+        StringBuilder foundBlocksBld = new StringBuilder();
+        String prefix = "";
+        synchronized (info) {
+          for (ExtendedBlock block : info.goodBlocks) {
+            assertTrue(expectedBlocks.contains(block));
+            numFoundBlocks++;
+            foundBlocksBld.append(prefix).append(block);
+            prefix = ", ";
+          }
+          LOG.info("numFoundBlocks = {}.  blocksScanned = {}. Found blocks {}",
+              numFoundBlocks, info.blocksScanned, foundBlocksBld.toString());
+          if (rescan) {
+            return (numFoundBlocks == NUM_EXPECTED_BLOCKS) &&
+                     (info.blocksScanned >= 2 * NUM_EXPECTED_BLOCKS);
+          } else {
+            return numFoundBlocks == NUM_EXPECTED_BLOCKS;
+          }
+        }
+      }
+    }, 10, 60000);
+    if (!rescan) {
+      synchronized (info) {
+        assertEquals(NUM_EXPECTED_BLOCKS, info.blocksScanned);
+      }
+      Statistics stats = ctx.blockScanner.getVolumeStats(
+          ctx.volumes.get(0).getStorageID());
+      assertEquals(5 * NUM_EXPECTED_BLOCKS, stats.bytesScannedInPastHour);
+      assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedSinceRestart);
+      assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedInCurrentPeriod);
+      assertEquals(0, stats.scanErrorsSinceRestart);
+      assertEquals(1, stats.scansSinceRestart);
+    }
+    ctx.close();
+  }
+
+  /**
+   * Test scanning all blocks.  Set the scan period high enough that
+   * we shouldn't rescan any block during this test.
+   */
+  @Test(timeout=60000)
+  public void testScanAllBlocksNoRescan() throws Exception {
+    testScanAllBlocksImpl(false);
+  }
+
+  /**
+   * Test scanning all blocks.  Set the scan period low enough that
+   * we should scan all blocks at least twice during this test.
+   */
+  @Test(timeout=60000)
+  public void testScanAllBlocksWithRescan() throws Exception {
+    testScanAllBlocksImpl(true);
+  }
+
+  /**
+   * Test that the bytes-per-second limit caps the number of blocks scanned.
+   */
+  @Test(timeout=120000)
+  public void testScanRateLimit() throws Exception {
+    Configuration conf = new Configuration();
+    // Limit scan bytes per second dramatically
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 4096L);
+    // Scan continuously
+    conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 1L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 5;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4096);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    long startMs = Time.monotonicNow();
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    Thread.sleep(5000);
+    synchronized (info) {
+      long endMs = Time.monotonicNow();
+      // At most one block per elapsed second (rounded up) may be scanned.
+      long maxBlocksScanned = ((endMs + 999 - startMs) / 1000);
+      assertTrue(info.blocksScanned <= maxBlocksScanned);
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned > 0;
+        }
+      }
+    }, 1, 30000);
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testCorruptBlockHandling() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 5;
+    final int CORRUPT_INDEX = 3;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
+    ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
+    ctx.cluster.corruptBlockOnDataNodes(badBlock);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned == NUM_EXPECTED_BLOCKS;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertTrue(info.badBlocks.contains(badBlock));
+      for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+        if (i != CORRUPT_INDEX) {
+          ExtendedBlock block = ctx.getFileBlock(0, i);
+          assertTrue(info.goodBlocks.contains(block));
+        }
+      }
+    }
+    ctx.close();
+  }
+
+  /**
+   * Test that we save the scan cursor when shutting down the datanode, and
+   * restart scanning from there when the datanode is restarted.
+   */
+  @Test(timeout=120000)
+  public void testDatanodeCursor() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 10;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.sem = new Semaphore(5);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // Scan the first 5 blocks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          return info.blocksScanned == 5;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertEquals(5, info.goodBlocks.size());
+      assertEquals(5, info.blocksScanned);
+      info.shouldRun = false;
+    }
+    ctx.datanode.shutdown();
+    String vPath = ctx.volumes.get(0).getBasePath();
+    File cursorPath = new File(new File(new File(vPath, "current"),
+          ctx.bpids[0]), "scanner.cursor");
+    assertTrue("Failed to find cursor save file in " +
+        cursorPath.getAbsolutePath(), cursorPath.exists());
+    Set<ExtendedBlock> prevGoodBlocks = new HashSet<ExtendedBlock>();
+    synchronized (info) {
+      info.sem = new Semaphore(4);
+      prevGoodBlocks.addAll(info.goodBlocks);
+      info.goodBlocks.clear();
+    }
+
+    // The block that we were scanning when we shut down the DN won't get
+    // recorded.
+    // After restarting the datanode, we should scan the next 4 blocks.
+    ctx.cluster.restartDataNode(0);
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned != 9) {
+            LOG.info("Waiting for blocksScanned to reach 9.  It is at {}",
+                info.blocksScanned);
+          }
+          return info.blocksScanned == 9;
+        }
+      }
+    }, 3, 30000);
+    synchronized (info) {
+      assertEquals(4, info.goodBlocks.size());
+      info.goodBlocks.addAll(prevGoodBlocks);
+      assertEquals(9, info.goodBlocks.size());
+      assertEquals(9, info.blocksScanned);
+    }
+    ctx.datanode.shutdown();
+
+    // After restarting the datanode, we should not scan any more blocks.
+    // This is because we reached the end of the block pool earlier, and
+    // the scan period is much, much longer than the test time.
+    synchronized (info) {
+      info.sem = null;
+      info.shouldRun = false;
+      info.goodBlocks.clear();
+    }
+    ctx.cluster.restartDataNode(0);
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+    Thread.sleep(3000);
+    synchronized (info) {
+      assertTrue(info.goodBlocks.isEmpty());
+    }
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testMultipleBlockPoolScanning() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    final TestContext ctx = new TestContext(conf, 3);
+
+    // We scan 5 bytes per file (1 byte in file, 4 bytes of checksum)
+    final int BYTES_SCANNED_PER_FILE = 5;
+    final int NUM_FILES[] = new int[] { 1, 5, 10 };
+    int TOTAL_FILES = 0;
+    // Give each name service (and thus each block pool) its own files, so
+    // that every block pool holds blocks for the scanner to visit.
+    for (int nsIdx = 0; nsIdx < NUM_FILES.length; nsIdx++) {
+      ctx.createFiles(nsIdx, NUM_FILES[nsIdx], 1);
+      TOTAL_FILES += NUM_FILES[nsIdx];
+    }
+
+    // start scanning
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    synchronized (info) {
+      info.shouldRun = true;
+      info.notify();
+    }
+
+    // Wait for all the block pools to be scanned.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          Statistics stats = ctx.blockScanner.getVolumeStats(
+              ctx.volumes.get(0).getStorageID());
+          if (stats.scansSinceRestart < 3) {
+            LOG.info("Waiting for scansSinceRestart to reach 3 (it is {})",
+                stats.scansSinceRestart);
+            return false;
+          }
+          if (!stats.eof) {
+            LOG.info("Waiting for eof.");
+            return false;
+          }
+          return true;
+        }
+      }
+    }, 3, 30000);
+
+    Statistics stats = ctx.blockScanner.getVolumeStats(
+        ctx.volumes.get(0).getStorageID());
+    assertEquals(TOTAL_FILES, stats.blocksScannedSinceRestart);
+    assertEquals(BYTES_SCANNED_PER_FILE * TOTAL_FILES,
+        stats.bytesScannedInPastHour);
+    ctx.close();
+  }
+
+  @Test(timeout=120000)
+  public void testNextSorted() throws Exception {
+    List<String> arr = new LinkedList<String>();
+    arr.add("1");
+    arr.add("3");
+    arr.add("5");
+    arr.add("7");
+    Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "2"));
+    Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "1"));
+    Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, ""));
+    Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, null));
+    Assert.assertEquals(null, FsVolumeImpl.nextSorted(arr, "9"));
+  }
+
+  @Test(timeout=120000)
+  public void testCalculateNeededBytesPerSec() throws Exception {
+    // If we didn't check anything in the last hour, we should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(100, 0));
+
+    // If, on average, we checked 101 bytes/s during the last hour,
+    // stop checking now.
+    Assert.assertFalse(
+        VolumeScanner.calculateShouldScan(100, 101 * 3600));
+
+    // Target is 1 byte / s, but we didn't scan anything in the last minute.
+    // Should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(1, 3540));
+
+    // Target is 100000 bytes / s, but we didn't scan anything in the last
+    // minute.  Should scan now.
+    Assert.assertTrue(
+        VolumeScanner.calculateShouldScan(100000L, 354000000L));
+    // 365000000 exceeds 3600 s at the target rate (360000000): don't scan.
+    Assert.assertFalse(
+        VolumeScanner.calculateShouldScan(100000L, 365000000L));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 1b8f243..82a1684 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -589,6 +589,22 @@ public class TestDirectoryScanner {
     @Override
     public void releaseReservedSpace(long bytesToRelease) {
     }
+
+    @Override
+    public BlockIterator newBlockIterator(String bpid, String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BlockIterator loadBlockIterator(String bpid, String name)
+          throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FsDatasetSpi getDataset() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
deleted file mode 100644
index 55b1739..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import static org.junit.Assert.fail;
-
-
-public class TestMultipleNNDataBlockScanner {
-  private static final Log LOG = 
-    LogFactory.getLog(TestMultipleNNDataBlockScanner.class);
-  Configuration conf;
-  MiniDFSCluster cluster = null;
-  final String[] bpids = new String[3];
-  final FileSystem[] fs = new FileSystem[3];
-  
-  public void setUp() throws IOException {
-    conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100);
-    cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
-        .build();
-    for (int i = 0; i < 3; i++) {
-      cluster.waitActive(i);
-    }
-    for (int i = 0; i < 3; i++) {
-      bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
-    }
-    for (int i = 0; i < 3; i++) {
-      fs[i] = cluster.getFileSystem(i);
-    }
-    // Create 2 files on each namenode with 10 blocks each
-    for (int i = 0; i < 3; i++) {
-      DFSTestUtil.createFile(fs[i], new Path("file1"), 1000, (short) 1, 0);
-      DFSTestUtil.createFile(fs[i], new Path("file2"), 1000, (short) 1, 1);
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testDataBlockScanner() throws IOException, InterruptedException {
-    setUp();
-    try {
-      DataNode dn = cluster.getDataNodes().get(0);
-      for (int i = 0; i < 3; i++) {
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-          LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
-              + "; Scanned so far=" + blocksScanned);
-          Thread.sleep(5000);
-        }
-      }
-
-      StringBuilder buffer = new StringBuilder();
-      dn.blockScanner.printBlockReport(buffer, false);
-      LOG.info("Block Report\n" + buffer.toString());
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testBlockScannerAfterRefresh() throws IOException,
-      InterruptedException {
-    setUp();
-    try {
-      Configuration dnConf = cluster.getDataNodes().get(0).getConf();
-      Configuration conf = new HdfsConfiguration(dnConf);
-      StringBuilder namenodesBuilder = new StringBuilder();
-
-      String bpidToShutdown = cluster.getNamesystem(2).getBlockPoolId();
-      for (int i = 0; i < 2; i++) {
-        String nsId = DFSUtil.getNamenodeNameServiceId(cluster
-            .getConfiguration(i));
-        namenodesBuilder.append(nsId);
-        namenodesBuilder.append(",");
-      }
-
-      conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
-          .toString());
-      DataNode dn = cluster.getDataNodes().get(0);
-      dn.refreshNamenodes(conf);
-
-      try {
-        while (true) {
-          dn.blockScanner.getBlocksScannedInLastRun(bpidToShutdown);
-          Thread.sleep(1000);
-        }
-      } catch (IOException ex) {
-        // Expected
-        LOG.info(ex.getMessage());
-      }
-
-      namenodesBuilder.append(DFSUtil.getNamenodeNameServiceId(cluster
-          .getConfiguration(2)));
-      conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
-          .toString());
-      dn.refreshNamenodes(conf);
-
-      for (int i = 0; i < 3; i++) {
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-          LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
-              + "; Scanned so far=" + blocksScanned);
-          Thread.sleep(5000);
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void testBlockScannerAfterRestart() throws IOException,
-      InterruptedException {
-    setUp();
-    try {
-      cluster.restartDataNode(0);
-      cluster.waitActive();
-      DataNode dn = cluster.getDataNodes().get(0);
-      for (int i = 0; i < 3; i++) {
-        while (!dn.blockScanner.isInitialized(bpids[i])) {
-          Thread.sleep(1000);
-        }
-        long blocksScanned = 0;
-        while (blocksScanned != 20) {
-          if (dn.blockScanner != null) {
-            blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
-            LOG.info("Waiting for all blocks to be scanned for bpid="
-                + bpids[i] + "; Scanned so far=" + blocksScanned);
-          }
-          Thread.sleep(5000);
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
-  @Test(timeout=120000)
-  public void test2NNBlockRescanInterval() throws IOException {
-    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
-    Configuration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
-        .build();
-
-    try {
-      FileSystem fs = cluster.getFileSystem(1);
-      Path file2 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file2, 30, (short) 1, 0);
-
-      fs = cluster.getFileSystem(0);
-      Path file1 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
-      for (int i = 0; i < 8; i++) {
-        LOG.info("Verifying that the blockscanner scans exactly once");
-        waitAndScanBlocks(1, 1);
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * HDFS-3828: DN rescans blocks too frequently
-   * 
-   * @throws Exception
-   */
-  @Test(timeout=120000)
-  public void testBlockRescanInterval() throws IOException {
-    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
-    Configuration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf).build();
-
-    try {
-      FileSystem fs = cluster.getFileSystem();
-      Path file1 = new Path("/test/testBlockScanInterval");
-      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
-      for (int i = 0; i < 4; i++) {
-        LOG.info("Verifying that the blockscanner scans exactly once");
-        waitAndScanBlocks(1, 1);
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  void waitAndScanBlocks(long scansLastRun, long scansTotal)
-      throws IOException {
-    // DataBlockScanner will run for every 5 seconds so we are checking for
-    // every 5 seconds
-    int n = 5;
-    String bpid = cluster.getNamesystem(0).getBlockPoolId();
-    DataNode dn = cluster.getDataNodes().get(0);
-    long blocksScanned, total;
-    do {
-      try {
-        Thread.sleep(SLEEP_PERIOD_MS);
-      } catch (InterruptedException e) {
-        fail("Interrupted: " + e);
-      }
-      blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpid);
-      total = dn.blockScanner.getTotalScans(bpid);
-      LOG.info("bpid = " + bpid + " blocksScanned = " + blocksScanned + " total=" + total);
-    } while (n-- > 0 && (blocksScanned != scansLastRun || scansTotal != total));
-    Assert.assertEquals(scansTotal, total);
-    Assert.assertEquals(scansLastRun, blocksScanned);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index f256ee6..8fd51d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -51,12 +50,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
       StorageType.DEFAULT);
 
   @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix)
-      throws IOException {
-    return new ExternalRollingLogs();
-  }
-
-  @Override
   public List<ExternalVolumeImpl> getVolumes() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
deleted file mode 100644
index c9fb7c8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.extdataset;
-
-import java.io.IOException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-public class ExternalRollingLogs implements RollingLogs {
-
-  private class ExternalLineIterator implements LineIterator {
-    @Override
-    public boolean isPrevious() {
-      return false;
-    }
-
-    @Override
-    public boolean isLastReadFromPrevious() {
-      return false;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return false;
-    }
-
-    @Override
-    public String next() {
-      return null;
-    }
-
-    @Override
-    public void remove() {
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-
-  private class ExternalAppender implements Appender {
-    @Override
-    public Appendable append(CharSequence cs) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Appendable append(CharSequence cs, int i, int i1)
-	throws IOException {
-      return null;
-    }
-
-    @Override
-    public Appendable append(char c) throws IOException {
-      return null;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-
-  @Override
-  public LineIterator iterator(boolean skipPrevious) throws IOException {
-    return new ExternalLineIterator();
-  }
-
-  @Override
-  public Appender appender() {
-    return new ExternalAppender();
-  }
-
-  @Override
-  public boolean roll() throws IOException {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 857e946..0ea33bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
@@ -79,4 +80,20 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
   }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return null;
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public FsDatasetSpi getDataset() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
index 791bb76..82a6951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.junit.Test;
 
 /**
@@ -80,14 +79,6 @@ public class TestExternalDataset {
   }
 
   /**
-   * Tests instantiating a RollingLogs subclass.
-   */
-  @Test
-  public void testInstantiateRollingLogs() throws Throwable {
-    RollingLogs inst = new ExternalRollingLogs();
-  }
-
-  /**
    * Tests instantiating an FsVolumeSpi subclass.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
index f92d949..691d390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
@@ -41,16 +43,21 @@ public class FsVolumeListTest {
       new RoundRobinVolumeChoosingPolicy<>();
   private FsDatasetImpl dataset = null;
   private String baseDir;
+  private BlockScanner blockScanner;
 
   @Before
   public void setUp() {
     dataset = mock(FsDatasetImpl.class);
     baseDir = new FileSystemTestHelper().getTestRootDir();
+    Configuration blockScannerConf = new Configuration();
+    blockScannerConf.setInt(DFSConfigKeys.
+        DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    blockScanner = new BlockScanner(null, blockScannerConf);
   }
 
   @Test
   public void testGetNextVolumeWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -59,7 +66,7 @@ public class FsVolumeListTest {
           conf, StorageType.DEFAULT);
       volume.setCapacityForTesting(1024 * 1024 * 1024);
       volumes.add(volume);
-      volumeList.addVolume(volume);
+      volumeList.addVolume(volume.obtainReference());
     }
 
     // Close the second volume.
@@ -75,7 +82,7 @@ public class FsVolumeListTest {
 
   @Test
   public void testCheckDirsWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    FsVolumeList volumeList = new FsVolumeList(0, blockScanner, blockChooser);
     List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "volume-" + i);
@@ -83,7 +90,7 @@ public class FsVolumeListTest {
       FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
           conf, StorageType.DEFAULT);
       volumes.add(volume);
-      volumeList.addVolume(volume);
+      volumeList.addVolume(volume.obtainReference());
     }
 
     // Close the 2nd volume.
@@ -91,4 +98,4 @@ public class FsVolumeListTest {
     // checkDirs() should ignore the 2nd volume since it is closed.
     volumeList.checkDirs();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index ca936b3..f3d15de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -22,17 +22,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -51,19 +51,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -81,7 +79,6 @@ public class TestFsDatasetImpl {
   private Configuration conf;
   private DataNode datanode;
   private DataStorage storage;
-  private DataBlockScanner scanner;
   private FsDatasetImpl dataset;
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
@@ -112,13 +109,14 @@ public class TestFsDatasetImpl {
   public void setUp() throws IOException {
     datanode = mock(DataNode.class);
     storage = mock(DataStorage.class);
-    scanner = mock(DataBlockScanner.class);
     this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
     final DNConf dnConf = new DNConf(conf);
 
     when(datanode.getConf()).thenReturn(conf);
     when(datanode.getDnConf()).thenReturn(dnConf);
-    when(datanode.getBlockScanner()).thenReturn(scanner);
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
 
     createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
     dataset = new FsDatasetImpl(datanode, storage, conf);
@@ -208,10 +206,6 @@ public class TestFsDatasetImpl {
     assertEquals("The replica infos on this volume has been removed from the "
                  + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
                  totalNumReplicas);
-
-    // Verify that every BlockPool deletes the removed blocks from the volume.
-    verify(scanner, times(BLOCK_POOL_IDS.length))
-        .deleteBlocks(anyString(), any(Block[].class));
   }
 
   @Test(timeout = 5000)
@@ -245,7 +239,9 @@ public class TestFsDatasetImpl {
   public void testChangeVolumeWithRunningCheckDirs() throws IOException {
     RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
         new RoundRobinVolumeChoosingPolicy<>();
-    final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    final BlockScanner blockScanner = new BlockScanner(datanode, conf);
+    final FsVolumeList volumeList =
+        new FsVolumeList(0, blockScanner, blockChooser);
     final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
 
     // Initialize FsVolumeList with 5 mock volumes.
@@ -254,19 +250,23 @@ public class TestFsDatasetImpl {
       FsVolumeImpl volume = mock(FsVolumeImpl.class);
       oldVolumes.add(volume);
       when(volume.getBasePath()).thenReturn("data" + i);
-      volumeList.addVolume(volume);
+      FsVolumeReference ref = mock(FsVolumeReference.class);
+      when(ref.getVolume()).thenReturn(volume);
+      volumeList.addVolume(ref);
     }
 
     // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
     // volume and add another volume. It does not affect checkDirs() running.
     final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+    final FsVolumeReference newRef = mock(FsVolumeReference.class);
+    when(newRef.getVolume()).thenReturn(newVolume);
     FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
     doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocationOnMock)
           throws Throwable {
         volumeList.removeVolume(new File("data4"));
-        volumeList.addVolume(newVolume);
+        volumeList.addVolume(newRef);
         return null;
       }
     }).when(blockedVolume).checkDirs();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index 3609684..6cc3d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -184,8 +184,8 @@ public class TestInterDatanodeProtocol {
       InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
           datanode, datanodeinfo[0], conf, useDnHostname);
       
-      //stop block scanner, so we could compare lastScanTime
-      DataNodeTestUtils.shutdownBlockScanner(datanode);
+      // Stop the block scanners.
+      datanode.getBlockScanner().removeAllVolumeScanners();
 
       //verify BlockMetaDataInfo
       ExtendedBlock b = locatedblock.getBlock();


[4/4] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

Posted by cm...@apache.org.
HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e62a1a6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e62a1a6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e62a1a6

Branch: refs/heads/trunk
Commit: 6e62a1a6728b1f782f64065424f92b292c3f163a
Parents: a003f71
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Dec 17 11:27:48 2014 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Jan 21 19:00:53 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../hdfs/server/datanode/BPOfferService.java    |   3 -
 .../hdfs/server/datanode/BPServiceActor.java    |   6 -
 .../server/datanode/BlockPoolSliceScanner.java  | 872 -------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   8 -
 .../hdfs/server/datanode/BlockScanner.java      | 308 +++++++
 .../hdfs/server/datanode/BlockSender.java       |   3 -
 .../hdfs/server/datanode/DataBlockScanner.java  | 339 -------
 .../hadoop/hdfs/server/datanode/DataNode.java   |  73 +-
 .../hdfs/server/datanode/VolumeScanner.java     | 652 ++++++++++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java |  32 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  | 110 +++
 .../server/datanode/fsdataset/RollingLogs.java  |  73 --
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  44 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 347 ++++++++
 .../datanode/fsdataset/impl/FsVolumeList.java   |  24 +-
 .../fsdataset/impl/RollingLogsImpl.java         | 241 -----
 .../src/main/resources/hdfs-default.xml         |  20 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  16 +
 .../hadoop/hdfs/TestDatanodeBlockScanner.java   | 551 ------------
 .../org/apache/hadoop/hdfs/TestReplication.java |   3 +-
 .../TestOverReplicatedBlocks.java               |  13 +-
 .../server/datanode/BlockReportTestBase.java    |   7 +-
 .../hdfs/server/datanode/DataNodeTestUtils.java |  24 -
 .../server/datanode/SimulatedFSDataset.java     |  22 +-
 .../hdfs/server/datanode/TestBlockScanner.java  | 680 +++++++++++++++
 .../server/datanode/TestDirectoryScanner.java   |  16 +
 .../TestMultipleNNDataBlockScanner.java         | 245 ------
 .../extdataset/ExternalDatasetImpl.java         |   7 -
 .../extdataset/ExternalRollingLogs.java         |  92 --
 .../datanode/extdataset/ExternalVolumeImpl.java |  17 +
 .../extdataset/TestExternalDataset.java         |   9 -
 .../fsdataset/impl/FsVolumeListTest.java        |  17 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  30 +-
 .../impl/TestInterDatanodeProtocol.java         |   4 +-
 .../namenode/snapshot/SnapshotTestHelper.java   |   4 +-
 37 files changed, 2288 insertions(+), 2629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 25ad33b..866b765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -140,6 +140,9 @@ Trunk (Unreleased)
     class and constructor to public; and fix FsDatasetSpi to use generic type
     instead of FsVolumeImpl.  (David Powell and Joe Pallas via szetszwo)
 
+    HDFS-7430. Rewrite the BlockScanner to use O(1) memory and use multiple
+    threads (cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fb958f1..60581b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -441,6 +441,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
+  public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
+  public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 4a54bed..dfeacde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -656,9 +656,6 @@ class BPOfferService {
       //
       Block toDelete[] = bcmd.getBlocks();
       try {
-        if (dn.blockScanner != null) {
-          dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
-        }
         // using global fsdataset
         dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
       } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e6409ab..e396727 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -736,12 +736,6 @@ class BPServiceActor implements Runnable {
         DatanodeCommand cmd = cacheReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
-        // Now safe to start scanning the block pool.
-        // If it has already been started, this is a no-op.
-        if (dn.blockScanner != null) {
-          dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
-        }
-
         //
         // There is no work to do;  sleep until hearbeat timer elapses, 
         // or work arrives, and then iterate again.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
deleted file mode 100644
index f36fea1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ /dev/null
@@ -1,872 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.util.GSet;
-import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Scans the block files under a block pool and verifies that the
- * files are not corrupt.
- * This keeps track of blocks and their last verification times.
- * Currently it does not modify the metadata for block.
- */
-
-class BlockPoolSliceScanner {
-  
-  public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
-  
-  private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
-
-  private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
-  private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
-  private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
-
-  private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
-
-  private final String blockPoolId;
-  private final long scanPeriod;
-  private final AtomicLong lastScanTime = new AtomicLong();
-
-  private final DataNode datanode;
-  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-  
-  private final SortedSet<BlockScanInfo> blockInfoSet
-      = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
-  private final SortedSet<BlockScanInfo> newBlockInfoSet =
-      new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
-  private final GSet<Block, BlockScanInfo> blockMap
-      = new LightWeightGSet<Block, BlockScanInfo>(
-          LightWeightGSet.computeCapacity(0.5, "BlockMap"));
-  
-  // processedBlocks keeps track of which blocks are scanned
-  // since the last run.
-  private volatile HashMap<Long, Integer> processedBlocks;
-  
-  private long totalScans = 0;
-  private long totalScanErrors = 0;
-  private long totalTransientErrors = 0;
-  private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
-  
-  private long currentPeriodStart = Time.monotonicNow();
-  private long bytesLeft = 0; // Bytes to scan in this period
-  private long totalBytesToScan = 0;
-  private boolean isNewPeriod = true;
-  private int lastScanTimeDifference = 5*60*1000;
-  
-  private final LogFileHandler verificationLog;
-  
-  private final DataTransferThrottler throttler = new DataTransferThrottler(
-       200, MAX_SCAN_RATE);
-  
-  private static enum ScanType {
-    IMMEDIATE_SCAN,  
-    VERIFICATION_SCAN,     // scanned as part of periodic verfication
-    NONE,
-  }
-
-  // Extend Block because in the DN process there's a 1-to-1 correspondence of
-  // BlockScanInfo to Block instances, so by extending rather than containing
-  // Block, we can save a bit of Object overhead (about 24 bytes per block
-  // replica.)
-  static class BlockScanInfo extends Block
-      implements LightWeightGSet.LinkedElement {
-
-    /** Compare the info by the last scan time. */
-    static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
-        = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {
-
-      @Override
-      public int compare(BlockScanInfo left, BlockScanInfo right) {
-        final ScanType leftNextScanType = left.nextScanType;
-        final ScanType rightNextScanType = right.nextScanType;
-        final long l = left.lastScanTime;
-        final long r = right.lastScanTime;
-        // Compare by nextScanType if they are same then compare by 
-        // lastScanTimes
-        // compare blocks itself if scantimes are same to avoid.
-        // because TreeMap uses comparator if available to check existence of
-        // the object. 
-        int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
-        return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1:  l < r? -1: l > r? 1: left.compareTo(right); 
-      }
-    };
-
-    long lastScanTime = 0;
-    ScanType lastScanType = ScanType.NONE; 
-    boolean lastScanOk = true;
-    private LinkedElement next;
-    ScanType nextScanType = ScanType.VERIFICATION_SCAN;
-    
-    BlockScanInfo(Block block) {
-      super(block);
-    }
-    
-    @Override
-    public int hashCode() {
-      return super.hashCode();
-    }
-    
-    @Override
-    public boolean equals(Object that) {
-      if (this == that) {
-        return true;
-      }
-      return super.equals(that);
-    }
-    
-    long getLastScanTime() {
-      return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
-    }
-
-    @Override
-    public void setNext(LinkedElement next) {
-      this.next = next;
-    }
-
-    @Override
-    public LinkedElement getNext() {
-      return next;
-    }
-  }
-  
-  BlockPoolSliceScanner(String bpid, DataNode datanode,
-      FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
-    this.datanode = datanode;
-    this.dataset = dataset;
-    this.blockPoolId  = bpid;
-    
-    long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 
-                             DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
-    if (hours <= 0) {
-      hours = DEFAULT_SCAN_PERIOD_HOURS;
-    }
-    this.scanPeriod = hours * 3600 * 1000;
-    LOG.info("Periodic Block Verification Scanner initialized with interval "
-        + hours + " hours for block pool " + bpid);
-
-    // get the list of blocks and arrange them in random order
-    List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
-    Collections.shuffle(arr);
-    
-    long scanTime = -1;
-    for (Block block : arr) {
-      BlockScanInfo info = new BlockScanInfo( block );
-      info.lastScanTime = scanTime--; 
-      //still keep 'info.lastScanType' to NONE.
-      addBlockInfo(info, false);
-    }
-
-    RollingLogs rollingLogs = null;
-    try {
-       rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
-    } catch (IOException e) {
-      LOG.warn("Could not open verfication log. " +
-               "Verification times are not stored.");
-    }
-    verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
-  }
-  
-  String getBlockPoolId() {
-    return blockPoolId;
-  }
-  
-  private void updateBytesToScan(long len, long lastScanTime) {
-    // len could be negative when a block is deleted.
-    totalBytesToScan += len;
-    if ( lastScanTime < currentPeriodStart ) {
-      bytesLeft += len;
-    }
-    // Should we change throttler bandwidth every time bytesLeft changes?
-    // not really required.
-  }
-
-  /**
-   * Add the BlockScanInfo to sorted set of blockScanInfo
-   * @param info BlockScanInfo to be added
-   * @param isNewBlock true if the block is the new Block, false if
-   *          BlockScanInfo is being updated with new scanTime
-   */
-  private synchronized void addBlockInfo(BlockScanInfo info,
-      boolean isNewBlock) {
-    boolean added = false;
-    if (isNewBlock) {
-      // check whether the block already present
-      boolean exists = blockInfoSet.contains(info);
-      added = !exists && newBlockInfoSet.add(info);
-    } else {
-      added = blockInfoSet.add(info);
-    }
-    blockMap.put(info);
-    
-    if (added) {
-      updateBytesToScan(info.getNumBytes(), info.lastScanTime);
-    }
-  }
-
-  private synchronized void delBlockInfo(BlockScanInfo info) {
-    boolean exists = blockInfoSet.remove(info);
-    if (!exists){
-      exists = newBlockInfoSet.remove(info);
-    }
-    blockMap.remove(info);
-
-    if (exists) {
-      updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
-    }
-  }
-
-  /** Update blockMap by the given LogEntry */
-  private synchronized void updateBlockInfo(LogEntry e) {
-    BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
-    
-    if (info != null && e.verificationTime > 0 && 
-        info.lastScanTime < e.verificationTime) {
-      delBlockInfo(info);
-      if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
-        info.lastScanTime = e.verificationTime;
-      }
-      info.lastScanType = ScanType.VERIFICATION_SCAN;
-      addBlockInfo(info, false);
-    }
-  }
-
-  private synchronized long getNewBlockScanTime() {
-    /* If there are a lot of blocks, this returns a random time with in 
-     * the scan period. Otherwise something sooner.
-     */
-    long period = Math.min(scanPeriod, 
-                           Math.max(blockMap.size(),1) * 600 * 1000L);
-    int periodInt = Math.abs((int)period);
-    return Time.monotonicNow() - scanPeriod +
-        DFSUtil.getRandom().nextInt(periodInt);
-  }
-
-  /** Adds block to list of blocks 
-   * @param scanNow - true if we want to make that particular block a high 
-   * priority one to scan immediately
-   **/
-  synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
-    BlockScanInfo info = blockMap.get(block.getLocalBlock());
-    long lastScanTime = 0;
-    if (info != null) {
-      lastScanTime = info.lastScanTime;
-    }
-    // If the particular block is scanned in last 5 minutes, the  no need to 
-    // verify that block again
-    if (scanNow && Time.monotonicNow() - lastScanTime < 
-        lastScanTimeDifference) {
-      return;
-    }
-    
-    if ( info != null ) {
-      LOG.warn("Adding an already existing block " + block);
-      delBlockInfo(info);
-    }
-    
-    info = new BlockScanInfo(block.getLocalBlock());    
-    info.lastScanTime = getNewBlockScanTime();
-    if (scanNow) {
-      // Create a new BlockScanInfo object and set the lastScanTime to 0
-      // which will make it the high priority block
-      LOG.info("Adding block for immediate verification " + block);
-      info.nextScanType = ScanType.IMMEDIATE_SCAN;
-    }
-    
-    addBlockInfo(info, true);
-    adjustThrottler();
-  }
-  
-  /** Deletes the block from internal structures */
-  synchronized void deleteBlock(Block block) {
-    BlockScanInfo info = blockMap.get(block);
-    if (info != null) {
-      delBlockInfo(info);
-    }
-  }
-
-  @VisibleForTesting
-  long getTotalScans() {
-    return totalScans;
-  }
-
-  /** @return the last scan time for the block pool. */
-  long getLastScanTime() {
-    return lastScanTime.get();
-  }
-
-  /** @return the last scan time the given block. */
-  synchronized long getLastScanTime(Block block) {
-    BlockScanInfo info = blockMap.get(block);
-    return info == null? 0: info.lastScanTime;
-  }
-
-  /** Deletes blocks from internal structures */
-  void deleteBlocks(Block[] blocks) {
-    for ( Block b : blocks ) {
-      deleteBlock(b);
-    }
-  }
-  
-  private synchronized void updateScanStatus(BlockScanInfo info,
-                                             ScanType type,
-                                             boolean scanOk) {
-    delBlockInfo(info);
-
-    long now = Time.monotonicNow();
-    info.lastScanType = type;
-    info.lastScanTime = now;
-    info.lastScanOk = scanOk;
-    info.nextScanType = ScanType.VERIFICATION_SCAN;
-    addBlockInfo(info, false);
-        
-    // Don't update meta data if the verification failed.
-    if (!scanOk) {
-      return;
-    }
-    
-    if (verificationLog != null) {
-      verificationLog.append(now, info.getGenerationStamp(),
-          info.getBlockId());
-    }
-  }
-  
-  private void handleScanFailure(ExtendedBlock block) {
-    LOG.info("Reporting bad " + block);
-    try {
-      datanode.reportBadBlocks(block);
-    } catch (IOException ie) {
-      // it is bad, but not bad enough to shutdown the scanner
-      LOG.warn("Cannot report bad " + block.getBlockId());
-    }
-  }
-  
-  @VisibleForTesting
-  synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
-    this.lastScanTimeDifference = lastScanTimeDifference;
-  }
-  
-  static private class LogEntry {
-
-    long blockId = -1;
-    long verificationTime = -1;
-    long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-    
-    /**
-     * The format consists of single line with multiple entries. each 
-     * entry is in the form : name="value".
-     * This simple text and easily extendable and easily parseable with a
-     * regex.
-     */
-    private static final Pattern entryPattern =
-      Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
-    
-    static String toString(long verificationTime, long genStamp, long blockId,
-        DateFormat dateFormat) {
-      return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
-          + "\"\t time=\"" + verificationTime
-          + "\"\t genstamp=\"" + genStamp
-          + "\"\t id=\"" + blockId + "\"";
-    }
-
-    static LogEntry parseEntry(String line) {
-      LogEntry entry = new LogEntry();
-      
-      Matcher matcher = entryPattern.matcher(line);
-      while (matcher.find()) {
-        String name = matcher.group(1);
-        String value = matcher.group(2);
-        
-        try {
-          if (name.equals("id")) {
-            entry.blockId = Long.parseLong(value);
-          } else if (name.equals("time")) {
-            entry.verificationTime = Long.parseLong(value);
-          } else if (name.equals("genstamp")) {
-            entry.genStamp = Long.parseLong(value);
-          }
-        } catch(NumberFormatException nfe) {
-          LOG.warn("Cannot parse line: " + line, nfe);
-          return null;
-        }
-      }
-      
-      return entry;
-    }
-  }
-  
-  private synchronized void adjustThrottler() {
-    long timeLeft = Math.max(1L,
-        currentPeriodStart + scanPeriod - Time.monotonicNow());
-    long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
-    throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
-  }
-  
-  @VisibleForTesting
-  void verifyBlock(ExtendedBlock block) {
-    BlockSender blockSender = null;
-
-    /* In case of failure, attempt to read second time to reduce
-     * transient errors. How do we flush block data from kernel 
-     * buffers before the second read? 
-     */
-    for (int i=0; i<2; i++) {
-      boolean second = (i > 0);
-      
-      try {
-        adjustThrottler();
-        
-        blockSender = new BlockSender(block, 0, -1, false, true, true, 
-            datanode, null, CachingStrategy.newDropBehind());
-
-        DataOutputStream out = 
-                new DataOutputStream(new IOUtils.NullOutputStream());
-        
-        blockSender.sendBlock(out, null, throttler);
-
-        LOG.info((second ? "Second " : "") +
-                 "Verification succeeded for " + block);
-        
-        if ( second ) {
-          totalTransientErrors++;
-        }
-        
-        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
-            ScanType.VERIFICATION_SCAN, true);
-
-        return;
-      } catch (IOException e) {
-        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
-            ScanType.VERIFICATION_SCAN, false);
-
-        // If the block does not exists anymore, then its not an error
-        if (!dataset.contains(block)) {
-          LOG.info(block + " is no longer in the dataset");
-          deleteBlock(block.getLocalBlock());
-          return;
-        }
-
-        // If the block exists, the exception may due to a race with write:
-        // The BlockSender got an old block path in rbw. BlockReceiver removed
-        // the rbw block from rbw to finalized but BlockSender tried to open the
-        // file before BlockReceiver updated the VolumeMap. The state of the
-        // block can be changed again now, so ignore this error here. If there
-        // is a block really deleted by mistake, DirectoryScan should catch it.
-        if (e instanceof FileNotFoundException ) {
-          LOG.info("Verification failed for " + block +
-              " - may be due to race with write");
-          deleteBlock(block.getLocalBlock());
-          return;
-        }
-
-        LOG.warn((second ? "Second " : "First ") + "Verification failed for "
-            + block, e);
-        
-        if (second) {
-          totalScanErrors++;
-          datanode.getMetrics().incrBlockVerificationFailures();
-          handleScanFailure(block);
-          return;
-        } 
-      } finally {
-        IOUtils.closeStream(blockSender);
-        datanode.getMetrics().incrBlocksVerified();
-        totalScans++;
-      }
-    }
-  }
-  
-  private synchronized long getEarliestScanTime() {
-    if (!blockInfoSet.isEmpty()) {
-      return blockInfoSet.first().lastScanTime;
-    }
-    return Long.MAX_VALUE; 
-  }
-  
-  private synchronized boolean isFirstBlockProcessed() {
-    if (!blockInfoSet.isEmpty()) {
-      if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
-        return false;
-      }
-      long blockId = blockInfoSet.first().getBlockId();
-      if ((processedBlocks.get(blockId) != null)
-          && (processedBlocks.get(blockId) == 1)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  // Picks one block and verifies it
-  private void verifyFirstBlock() {
-    BlockScanInfo block = null;
-    synchronized (this) {
-      if (!blockInfoSet.isEmpty()) {
-        block = blockInfoSet.first();
-      }
-    }
-    if ( block != null ) {
-      verifyBlock(new ExtendedBlock(blockPoolId, block));
-      processedBlocks.put(block.getBlockId(), 1);
-    }
-  }
-  
-  // Used for tests only
-  int getBlocksScannedInLastRun() {
-    return totalBlocksScannedInLastRun.get();
-  }
-
-  /**
-   * Reads the current and previous log files (if any) and marks the blocks
-   * processed if they were processed within last scan period. Copies the log
-   * records of recently scanned blocks from previous to current file. 
-   * Returns false if the process was interrupted because the thread is marked 
-   * to exit.
-   */
-  private boolean assignInitialVerificationTimes() {
-    //First updates the last verification times from the log file.
-    if (verificationLog != null) {
-      long now = Time.monotonicNow();
-      RollingLogs.LineIterator logIterator = null;
-      try {
-        logIterator = verificationLog.logs.iterator(false);
-        // update verification times from the verificationLog.
-        while (logIterator.hasNext()) {
-          if (!datanode.shouldRun
-              || datanode.blockScanner.blockScannerThread.isInterrupted()) {
-            return false;
-          }
-          LogEntry entry = LogEntry.parseEntry(logIterator.next());
-          if (entry != null) {
-            updateBlockInfo(entry);
-            if (now - entry.verificationTime < scanPeriod) {
-              BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0,
-                  entry.genStamp));
-              if (info != null) {
-                if (processedBlocks.get(entry.blockId) == null) {
-                  if (isNewPeriod) {
-                    updateBytesLeft(-info.getNumBytes());
-                  }
-                  processedBlocks.put(entry.blockId, 1);
-                }
-                if (logIterator.isLastReadFromPrevious()) {
-                  // write the log entry to current file
-                  // so that the entry is preserved for later runs.
-                  verificationLog.append(entry.verificationTime, entry.genStamp,
-                      entry.blockId);
-                }
-              }
-            }
-          }
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to read previous verification times.", e);
-      } finally {
-        IOUtils.closeStream(logIterator);
-      }
-      isNewPeriod = false;
-    }
-    
-    
-    /* Before this loop, entries in blockInfoSet that are not
-     * updated above have lastScanTime of <= 0 . Loop until first entry has
-     * lastModificationTime > 0.
-     */    
-    synchronized (this) {
-      final int numBlocks = Math.max(blockMap.size(), 1);
-      // Initially spread the block reads over half of scan period
-      // so that we don't keep scanning the blocks too quickly when restarted.
-      long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
-      long lastScanTime = Time.monotonicNow() - scanPeriod;
-
-      if (!blockInfoSet.isEmpty()) {
-        BlockScanInfo info;
-        while ((info =  blockInfoSet.first()).lastScanTime < 0) {
-          delBlockInfo(info);        
-          info.lastScanTime = lastScanTime;
-          lastScanTime += verifyInterval;
-          addBlockInfo(info, false);
-        }
-      }
-    }
-    
-    return true;
-  }
-  
-  private synchronized void updateBytesLeft(long len) {
-    bytesLeft += len;
-  }
-  
-  private synchronized void startNewPeriod() {
-    LOG.info("Starting a new period : work left in prev period : "
-        + String.format("%.2f%%", totalBytesToScan == 0 ? 0
-            : (bytesLeft * 100.0) / totalBytesToScan));
-
-    // reset the byte counts :
-    bytesLeft = totalBytesToScan;
-    currentPeriodStart = Time.monotonicNow();
-    isNewPeriod = true;
-  }
-  
-  private synchronized boolean workRemainingInCurrentPeriod() {
-    if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
-                  currentPeriodStart + ", period=" + scanPeriod + ", now=" +
-                  Time.monotonicNow() + " " + blockPoolId);
-      }
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  void scanBlockPoolSlice() {
-    if (!workRemainingInCurrentPeriod()) {
-      return;
-    }
-
-    // Create a new processedBlocks structure
-    processedBlocks = new HashMap<Long, Integer>();
-    if (!assignInitialVerificationTimes()) {
-      return;
-    }
-    // Start scanning
-    try {
-      scan();
-    } finally {
-      totalBlocksScannedInLastRun.set(processedBlocks.size());
-      lastScanTime.set(Time.monotonicNow());
-    }
-  }
-
-  /**
-   * Shuts down this BlockPoolSliceScanner and releases any internal resources.
-   */
-  void shutdown() {
-    if (verificationLog != null) {
-      verificationLog.close();
-    }
-  }
-  
-  private void scan() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting to scan blockpool: " + blockPoolId);
-    }
-    try {
-      adjustThrottler();
-        
-      while (datanode.shouldRun
-          && !datanode.blockScanner.blockScannerThread.isInterrupted()
-          && datanode.isBPServiceAlive(blockPoolId)) {
-        long now = Time.monotonicNow();
-        synchronized (this) {
-          if ( now >= (currentPeriodStart + scanPeriod)) {
-            startNewPeriod();
-          }
-        }
-        if (((now - getEarliestScanTime()) >= scanPeriod)
-            || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
-          verifyFirstBlock();
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("All remaining blocks were processed recently, "
-                + "so this run is complete");
-          }
-          break;
-        }
-      }
-    } catch (RuntimeException e) {
-      LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
-      throw e;
-    } finally {
-      rollVerificationLogs();
-      rollNewBlocksInfo();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Done scanning block pool: " + blockPoolId);
-      }
-    }
-  }
-
-  // add new blocks to scan in next iteration
-  private synchronized void rollNewBlocksInfo() {
-    for (BlockScanInfo newBlock : newBlockInfoSet) {
-      blockInfoSet.add(newBlock);
-    }
-    newBlockInfoSet.clear();
-  }
-
-  private synchronized void rollVerificationLogs() {
-    if (verificationLog != null) {
-      try {
-        verificationLog.logs.roll();
-      } catch (IOException ex) {
-        LOG.warn("Received exception: ", ex);
-        verificationLog.close();
-      }
-    }
-  }
-
-  
-  synchronized void printBlockReport(StringBuilder buffer, 
-                                     boolean summaryOnly) {
-    long oneHour = 3600*1000;
-    long oneDay = 24*oneHour;
-    long oneWeek = 7*oneDay;
-    long fourWeeks = 4*oneWeek;
-    
-    int inOneHour = 0;
-    int inOneDay = 0;
-    int inOneWeek = 0;
-    int inFourWeeks = 0;
-    int inScanPeriod = 0;
-    int neverScanned = 0;
-    
-    DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-    
-    int total = blockInfoSet.size();
-    
-    long now = Time.monotonicNow();
-    
-    Date date = new Date();
-    
-    for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
-      BlockScanInfo info = it.next();
-      
-      long scanTime = info.getLastScanTime();
-      long diff = now - scanTime;
-      
-      if (diff <= oneHour) inOneHour++;
-      if (diff <= oneDay) inOneDay++;
-      if (diff <= oneWeek) inOneWeek++;
-      if (diff <= fourWeeks) inFourWeeks++;
-      if (diff <= scanPeriod) inScanPeriod++;      
-      if (scanTime <= 0) neverScanned++;
-      
-      if (!summaryOnly) {
-        date.setTime(scanTime);
-        String scanType = 
-          (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none"; 
-        buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
-                                    " scan time : " +
-                                    "%-15d %s%n", info,
-                                    (info.lastScanOk ? "ok" : "failed"),
-                                    scanType, scanTime,
-                                    (scanTime <= 0) ? "not yet verified" : 
-                                      dateFormat.format(date)));
-      }
-    }
-    
-    double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
-                           *100.0/scanPeriod;
-    double pctProgress = (totalBytesToScan == 0) ? 100 :
-                         (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
-                         
-    buffer.append(String.format("%nTotal Blocks                 : %6d" +
-                                "%nVerified in last hour        : %6d" +
-                                "%nVerified in last day         : %6d" +
-                                "%nVerified in last week        : %6d" +
-                                "%nVerified in last four weeks  : %6d" +
-                                "%nVerified in SCAN_PERIOD      : %6d" +
-                                "%nNot yet verified             : %6d" +
-                                "%nVerified since restart       : %6d" +
-                                "%nScans since restart          : %6d" +
-                                "%nScan errors since restart    : %6d" +
-                                "%nTransient scan errors        : %6d" +
-                                "%nCurrent scan rate limit KBps : %6d" +
-                                "%nProgress this period         : %6.0f%%" +
-                                "%nTime left in cur period      : %6.2f%%" +
-                                "%n", 
-                                total, inOneHour, inOneDay, inOneWeek,
-                                inFourWeeks, inScanPeriod, neverScanned,
-                                totalScans, totalScans, 
-                                totalScanErrors, totalTransientErrors, 
-                                Math.round(throttler.getBandwidth()/1024.0),
-                                pctProgress, pctPeriodLeft));
-  }
-  
-  /**
-   * This class takes care of log file used to store the last verification
-   * times of the blocks.
-   */
-  private static class LogFileHandler {
-    private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
-    private final RollingLogs logs;
-
-    private LogFileHandler(RollingLogs logs)  {
-      this.logs = logs;
-    }
-
-    void append(long verificationTime, long genStamp, long blockId) {
-      final String m = LogEntry.toString(verificationTime, genStamp, blockId,
-          dateFormat);
-      try {
-        logs.appender().append(m);
-      } catch (IOException e) {
-        LOG.warn("Failed to append to " + logs + ", m=" + m, e);
-      }
-    }
-
-    void close() {
-      try {
-        logs.appender().close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close the appender of " + logs, e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index df8dd5c..12041a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -193,20 +193,12 @@ class BlockReceiver implements Closeable {
           break;
         case PIPELINE_SETUP_APPEND:
           replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
-          if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
-                block.getLocalBlock());
-          }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
           replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
-          if (datanode.blockScanner != null) { // remove from block scanner
-            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
-                block.getLocalBlock());
-          }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
new file mode 100644
index 0000000..7429fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@InterfaceAudience.Private
+public class BlockScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockScanner.class);
+
+  /**
+   * The DataNode that this scanner is associated with.
+   */
+  private final DataNode datanode;
+
+  /**
+   * Maps Storage IDs to VolumeScanner objects.
+   */
+  private final TreeMap<String, VolumeScanner> scanners =
+      new TreeMap<String, VolumeScanner>();
+
+  /**
+   * The scanner configuration.
+   */
+  private final Conf conf;
+
+  /**
+   * The cached scanner configuration.
+   */
+  static class Conf {
+    // These are a few internal configuration keys used for unit tests.
+    // They can't be set unless the static boolean allowUnitTestSettings has
+    // been set to true.
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS =
+        "internal.dfs.datanode.scan.period.ms.key";
+
+    @VisibleForTesting
+    static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER =
+        "internal.volume.scanner.scan.result.handler";
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS =
+        "internal.dfs.block.scanner.max_staleness.ms";
+
+    @VisibleForTesting
+    static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT =
+        TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
+
+    @VisibleForTesting
+    static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS =
+        "dfs.block.scanner.cursor.save.interval.ms";
+
+    @VisibleForTesting
+    static final long
+        INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT =
+            TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+    static boolean allowUnitTestSettings = false;
+    final long targetBytesPerSec;
+    final long maxStalenessMs;
+    final long scanPeriodMs;
+    final long cursorSaveMs;
+    final Class<? extends ScanResultHandler> resultHandler;
+
+    private static long getUnitTestLong(Configuration conf, String key,
+                                        long defVal) {
+      if (allowUnitTestSettings) {
+        return conf.getLong(key, defVal);
+      } else {
+        return defVal;
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    Conf(Configuration conf) {
+      this.targetBytesPerSec = Math.max(0L, conf.getLong(
+          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
+      this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf,
+          INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS,
+          INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT));
+      this.scanPeriodMs = Math.max(0L,
+          getUnitTestLong(conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS,
+              TimeUnit.MILLISECONDS.convert(conf.getLong(
+                  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+                  DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS)));
+      this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
+          INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
+          INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
+      if (allowUnitTestSettings) {
+        this.resultHandler = (Class<? extends ScanResultHandler>)
+            conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+                          ScanResultHandler.class);
+      } else {
+        this.resultHandler = ScanResultHandler.class;
+      }
+    }
+  }
+
+  public BlockScanner(DataNode datanode, Configuration conf) {
+    this.datanode = datanode;
+    this.conf = new Conf(conf);
+    if (isEnabled()) {
+      LOG.info("Initialized block scanner with targetBytesPerSec {}",
+          this.conf.targetBytesPerSec);
+    } else {
+      LOG.info("Disabled block scanner.");
+    }
+  }
+
+  /**
+   * Returns true if the block scanner is enabled.<p>
+   *
+   * If the block scanner is disabled, no volume scanners will be created, and
+   * no threads will start.
+   */
+  public boolean isEnabled() {
+    return (conf.scanPeriodMs) > 0 && (conf.targetBytesPerSec > 0);
+  }
+
+  /**
+   * Set up a scanner for the given volume.
+   *
+   * @param ref              A reference to the volume.
+   */
+  public synchronized void addVolumeScanner(FsVolumeReference ref) {
+    boolean success = false;
+    try {
+      FsVolumeSpi volume = ref.getVolume();
+      if (!isEnabled()) {
+        LOG.debug("Not adding volume scanner for {}, because the block " +
+            "scanner is disabled.", volume.getBasePath());
+        return;
+      }
+      VolumeScanner scanner = scanners.get(volume.getStorageID());
+      if (scanner != null) {
+        LOG.error("Already have a scanner for volume {}.",
+            volume.getBasePath());
+        return;
+      }
+      LOG.debug("Adding scanner for volume {} (StorageID {})",
+          volume.getBasePath(), volume.getStorageID());
+      scanner = new VolumeScanner(conf, datanode, ref);
+      scanner.start();
+      scanners.put(volume.getStorageID(), scanner);
+      success = true;
+    } finally {
+      if (!success) {
+        // If we didn't create a new VolumeScanner object, we don't
+        // need this reference to the volume.
+        IOUtils.cleanup(null, ref);
+      }
+    }
+  }
+
+  /**
+   * Stops and removes a volume scanner.<p>
+   *
+   * This function will block until the scanner stops or 5 minutes elapse.
+   *
+   * @param volume           The volume to remove.
+   */
+  public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
+    if (!isEnabled()) {
+      LOG.debug("Not removing volume scanner for {}, because the block " +
+          "scanner is disabled.", volume.getStorageID());
+      return;
+    }
+    VolumeScanner scanner = scanners.get(volume.getStorageID());
+    if (scanner == null) {
+      LOG.warn("No scanner found to remove for volumeId {}",
+          volume.getStorageID());
+      return;
+    }
+    LOG.info("Removing scanner for volume {} (StorageID {})",
+        volume.getBasePath(), volume.getStorageID());
+    scanner.shutdown();
+    scanners.remove(volume.getStorageID());
+    Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
+  }
+
+  /**
+   * Stops and removes all volume scanners.<p>
+   *
+   * This function waits up to 5 minutes for each volume scanner to stop.
+   */
+  public synchronized void removeAllVolumeScanners() {
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().shutdown();
+    }
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      Uninterruptibles.joinUninterruptibly(entry.getValue(),
+          5, TimeUnit.MINUTES);
+    }
+    scanners.clear();
+  }
+
+  /**
+   * Enable scanning a given block pool id.
+   *
+   * @param bpid        The block pool id to enable scanning for.
+   */
+  synchronized void enableBlockPoolId(String bpid) {
+    Preconditions.checkNotNull(bpid);
+    for (VolumeScanner scanner : scanners.values()) {
+      scanner.enableBlockPoolId(bpid);
+    }
+  }
+
+  /**
+   * Disable scanning a given block pool id.
+   *
+   * @param bpid        The block pool id to disable scanning for.
+   */
+  synchronized void disableBlockPoolId(String bpid) {
+    Preconditions.checkNotNull(bpid);
+    for (VolumeScanner scanner : scanners.values()) {
+      scanner.disableBlockPoolId(bpid);
+    }
+  }
+
+  @VisibleForTesting
+  synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) {
+    VolumeScanner scanner = scanners.get(volumeId);
+    if (scanner == null) {
+      return null;
+    }
+    return scanner.getStatistics();
+  }
+
+  synchronized void printStats(StringBuilder p) {
+    // print out all bpids that we're scanning ?
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().printStats(p);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static class Servlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void doGet(HttpServletRequest request,
+                      HttpServletResponse response) throws IOException {
+      response.setContentType("text/plain");
+
+      DataNode datanode = (DataNode)
+          getServletContext().getAttribute("datanode");
+      BlockScanner blockScanner = datanode.getBlockScanner();
+
+      StringBuilder buffer = new StringBuilder(8 * 1024);
+      if (!blockScanner.isEnabled()) {
+        LOG.warn("Periodic block scanner is not running");
+        buffer.append("Periodic block scanner is not running. " +
+            "Please check the datanode log if this is unexpected.");
+      } else {
+        buffer.append("Block Scanner Statistics\n\n");
+        blockScanner.printStats(buffer);
+      }
+      String resp = buffer.toString();
+      LOG.trace("Returned Servlet info {}", resp);
+      response.getWriter().write(resp);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 2d312d7..182b366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,9 +600,6 @@ class BlockSender implements java.io.Closeable {
         String ioem = e.getMessage();
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
           LOG.error("BlockSender.sendChunks() exception: ", e);
-          //Something might be wrong with the block. Make this block the high 
-          //priority block for verification.
-          datanode.blockScanner.addBlock(block, true);
         }
       }
       throw ioeToSocketException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
deleted file mode 100644
index 450c2b1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * DataBlockScanner manages block scanning for all the block pools. For each
- * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
- * thread to scan the blocks for that block pool. When a {@link BPOfferService}
- * becomes alive or dies, blockPoolScannerMap in this class is updated.
- */
-@InterfaceAudience.Private
-public class DataBlockScanner implements Runnable {
-  public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
-  private final DataNode datanode;
-  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-  private final Configuration conf;
-  
-  static final int SLEEP_PERIOD_MS = 5 * 1000;
-
-  /**
-   * Map to find the BlockPoolScanner for a given block pool id. This is updated
-   * when a BPOfferService becomes alive or dies.
-   */
-  private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = 
-    new TreeMap<String, BlockPoolSliceScanner>();
-  Thread blockScannerThread = null;
-  
-  DataBlockScanner(DataNode datanode,
-      FsDatasetSpi<? extends FsVolumeSpi> dataset,
-      Configuration conf) {
-    this.datanode = datanode;
-    this.dataset = dataset;
-    this.conf = conf;
-  }
-  
-  @Override
-  public void run() {
-    String currentBpId = "";
-    boolean firstRun = true;
-    while (datanode.shouldRun && !Thread.interrupted()) {
-      //Sleep everytime except in the first iteration.
-      if (!firstRun) {
-        try {
-          Thread.sleep(SLEEP_PERIOD_MS);
-        } catch (InterruptedException ex) {
-          // Interrupt itself again to set the interrupt status
-          blockScannerThread.interrupt();
-          continue;
-        }
-      } else {
-        firstRun = false;
-      }
-      
-      BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
-      if (bpScanner == null) {
-        // Possible if thread is interrupted
-        continue;
-      }
-      currentBpId = bpScanner.getBlockPoolId();
-      // If BPOfferService for this pool is not alive, don't process it
-      if (!datanode.isBPServiceAlive(currentBpId)) {
-        LOG.warn("Block Pool " + currentBpId + " is not alive");
-        // Remove in case BP service died abruptly without proper shutdown
-        removeBlockPool(currentBpId);
-        continue;
-      }
-      bpScanner.scanBlockPoolSlice();
-    }
-
-    // Call shutdown for each allocated BlockPoolSliceScanner.
-    for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
-      bpss.shutdown();
-    }
-  }
-
-  // Wait for at least one block pool to be up
-  private void waitForInit() {
-    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
-        || (getBlockPoolSetSize() < 1)) {
-      try {
-        Thread.sleep(SLEEP_PERIOD_MS);
-      } catch (InterruptedException e) {
-        blockScannerThread.interrupt();
-        return;
-      }
-    }
-  }
-  
-  /**
-   * Find next block pool id to scan. There should be only one current
-   * verification log file. Find which block pool contains the current
-   * verification log file and that is used as the starting block pool id. If no
-   * current files are found start with first block-pool in the blockPoolSet.
-   * However, if more than one current files are found, the one with latest 
-   * modification time is used to find the next block pool id.
-   */
-  private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
-    
-    String nextBpId = null;
-    while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
-      waitForInit();
-      synchronized (this) {
-        if (getBlockPoolSetSize() > 0) {          
-          // Find nextBpId by the minimum of the last scan time
-          long lastScanTime = 0;
-          for (String bpid : blockPoolScannerMap.keySet()) {
-            final long t = getBPScanner(bpid).getLastScanTime();
-            if (t != 0L) {
-              if (bpid == null || t < lastScanTime) {
-                lastScanTime =  t;
-                nextBpId = bpid;
-              }
-            }
-          }
-          
-          // nextBpId can still be null if no current log is found,
-          // find nextBpId sequentially.
-          if (nextBpId == null) {
-            nextBpId = blockPoolScannerMap.higherKey(currentBpId);
-            if (nextBpId == null) {
-              nextBpId = blockPoolScannerMap.firstKey();
-            }
-          }
-          if (nextBpId != null) {
-            return getBPScanner(nextBpId);
-          }
-        }
-      }
-      LOG.warn("No block pool is up, going to wait");
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException ex) {
-        LOG.warn("Received exception: " + ex);
-        blockScannerThread.interrupt();
-        return null;
-      }
-    }
-    return null;
-  }
-
-  private synchronized int getBlockPoolSetSize() {
-    return blockPoolScannerMap.size();
-  }
-  
-  @VisibleForTesting
-  synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
-    return blockPoolScannerMap.get(bpid);
-  }
-  
-  private synchronized String[] getBpIdList() {
-    return blockPoolScannerMap.keySet().toArray(
-        new String[blockPoolScannerMap.keySet().size()]);
-  }
-  
-  public void addBlock(ExtendedBlock block, boolean scanNow) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
-    if (bpScanner != null) {
-      bpScanner.addBlock(block, scanNow);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + block.getBlockPoolId());
-    }
-  }
-  
-  boolean isInitialized(String bpid) {
-    return getBPScanner(bpid) != null;
-  }
-
-  public synchronized void printBlockReport(StringBuilder buffer,
-      boolean summary) {
-    String[] bpIdList = getBpIdList();
-    if (bpIdList == null || bpIdList.length == 0) {
-      buffer.append("Periodic block scanner is not yet initialized. "
-          + "Please check back again after some time.");
-      return;
-    }
-    for (String bpid : bpIdList) {
-      BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-      buffer.append("\n\nBlock report for block pool: "+bpid + "\n");
-      bpScanner.printBlockReport(buffer, summary);
-      buffer.append("\n");
-    }
-  }
-  
-  public void deleteBlock(String poolId, Block toDelete) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
-    if (bpScanner != null) {
-      bpScanner.deleteBlock(toDelete);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + poolId);
-    }
-  }
-
-  public void deleteBlocks(String poolId, Block[] toDelete) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
-    if (bpScanner != null) {
-      bpScanner.deleteBlocks(toDelete);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + poolId);
-    }
-  }
-  
-  public void shutdown() {
-    synchronized (this) {
-      if (blockScannerThread != null) {
-        blockScannerThread.interrupt();
-      }
-    }
-
-    // We cannot join within the synchronized block, because it would create a
-    // deadlock situation.  blockScannerThread calls other synchronized methods.
-    if (blockScannerThread != null) {
-      try {
-        blockScannerThread.join();
-      } catch (InterruptedException e) {
-        // shutting down anyway
-      }
-    }
-  }
-
-  public synchronized void addBlockPool(String blockPoolId) {
-    if (blockPoolScannerMap.get(blockPoolId) != null) {
-      return;
-    }
-    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
-        datanode, dataset, conf);
-    blockPoolScannerMap.put(blockPoolId, bpScanner);
-    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
-        + blockPoolScannerMap.size());
-  }
-  
-  public synchronized void removeBlockPool(String blockPoolId) {
-    BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
-    if (bpss != null) {
-      bpss.shutdown();
-    }
-    LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
-  }
-  
-  @VisibleForTesting
-  long getBlocksScannedInLastRun(String bpid) throws IOException {
-    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-    if (bpScanner == null) {
-      throw new IOException("Block Pool: "+bpid+" is not running");
-    } else {
-      return bpScanner.getBlocksScannedInLastRun();
-    }
-  }
-
-  @VisibleForTesting
-  long getTotalScans(String bpid) throws IOException {
-    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-    if (bpScanner == null) {
-      throw new IOException("Block Pool: "+bpid+" is not running");
-    } else {
-      return bpScanner.getTotalScans();
-    }
-  }
-
-  @VisibleForTesting
-  public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
-    if (bpScanner != null) {
-      bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
-    } else {
-      LOG.warn("No block pool scanner found for block pool id: "
-          + block.getBlockPoolId());
-    }
-  }
-  
-  public void start() {
-    blockScannerThread = new Thread(this);
-    blockScannerThread.setDaemon(true);
-    blockScannerThread.start();
-  }
-  
-  @InterfaceAudience.Private
-  public static class Servlet extends HttpServlet {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void doGet(HttpServletRequest request, 
-                      HttpServletResponse response) throws IOException {
-      response.setContentType("text/plain");
-      
-      DataNode datanode = (DataNode) getServletContext().getAttribute("datanode");
-      DataBlockScanner blockScanner = datanode.blockScanner;
-      
-      boolean summary = (request.getParameter("listblocks") == null);
-      
-      StringBuilder buffer = new StringBuilder(8*1024);
-      if (blockScanner == null) {
-        LOG.warn("Periodic block scanner is not running");
-        buffer.append("Periodic block scanner is not running. " +
-                      "Please check the datanode log if this is unexpected.");
-      } else {
-        blockScanner.printBlockReport(buffer, summary);
-      }
-      response.getWriter().write(buffer.toString()); // extra copy!
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 12df9d6..c77bc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -316,7 +316,7 @@ public class DataNode extends ReconfigurableBase
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
   
-  volatile DataBlockScanner blockScanner = null;
+  private final BlockScanner blockScanner;
   private DirectoryScanner directoryScanner = null;
   
   /** Activated plug-ins. */
@@ -365,6 +365,7 @@ public class DataNode extends ReconfigurableBase
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
+    this.blockScanner = new BlockScanner(this, conf);
   }
 
   /**
@@ -375,6 +376,7 @@ public class DataNode extends ReconfigurableBase
            final List<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
+    this.blockScanner = new BlockScanner(this, conf);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -671,7 +673,8 @@ public class DataNode extends ReconfigurableBase
     this.infoServer.setAttribute("datanode", this);
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport",
-                               DataBlockScanner.Servlet.class);
+                               BlockScanner.Servlet.class);
+
     this.infoServer.start();
     InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
 
@@ -772,56 +775,12 @@ public class DataNode extends ReconfigurableBase
     // Not a superuser.
     throw new AccessControlException();
   }
-  
-/**
- * Initialize the datanode's periodic scanners:
- *     {@link DataBlockScanner}
- *     {@link DirectoryScanner}
- * They report results on a per-blockpool basis but do their scanning 
- * on a per-Volume basis to minimize competition for disk iops.
- * 
- * @param conf - Configuration has the run intervals and other 
- *               parameters for these periodic scanners
- */
-  private void initPeriodicScanners(Configuration conf) {
-    initDataBlockScanner(conf);
-    initDirectoryScanner(conf);
-  }
-  
+
   private void shutdownPeriodicScanners() {
     shutdownDirectoryScanner();
-    shutdownDataBlockScanner();
-  }
-  
-  /**
-   * See {@link DataBlockScanner}
-   */
-  private synchronized void initDataBlockScanner(Configuration conf) {
-    if (blockScanner != null) {
-      return;
-    }
-    String reason = null;
-    assert data != null;
-    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
-                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
-      reason = "verification is turned off by configuration";
-    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
-      reason = "verifcation is not supported by SimulatedFSDataset";
-    } 
-    if (reason == null) {
-      blockScanner = new DataBlockScanner(this, data, conf);
-      blockScanner.start();
-    } else {
-      LOG.info("Periodic Block Verification scan disabled because " + reason);
-    }
+    blockScanner.removeAllVolumeScanners();
   }
-  
-  private void shutdownDataBlockScanner() {
-    if (blockScanner != null) {
-      blockScanner.shutdown();
-    }
-  }
-  
+
   /**
    * See {@link DirectoryScanner}
    */
@@ -1250,9 +1209,8 @@ public class DataNode extends ReconfigurableBase
       // registering anywhere. If that's the case, we wouldn't have
       // a block pool id
       String bpId = bpos.getBlockPoolId();
-      if (blockScanner != null) {
-        blockScanner.removeBlockPool(bpId);
-      }
+
+      blockScanner.disableBlockPoolId(bpId);
 
       if (data != null) {
         data.shutdownBlockPool(bpId);
@@ -1296,9 +1254,9 @@ public class DataNode extends ReconfigurableBase
     // failures.
     checkDiskError();
 
-    initPeriodicScanners(conf);
-    
+    initDirectoryScanner(conf);
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
   }
 
   BPOfferService[] getAllBpOs() {
@@ -2168,10 +2126,6 @@ public class DataNode extends ReconfigurableBase
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    FsVolumeSpi volume = getFSDataset().getVolume(block);
-    if (blockScanner != null && !volume.isTransientStorage()) {
-      blockScanner.addBlock(block, false);
-    }
   }
 
   /** Start a single datanode daemon and wait for it to finish.
@@ -2445,8 +2399,9 @@ public class DataNode extends ReconfigurableBase
     return data;
   }
 
+  @VisibleForTesting
   /** @return the block scanner. */
-  public DataBlockScanner getBlockScanner() {
+  public BlockScanner getBlockScanner() {
     return blockScanner;
   }
 


[3/4] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)

Posted by cm...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
new file mode 100644
index 0000000..781b4d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * VolumeScanner scans a single volume.  Each VolumeScanner has its own thread.<p/>
+ * They are all managed by the DataNode's BlockScanner.
+ */
+public class VolumeScanner extends Thread {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(VolumeScanner.class);
+
+  /**
+   * Number of seconds in a minute.
+   */
+  private final static int SECONDS_PER_MINUTE = 60;
+
+  /**
+   * Number of minutes in an hour.
+   */
+  private final static int MINUTES_PER_HOUR = 60;
+
+  /**
+   * Name of the block iterator used by this scanner.
+   */
+  private final static String BLOCK_ITERATOR_NAME = "scanner";
+
+  /**
+   * The configuration.
+   */
+  private final Conf conf;
+
+  /**
+   * The DataNode this VolumeScanner is associated with.
+   */
+  private final DataNode datanode;
+
+  /**
+   * A reference to the volume that we're scanning.
+   */
+  private final FsVolumeReference ref;
+
+  /**
+   * The volume that we're scanning.
+   */
+  final FsVolumeSpi volume;
+
+  /**
+   * The number of scanned bytes in each minute of the last hour.<p/>
+   *
+   * This array is managed as a circular buffer.  We take the monotonic time and
+   * divide it up into one-minute periods.  Each entry in the array represents
+   * how many bytes were scanned during that period.
+   */
+  private final long scannedBytes[] = new long[MINUTES_PER_HOUR];
+
+  /**
+   * The sum of all the values of scannedBytes.
+   */
+  private long scannedBytesSum = 0;
+
+  /**
+   * The throttler to use with BlockSender objects.
+   */
+  private final DataTransferThrottler throttler = new DataTransferThrottler(1);
+
+  /**
+   * The null output stream to use with BlockSender objects.
+   */
+  private final DataOutputStream nullStream =
+      new DataOutputStream(new IOUtils.NullOutputStream());
+
+  /**
+   * The block iterators associated with this VolumeScanner.<p/>
+   *
+   * Each block pool has its own BlockIterator.
+   */
+  private final List<BlockIterator> blockIters =
+      new LinkedList<BlockIterator>();
+
+  /**
+   * The current block iterator, or null if there is none.
+   */
+  private BlockIterator curBlockIter = null;
+
+  /**
+   * True if the thread is stopping.<p/>
+   * Protected by this object's lock.
+   */
+  private boolean stopping = false;
+
+  /**
+   * The current minute, in monotonic terms.
+   */
+  private long curMinute = 0;
+
+  /**
+   * Handles scan results.
+   */
+  private final ScanResultHandler resultHandler;
+
+  private final Statistics stats = new Statistics();
+
+  /**
+   * A snapshot of the counters that a VolumeScanner publishes about its
+   * progress.  The copy constructor lets callers take a private copy.
+   */
+  static class Statistics {
+    // Sum of the per-minute scannedBytes slots at the end of the last loop.
+    long bytesScannedInPastHour = 0;
+    // Blocks scanned since the scanner last selected a block iterator.
+    long blocksScannedInCurrentPeriod = 0;
+    // Blocks scanned successfully since this scanner was created.
+    long blocksScannedSinceRestart = 0;
+    // Number of times the scanner has selected a block iterator to scan.
+    long scansSinceRestart = 0;
+    // Number of scanBlock calls that returned a negative length.
+    long scanErrorsSinceRestart = 0;
+    // Monotonic time (ms) at which the scanner will next look for a block
+    // pool to scan, or -1 if it is not currently waiting for one.
+    long nextBlockPoolScanStartMs = -1;
+    // The current iterator's start time plus the scan period (ms), or -1 if
+    // there is no current iterator.
+    long blockPoolPeriodEndsMs = -1;
+    // The last block handed to scanBlock, or null if there has been none.
+    ExtendedBlock lastBlockScanned = null;
+    // True if there is no current iterator, or the current iterator is at
+    // the end of its block pool.
+    boolean eof = false;
+
+    Statistics() {
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param other    The Statistics object to copy every field from.
+     */
+    Statistics(Statistics other) {
+      this.bytesScannedInPastHour = other.bytesScannedInPastHour;
+      this.blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod;
+      this.blocksScannedSinceRestart = other.blocksScannedSinceRestart;
+      this.scansSinceRestart = other.scansSinceRestart;
+      this.scanErrorsSinceRestart = other.scanErrorsSinceRestart;
+      this.nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs;
+      this.blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs;
+      this.lastBlockScanned = other.lastBlockScanned;
+      this.eof = other.eof;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().
+          append("Statistics{").
+          append("bytesScannedInPastHour=").append(bytesScannedInPastHour).
+          append(", blocksScannedInCurrentPeriod=").
+              append(blocksScannedInCurrentPeriod).
+          append(", blocksScannedSinceRestart=").
+              append(blocksScannedSinceRestart).
+          append(", scansSinceRestart=").append(scansSinceRestart).
+          append(", scanErrorsSinceRestart=").append(scanErrorsSinceRestart).
+          append(", nextBlockPoolScanStartMs=").append(nextBlockPoolScanStartMs).
+          append(", blockPoolPeriodEndsMs=").append(blockPoolPeriodEndsMs).
+          append(", lastBlockScanned=").append(lastBlockScanned).
+          append(", eof=").append(eof).
+          append("}").toString();
+    }
+  }
+
+  /**
+   * Convert a millisecond interval into fractional hours.
+   *
+   * @param ms    The interval in milliseconds.
+   * @return      0 if ms is zero or negative; otherwise the interval in
+   *                hours, including the fractional part.
+   */
+  private static double positiveMsToHours(long ms) {
+    if (ms <= 0) {
+      return 0;
+    }
+    // TimeUnit#convert truncates to whole hours, which would make the
+    // three-decimal output in printStats always end in .000.  Divide in
+    // floating point instead so that partial hours are reported.
+    return ms / (double) TimeUnit.HOURS.toMillis(1);
+  }
+
+  /**
+   * Append a human-readable report of this scanner's statistics to the
+   * given StringBuilder.
+   *
+   * @param p     The StringBuilder to append the report to.
+   */
+  public void printStats(StringBuilder p) {
+    // "%n" is only expanded by String.format.  Appending it as a plain string
+    // would emit the two characters '%' and 'n' rather than a line break.
+    p.append(String.format("Block scanner information for volume %s with " +
+        "base path %s%n", volume.getStorageID(), volume.getBasePath()));
+    synchronized (stats) {
+      p.append(String.format("Bytes verified in last hour       : %57d%n",
+          stats.bytesScannedInPastHour));
+      p.append(String.format("Blocks scanned in current period  : %57d%n",
+          stats.blocksScannedInCurrentPeriod));
+      p.append(String.format("Blocks scanned since restart      : %57d%n",
+          stats.blocksScannedSinceRestart));
+      p.append(String.format("Block pool scans since restart    : %57d%n",
+          stats.scansSinceRestart));
+      p.append(String.format("Block scan errors since restart   : %57d%n",
+          stats.scanErrorsSinceRestart));
+      if (stats.nextBlockPoolScanStartMs > 0) {
+        // nextBlockPoolScanStartMs is a monotonic time.
+        p.append(String.format("Hours until next block pool scan  : %57.3f%n",
+            positiveMsToHours(stats.nextBlockPoolScanStartMs -
+                Time.monotonicNow())));
+      }
+      if (stats.blockPoolPeriodEndsMs > 0) {
+        // blockPoolPeriodEndsMs is derived from the iterator start time,
+        // which is a wall-clock time.
+        p.append(String.format("Hours until possible pool rescan  : %57.3f%n",
+            positiveMsToHours(stats.blockPoolPeriodEndsMs -
+                Time.now())));
+      }
+      p.append(String.format("Last block scanned                : %57s%n",
+          ((stats.lastBlockScanned == null) ? "none" :
+          stats.lastBlockScanned.toString())));
+      p.append(String.format("More blocks to scan in period     : %57s%n",
+          !stats.eof));
+      p.append(String.format("%n"));
+    }
+  }
+
+  /**
+   * Receives the outcome of each block scan.  The VolumeScanner constructor
+   * instantiates the handler class named by the configuration, and falls
+   * back to this default implementation if that fails.
+   */
+  static class ScanResultHandler {
+    private VolumeScanner scanner;
+
+    /**
+     * Associate this handler with a scanner.  Called from the scanner thread
+     * before any block is scanned.
+     *
+     * @param scanner    The VolumeScanner whose results will be handled.
+     */
+    public void setup(VolumeScanner scanner) {
+      LOG.trace("Starting VolumeScanner {}",
+          scanner.volume.getBasePath());
+      this.scanner = scanner;
+    }
+
+    /**
+     * Handle the result of scanning one block.
+     *
+     * @param block    The block that was scanned.
+     * @param e        The exception raised by the scan, or null if the scan
+     *                   succeeded.
+     */
+    public void handle(ExtendedBlock block, IOException e) {
+      FsVolumeSpi volume = scanner.volume;
+      if (e == null) {
+        LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+        return;
+      }
+      // If the block does not exist anymore, then it's not an error.
+      if (!volume.getDataset().contains(block)) {
+        LOG.debug("Volume {}: block {} is no longer in the dataset.",
+            volume.getBasePath(), block);
+        return;
+      }
+      // If the block exists, the exception may be due to a race with write:
+      // The BlockSender got an old block path in rbw. BlockReceiver removed
+      // the rbw block from rbw to finalized but BlockSender tried to open the
+      // file before BlockReceiver updated the VolumeMap. The state of the
+      // block can be changed again now, so ignore this error here. If there
+      // is a block really deleted by mistake, DirectoryScan should catch it.
+      if (e instanceof FileNotFoundException ) {
+        LOG.info("Volume {}: verification failed for {} because of " +
+                "FileNotFoundException.  This may be due to a race with write.",
+            volume.getBasePath(), block);
+        return;
+      }
+      // Include the scan failure so the reason for the report is visible.
+      LOG.warn("Reporting bad {} on {}", block, volume.getBasePath(), e);
+      try {
+        scanner.datanode.reportBadBlocks(block);
+      } catch (IOException ie) {
+        // This is bad, but not bad enough to shut down the scanner.
+        // Log the reporting failure (ie), not the original scan failure (e).
+        LOG.warn("Cannot report bad {}", block.getBlockId(), ie);
+      }
+    }
+  }
+
+  /**
+   * Create a scanner thread for one volume.  The thread is configured as a
+   * daemon and named after the volume's base path; it is not started here.
+   *
+   * @param conf       The block scanner configuration.
+   * @param datanode   The DataNode this scanner belongs to.
+   * @param ref        A reference to the volume to scan.  It is closed when
+   *                     the scanner thread exits.
+   */
+  VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) {
+    this.conf = conf;
+    this.datanode = datanode;
+    this.ref = ref;
+    this.volume = ref.getVolume();
+    ScanResultHandler handler;
+    try {
+      handler = conf.resultHandler.newInstance();
+    } catch (Throwable e) {
+      // Fall back to the default handler rather than failing construction.
+      LOG.error("unable to instantiate {}", conf.resultHandler, e);
+      handler = new ScanResultHandler();
+    }
+    this.resultHandler = handler;
+    setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+    setDaemon(true);
+  }
+
+  /**
+   * Save the given block iterator, logging a warning instead of propagating
+   * the error if the save fails.
+   *
+   * @param iter    The block iterator to save.
+   */
+  private void saveBlockIterator(BlockIterator iter) {
+    try {
+      iter.save();
+    } catch (IOException e) {
+      LOG.warn("{}: error saving {}.", this, iter, e);
+    }
+  }
+
+  /**
+   * Zero out the scannedBytes slots for every minute that has elapsed since
+   * the last call, and subtract their contents from scannedBytesSum.
+   *
+   * @param monotonicMs    The current monotonic time in milliseconds.
+   */
+  private void expireOldScannedBytesRecords(long monotonicMs) {
+    // Keep the absolute minute number here and reduce it modulo
+    // MINUTES_PER_HOUR only when indexing.  Comparing already-reduced values
+    // skips the expiry loop whenever the minute wraps around (e.g. 59 -> 0),
+    // leaving stale bytes in scannedBytesSum.
+    long newMinute =
+        TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
+    if (curMinute == newMinute) {
+      return;
+    }
+    // If a minute or more has gone past since we last updated the scannedBytes
+    // array, zero out the slots corresponding to those minutes.  There are
+    // only MINUTES_PER_HOUR slots, so never walk further back than that.
+    long firstExpired =
+        Math.max(curMinute + 1, newMinute - MINUTES_PER_HOUR + 1);
+    for (long m = firstExpired; m <= newMinute; m++) {
+      int slotIdx = (int)(m % MINUTES_PER_HOUR);
+      LOG.trace("{}: updateScannedBytes is zeroing out slot {}.  " +
+              "curMinute = {}; newMinute = {}", this, slotIdx,
+          curMinute, newMinute);
+      scannedBytesSum -= scannedBytes[slotIdx];
+      scannedBytes[slotIdx] = 0;
+    }
+    curMinute = newMinute;
+  }
+
+  /**
+   * Find a usable block iterator.<p/>
+   *
+   * We will consider available block iterators in order.  This property is
+   * important so that we don't keep rescanning the same block pool id over
+   * and over, while other block pools stay unscanned.<p/>
+   *
+   * A block pool is always ready to scan if the iterator is not at EOF.  If
+   * the iterator is at EOF, the block pool will be ready to scan when
+   * conf.scanPeriodMs milliseconds have elapsed since the iterator was last
+   * rewound.<p/>
+   *
+   * @return                     0 if we found a usable block iterator; the
+   *                               length of time we should delay before
+   *                               checking again otherwise.
+   */
+  private synchronized long findNextUsableBlockIter() {
+    int numBlockIters = blockIters.size();
+    if (numBlockIters == 0) {
+      LOG.debug("{}: no block pools are registered.", this);
+      return Long.MAX_VALUE;
+    }
+    int curIdx;
+    if (curBlockIter == null) {
+      // With no current iterator the search below starts at index 1 (when
+      // there is more than one iterator) and reaches index 0 last.
+      curIdx = 0;
+    } else {
+      curIdx = blockIters.indexOf(curBlockIter);
+      Preconditions.checkState(curIdx >= 0);
+    }
+    // Note that this has to be wall-clock time, not monotonic time.  This is
+    // because the time saved in the cursor file is a wall-clock time.  We do
+    // not want to save a monotonic time in the cursor file, because it resets
+    // every time the machine reboots (on most platforms).
+    long nowMs = Time.now();
+    long minTimeoutMs = Long.MAX_VALUE;
+    // Visit every iterator once, starting with the one after the current
+    // iterator and wrapping around so that the current one is tried last.
+    for (int i = 0; i < numBlockIters; i++) {
+      int idx = (curIdx + i + 1) % numBlockIters;
+      BlockIterator iter = blockIters.get(idx);
+      if (!iter.atEnd()) {
+        LOG.info("Now scanning bpid {} on volume {}",
+            iter.getBlockPoolId(), volume.getBasePath());
+        curBlockIter = iter;
+        return 0L;
+      }
+      // The iterator is at EOF.  It may be rewound once conf.scanPeriodMs
+      // has elapsed since its start time.
+      long iterStartMs = iter.getIterStartMs();
+      long waitMs = (iterStartMs + conf.scanPeriodMs) - nowMs;
+      if (waitMs <= 0) {
+        iter.rewind();
+        LOG.info("Now rescanning bpid {} on volume {}, after more than " +
+            "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+            TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
+        curBlockIter = iter;
+        return 0L;
+      }
+      // Remember the shortest wait among the iterators that are not ready.
+      minTimeoutMs = Math.min(minTimeoutMs, waitMs);
+    }
+    LOG.info("{}: no suitable block pools found to scan.  Waiting {} ms.",
+        this, minTimeoutMs);
+    return minTimeoutMs;
+  }
+
+  /**
+   * Scan a block.
+   *
+   * The outcome of the read is passed to the result handler: null on
+   * success, or the IOException that interrupted the read.
+   *
+   * @param cblock               The block to scan.
+   * @param bytesPerSec          The bytes per second to scan at.
+   *
+   * @return                     The length of the block that was scanned, or
+   *                               -1 if the block could not be scanned.
+   */
+  private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
+    // 'cblock' has a valid blockId and block pool id, but we don't yet know the
+    // genstamp the block is supposed to have.  Ask the FsDatasetImpl for this
+    // information.
+    ExtendedBlock block = null;
+    try {
+      Block b = volume.getDataset().getStoredBlock(
+          cblock.getBlockPoolId(), cblock.getBlockId());
+      if (b == null) {
+        LOG.info("FileNotFound while finding block {} on volume {}",
+            cblock, volume.getBasePath());
+      } else {
+        block = new ExtendedBlock(cblock.getBlockPoolId(), b);
+      }
+    } catch (FileNotFoundException e) {
+      LOG.info("FileNotFoundException while finding block {} on volume {}",
+          cblock, volume.getBasePath());
+    } catch (IOException e) {
+      // Log the cause as well; without it the warning cannot be diagnosed.
+      LOG.warn("I/O error while finding block {} on volume {}",
+            cblock, volume.getBasePath(), e);
+    }
+    if (block == null) {
+      return -1; // block not found.
+    }
+    BlockSender blockSender = null;
+    try {
+      blockSender = new BlockSender(block, 0, -1,
+          false, true, true, datanode, null,
+          CachingStrategy.newDropBehind());
+      throttler.setBandwidth(bytesPerSec);
+      // The data is sent to a null stream; only the read matters here.
+      long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
+      resultHandler.handle(block, null);
+      return bytesRead;
+    } catch (IOException e) {
+      resultHandler.handle(block, e);
+    } finally {
+      IOUtils.cleanup(null, blockSender);
+    }
+    return -1;
+  }
+
+  @VisibleForTesting
+  static boolean calculateShouldScan(long targetBytesPerSec,
+                                     long scannedBytesSum) {
+    long effectiveBytesPerSec =
+        scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
+    boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec;
+    LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " +
+        "targetBytesPerSec = {}.  shouldScan = {}",
+        effectiveBytesPerSec, targetBytesPerSec, shouldScan);
+    return shouldScan;
+  }
+
+  /**
+   * Run an iteration of the VolumeScanner loop.
+   *
+   * Each iteration either backs off because the scan rate is too high,
+   * selects a block iterator, or scans one block.  The published statistics
+   * are refreshed at the end of every iteration.
+   *
+   * @return     The number of milliseconds to delay before running the loop
+   *               again, or 0 to re-run the loop immediately.
+   */
+  private long runLoop() {
+    long bytesScanned = -1;
+    boolean scanError = false;
+    ExtendedBlock block = null;
+    try {
+      long monotonicMs = Time.monotonicNow();
+      expireOldScannedBytesRecords(monotonicMs);
+
+      if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) {
+        // We have already scanned more than the target rate allows over the
+        // past hour.  Wait a while for some old scannedBytes records to
+        // expire.
+        return 30000L;
+      }
+
+      // Find a usable block pool to scan.
+      if ((curBlockIter == null) || curBlockIter.atEnd()) {
+        long timeout = findNextUsableBlockIter();
+        if (timeout > 0) {
+          LOG.trace("{}: no block pools are ready to scan yet.  Waiting " +
+              "{} ms.", this, timeout);
+          synchronized (stats) {
+            stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+          }
+          return timeout;
+        }
+        synchronized (stats) {
+          stats.scansSinceRestart++;
+          stats.blocksScannedInCurrentPeriod = 0;
+          stats.nextBlockPoolScanStartMs = -1;
+        }
+        return 0L;
+      }
+
+      try {
+        block = curBlockIter.nextBlock();
+      } catch (IOException e) {
+        // There was an error listing the next block in the volume.  This is a
+        // serious issue, so log its cause.
+        LOG.warn("{}: nextBlock error on {}", this, curBlockIter, e);
+        // On the next loop iteration, curBlockIter#eof will be set to true, and
+        // we will pick a different block iterator.
+        return 0L;
+      }
+      if (block == null) {
+        // The BlockIterator is at EOF.
+        LOG.info("{}: finished scanning block pool {}",
+            this, curBlockIter.getBlockPoolId());
+        saveBlockIterator(curBlockIter);
+        return 0;
+      }
+      // BlockIterator#getLastSavedMs is a wall-clock time, so it must be
+      // compared against wall-clock time rather than against monotonicMs.
+      long saveDelta = Time.now() - curBlockIter.getLastSavedMs();
+      if (saveDelta >= conf.cursorSaveMs) {
+        LOG.debug("{}: saving block iterator {} after {} ms.",
+            this, curBlockIter, saveDelta);
+        saveBlockIterator(curBlockIter);
+      }
+      bytesScanned = scanBlock(block, conf.targetBytesPerSec);
+      if (bytesScanned >= 0) {
+        scannedBytesSum += bytesScanned;
+        scannedBytes[(int)(curMinute % MINUTES_PER_HOUR)] += bytesScanned;
+      } else {
+        scanError = true;
+      }
+      return 0L;
+    } finally {
+      // Publish the outcome of this iteration, whichever path returned.
+      synchronized (stats) {
+        stats.bytesScannedInPastHour = scannedBytesSum;
+        if (bytesScanned >= 0) {
+          stats.blocksScannedInCurrentPeriod++;
+          stats.blocksScannedSinceRestart++;
+        }
+        if (scanError) {
+          stats.scanErrorsSinceRestart++;
+        }
+        if (block != null) {
+          stats.lastBlockScanned = block;
+        }
+        if (curBlockIter == null) {
+          stats.eof = true;
+          stats.blockPoolPeriodEndsMs = -1;
+        } else {
+          stats.eof = curBlockIter.atEnd();
+          stats.blockPoolPeriodEndsMs =
+              curBlockIter.getIterStartMs() + conf.scanPeriodMs;
+        }
+      }
+    }
+  }
+
+  /**
+   * The scanner thread's main loop.  Calls runLoop repeatedly, waiting on
+   * this object's monitor for the delay each call returns, until stopping is
+   * set or the thread is interrupted.  On exit it saves and closes every
+   * block iterator, then releases the volume reference.
+   */
+  @Override
+  public void run() {
+    try {
+      LOG.trace("{}: thread starting.", this);
+      resultHandler.setup(this);
+      try {
+        long timeout = 0;
+        while (true) {
+          // Take the lock to check if we should stop.
+          synchronized (this) {
+            if (stopping) {
+              break;
+            }
+            if (timeout > 0) {
+              // A notify() on this object ends the wait early.
+              wait(timeout);
+              if (stopping) {
+                break;
+              }
+            }
+          }
+          // runLoop is called without holding this object's lock.
+          timeout = runLoop();
+        }
+      } catch (InterruptedException e) {
+        // We are exiting because of an InterruptedException,
+        // probably sent by VolumeScanner#shutdown.
+        LOG.trace("{} exiting because of InterruptedException.", this);
+      } catch (Throwable e) {
+        LOG.error("{} exiting because of exception ", this, e);
+      }
+      LOG.info("{} exiting.", this);
+      // Save the current position of all block iterators and close them.
+      for (BlockIterator iter : blockIters) {
+        saveBlockIterator(iter);
+        IOUtils.cleanup(null, iter);
+      }
+    } finally {
+      // When the VolumeScanner exits, release the reference we were holding
+      // on the volume.  This will allow the volume to be removed later.
+      IOUtils.cleanup(null, ref);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "VolumeScanner(" + volume.getBasePath() +
+        ", " + volume.getStorageID() + ")";
+  }
+
+  /**
+   * Shut down this scanner.
+   */
+  public synchronized void shutdown() {
+    // The run loop checks this flag while holding this object's lock.
+    stopping = true;
+    // Wake the scanner thread if it is waiting in run().
+    notify();
+    // Also interrupt it; run() exits when it catches InterruptedException.
+    this.interrupt();
+  }
+
+  /**
+   * Allow the scanner to scan the given block pool.
+   *
+   * @param bpid       The block pool id.
+   */
+  public synchronized void enableBlockPoolId(String bpid) {
+    // Nothing to do if an iterator for this block pool is already present.
+    for (BlockIterator iter : blockIters) {
+      if (iter.getBlockPoolId().equals(bpid)) {
+        LOG.warn("{}: already enabled scanning on block pool {}", this, bpid);
+        return;
+      }
+    }
+    BlockIterator iter = null;
+    try {
+      // Try to load a previously saved block iterator for this block pool.
+      iter = volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+      LOG.trace("{}: loaded block iterator for {}.", this, bpid);
+    } catch (FileNotFoundException e) {
+      LOG.debug("{}: failed to load block iterator: " + e.getMessage(), this);
+    } catch (IOException e) {
+      LOG.warn("{}: failed to load block iterator.", this, e);
+    }
+    if (iter == null) {
+      // Loading failed, so create a fresh iterator for this block pool.
+      iter = volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+      LOG.trace("{}: created new block iterator for {}.", this, bpid);
+    }
+    iter.setMaxStalenessMs(conf.maxStalenessMs);
+    blockIters.add(iter);
+    // Wake the scanner thread if it is waiting in run().
+    notify();
+  }
+
+  /**
+   * Disallow the scanner from scanning the given block pool.
+   *
+   * @param bpid       The block pool id.
+   */
+  public synchronized void disableBlockPoolId(String bpid) {
+    Iterator<BlockIterator> i = blockIters.iterator();
+    while (i.hasNext()) {
+      BlockIterator iter = i.next();
+      if (iter.getBlockPoolId().equals(bpid)) {
+        LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
+        // Remove through the Iterator, since we are iterating over the list.
+        i.remove();
+        // The iterator is closed without being saved first.
+        IOUtils.cleanup(null, iter);
+        if (curBlockIter == iter) {
+          // Clear the current iterator so that another one gets selected.
+          curBlockIter = null;
+        }
+        // Wake the scanner thread if it is waiting in run().
+        notify();
+        return;
+      }
+    }
+    LOG.warn("{}: can't remove block pool {}, because it was never " +
+        "added.", this, bpid);
+  }
+
+  @VisibleForTesting
+  Statistics getStatistics() {
+    synchronized (stats) {
+      return new Statistics(stats);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 0d5de81..162e306 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -90,24 +90,30 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     }
   }
 
-  /**
-   * Create rolling logs.
-   *
-   * @param prefix the prefix of the log names.
-   * @return rolling logs
-   */
-  public RollingLogs createRollingLogs(String bpid, String prefix
-      ) throws IOException;
-
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
-  /** Add an array of StorageLocation to FsDataset. */
+  /**
+   * Add a new volume to the FsDataset.<p/>
+   *
+   * If the FSDataset supports block scanning, this function registers
+   * the new volume with the block scanner.
+   *
+   * @param location      The storage location for the new volume.
+   * @param nsInfos       Namespace information for the new volume.
+   */
   public void addVolume(
       final StorageLocation location,
       final List<NamespaceInfo> nsInfos) throws IOException;
 
-  /** Removes a collection of volumes from FsDataset. */
+  /**
+   * Removes a collection of volumes from FsDataset.
+   *
+   * If the FSDataset supports block scanning, this function removes
+   * the volumes from the block scanner.
+   *
+   * @param volumes      The storage locations of the volumes to remove.
+   */
   public void removeVolumes(Collection<StorageLocation> volumes);
 
   /** @return a storage with the given storage ID */
@@ -514,6 +520,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     /**
      * Move block from one storage to another storage
      */
-    public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
+   public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
         StorageType targetStorageType) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 3a635b7..1355e31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * This is an interface for the underlying volume.
@@ -69,4 +71,112 @@ public interface FsVolumeSpi {
    * Release disk space previously reserved for RBW block.
    */
   public void releaseReservedSpace(long bytesToRelease);
+
+  /**
+   * BlockIterator will return ExtendedBlock entries from a block pool in
+   * this volume.  The entries will be returned in sorted order.<p/>
+   *
+   * BlockIterator objects themselves do not always have internal
+   * synchronization, so they can only safely be used by a single thread at a
+   * time.<p/>
+   *
+   * Closing the iterator does not save it.  You must call save to save it.
+   */
+  public interface BlockIterator extends Closeable {
+    /**
+     * Get the next block.<p/>
+     *
+     * Note that this block may be removed in between the time we list it,
+     * and the time the caller tries to use it, or it may represent a stale
+     * entry.  Callers should handle the case where the returned block no
+     * longer exists.
+     *
+     * @return               The next block, or null if there are no
+     *                         more blocks.  Null if there was an error
+     *                         determining the next block.
+     *
+     * @throws IOException   If there was an error getting the next block in
+     *                         this volume.  In this case, EOF will be set on
+     *                         the iterator.
+     */
+    public ExtendedBlock nextBlock() throws IOException;
+
+    /**
+     * Returns true if we got to the end of the block pool.
+     */
+    public boolean atEnd();
+
+    /**
+     * Repositions the iterator at the beginning of the block pool.
+     */
+    public void rewind();
+
+    /**
+     * Save this block iterator to the underlying volume.
+     * Any existing saved block iterator with this name will be overwritten.
+     * maxStalenessMs will not be saved.
+     *
+     * @throws IOException   If there was an error when saving the block
+     *                         iterator.
+     */
+    public void save() throws IOException;
+
+    /**
+     * Set the maximum staleness of entries that we will return.<p/>
+     *
+     * A maximum staleness of 0 means we will never return stale entries; a
+     * larger value will allow us to reduce resource consumption in exchange
+     * for returning more potentially stale entries.  Even with staleness set
+     * to 0, consumers of this API must handle race conditions where blocks
+     * disappear before they can be processed.
+     */
+    public void setMaxStalenessMs(long maxStalenessMs);
+
+    /**
+     * Get the wall-clock time, measured in milliseconds since the Epoch,
+     * when this iterator was created.
+     */
+    public long getIterStartMs();
+
+    /**
+     * Get the wall-clock time, measured in milliseconds since the Epoch,
+     * when this iterator was last saved.  Returns iterStartMs if the
+     * iterator was never saved.
+     */
+    public long getLastSavedMs();
+
+    /**
+     * Get the id of the block pool which this iterator traverses.
+     */
+    public String getBlockPoolId();
+  }
+
+  /**
+   * Create a new block iterator.  It will start at the beginning of the
+   * block set.
+   *
+   * @param bpid             The block pool id to iterate over.
+   * @param name             The name of the block iterator to create.
+   *
+   * @return                 The new block iterator.
+   */
+  public BlockIterator newBlockIterator(String bpid, String name);
+
+  /**
+   * Load a saved block iterator.
+   *
+   * @param bpid             The block pool id to iterate over.
+   * @param name             The name of the block iterator to load.
+   *
+   * @return                 The saved block iterator.
+   * @throws IOException     If there was an IO error loading the saved
+   *                           block iterator.
+   */
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException;
+
+  /**
+   * Get the FSDatasetSpi which this volume is a part of.
+   */
+  public FsDatasetSpi getDataset();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
deleted file mode 100644
index 5d54770..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Rolling logs consist of a current log and a set of previous logs.
- *
- * The implementation should support a single appender and multiple readers.
- */
-public interface RollingLogs {
-  /**
-   * To iterate the lines of the logs.
-   */
-  public interface LineIterator extends Iterator<String>, Closeable {
-    /** Is the iterator iterating the previous? */
-    public boolean isPrevious();
-
-    /**
-     * Is the last read entry from previous? This should be called after
-     * reading.
-     */
-    public boolean isLastReadFromPrevious();
-  }
-
-  /**
-   * To append text to the logs.
-   */
-  public interface Appender extends Appendable, Closeable {
-  }
-
-  /**
-   * Create an iterator to iterate the lines in the logs.
-   * 
-   * @param skipPrevious Should it skip reading the previous log? 
-   * @return a new iterator.
-   */
-  public LineIterator iterator(boolean skipPrevious) throws IOException;
-
-  /**
-   * @return the only appender to append text to the logs.
-   *   The same object is returned if it is invoked multiple times.
-   */
-  public Appender appender();
-
-  /**
-   * Roll current to previous.
-   *
-   * @return true if the rolling succeeded.
-   *   When it returns false, it is not equivalent to an error. 
-   *   It means that the rolling cannot be performed at the moment,
-   *   e.g. the logs are being read.
-   */
-  public boolean roll() throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f990faf..c00d467 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -284,7 +283,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
-    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+    volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
+        blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
 
@@ -312,6 +312,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // storageMap and asyncDiskService, consistent.
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
+    FsVolumeReference ref = fsVolume.obtainReference();
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
@@ -322,7 +323,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               DatanodeStorage.State.NORMAL,
               storageType));
       asyncDiskService.addVolume(sd.getCurrentDir());
-      volumes.addVolume(fsVolume);
+      volumes.addVolume(ref);
     }
 
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
@@ -361,6 +362,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw MultipleIOException.createIOException(exceptions);
     }
 
+    final FsVolumeReference ref = fsVolume.obtainReference();
     setupAsyncLazyPersistThread(fsVolume);
 
     builder.build();
@@ -371,7 +373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               DatanodeStorage.State.NORMAL,
               storageType));
       asyncDiskService.addVolume(sd.getCurrentDir());
-      volumes.addVolume(fsVolume);
+      volumes.addVolume(ref);
     }
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
@@ -415,9 +417,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               it.remove();
             }
           }
-          // Delete blocks from the block scanner in batch.
-          datanode.getBlockScanner().deleteBlocks(bpid,
-              blocks.toArray(new Block[blocks.size()]));
         }
 
         storageMap.remove(sd.getStorageUuid());
@@ -771,7 +770,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     // Replace the old block if any to reschedule the scanning.
-    datanode.getBlockScanner().addBlock(block, false);
     return replicaInfo;
   }
 
@@ -2006,10 +2004,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
           volumeMap.remove(bpid, blockId);
-          final DataBlockScanner blockScanner = datanode.getBlockScanner();
-          if (blockScanner != null) {
-            blockScanner.deleteBlock(bpid, new Block(blockId));
-          }
           if (vol.isTransientStorage()) {
             ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
           }
@@ -2032,12 +2026,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
-        final DataBlockScanner blockScanner = datanode.getBlockScanner();
-        if (!vol.isTransientStorage()) {
-          if (blockScanner != null) {
-            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
-          }
-        } else {
+        if (vol.isTransientStorage()) {
           ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
         }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
@@ -2540,23 +2529,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     dataStorage.clearRollingUpgradeMarker(bpid);
   }
 
-  @Override
-  public RollingLogs createRollingLogs(String bpid, String prefix
-      ) throws IOException {
-    String dir = null;
-    final List<FsVolumeImpl> volumes = getVolumes();
-    for (FsVolumeImpl vol : volumes) {
-      String bpDir = vol.getPath(bpid);
-      if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
-        dir = bpDir;
-        break;
-      }
-    }
-    if (dir == null) {
-      dir = volumes.get(0).getPath(bpid);
-    }
-    return new RollingLogsImpl(dir, prefix);
-  }
 
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 7c8384d..5ce2710 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -17,9 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
+import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -41,15 +50,24 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.CloseableReferenceCount;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The underlying volume used to store replica.
@@ -59,6 +77,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 @InterfaceAudience.Private
 @VisibleForTesting
 public class FsVolumeImpl implements FsVolumeSpi {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FsVolumeImpl.class);
+
   private final FsDatasetImpl dataset;
   private final String storageID;
   private final StorageType storageType;
@@ -395,6 +416,332 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
+  private enum SubdirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith("subdir");
+    }
+  }
+
+  private enum BlockFileFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return !name.endsWith(".meta") && name.startsWith("blk_");
+    }
+  }
+
+  @VisibleForTesting
+  public static String nextSorted(List<String> arr, String prev) {
+    int res = 0;
+    if (prev != null) {
+      res = Collections.binarySearch(arr, prev);
+      if (res < 0) {
+        res = -1 - res;
+      } else {
+        res++;
+      }
+    }
+    if (res >= arr.size()) {
+      return null;
+    }
+    return arr.get(res);
+  }
+
+  private static class BlockIteratorState {
+    BlockIteratorState() {
+      lastSavedMs = iterStartMs = Time.now();
+      curFinalizedDir = null;
+      curFinalizedSubDir = null;
+      curEntry = null;
+      atEnd = false;
+    }
+
+    // The wall-clock ms since the epoch at which this iterator was last saved.
+    @JsonProperty
+    private long lastSavedMs;
+
+    // The wall-clock ms since the epoch at which this iterator was created.
+    @JsonProperty
+    private long iterStartMs;
+
+    @JsonProperty
+    private String curFinalizedDir;
+
+    @JsonProperty
+    private String curFinalizedSubDir;
+
+    @JsonProperty
+    private String curEntry;
+
+    @JsonProperty
+    private boolean atEnd;
+  }
+
+  /**
+   * A BlockIterator implementation for FsVolumeImpl.
+   */
+  private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
+    private final File bpidDir;
+    private final String name;
+    private final String bpid;
+    private long maxStalenessMs = 0;
+
+    private List<String> cache;
+    private long cacheMs;
+
+    private BlockIteratorState state;
+
+    BlockIteratorImpl(String bpid, String name) {
+      this.bpidDir = new File(currentDir, bpid);
+      this.name = name;
+      this.bpid = bpid;
+      rewind();
+    }
+
+    /**
+     * Get the next subdirectory within the block pool slice.
+     *
+     * @return         The next subdirectory within the block pool slice, or
+     *                   null if there are no more.
+     */
+    private String getNextSubDir(String prev, File dir)
+          throws IOException {
+      List<String> children =
+          IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+      cache = null;
+      cacheMs = 0;
+      if (children.size() == 0) {
+        LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+        return null;
+      }
+      Collections.sort(children);
+      String nextSubDir = nextSorted(children, prev);
+      if (nextSubDir == null) {
+        LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+      } else {
+        LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
+            "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
+      }
+      return nextSubDir;
+    }
+
+    private String getNextFinalizedDir() throws IOException {
+      File dir = Paths.get(
+          bpidDir.getAbsolutePath(), "current", "finalized").toFile();
+      return getNextSubDir(state.curFinalizedDir, dir);
+    }
+
+    private String getNextFinalizedSubDir() throws IOException {
+      if (state.curFinalizedDir == null) {
+        return null;
+      }
+      File dir = Paths.get(
+          bpidDir.getAbsolutePath(), "current", "finalized",
+              state.curFinalizedDir).toFile();
+      return getNextSubDir(state.curFinalizedSubDir, dir);
+    }
+
+    private List<String> getSubdirEntries() throws IOException {
+      if (state.curFinalizedSubDir == null) {
+        return null; // There are no entries in the null subdir.
+      }
+      long now = Time.monotonicNow();
+      if (cache != null) {
+        long delta = now - cacheMs;
+        if (delta < maxStalenessMs) {
+          return cache;
+        } else {
+          LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
+            "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
+          cache = null;
+        }
+      }
+      File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
+                    state.curFinalizedDir, state.curFinalizedSubDir).toFile();
+      List<String> entries =
+          IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+      if (entries.size() == 0) {
+        entries = null;
+      } else {
+        Collections.sort(entries);
+      }
+      if (entries == null) {
+        LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
+            storageID, bpid, dir.getAbsolutePath());
+      } else {
+        LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", 
+            storageID, bpid, entries.size(), dir.getAbsolutePath());
+      }
+      cache = entries;
+      cacheMs = now;
+      return cache;
+    }
+
+    /**
+     * Get the next block.<p/>
+     *
+     * Each volume has a hierarchical structure.<p/>
+     *
+     * <code>
+     * BPID B0
+     *   finalized/
+     *     subdir0
+     *       subdir0
+     *         blk_000
+     *         blk_001
+     *       ...
+     *     subdir1
+     *       subdir0
+     *         ...
+     *   rbw/
+     * </code>
+     *
+     * When we run out of entries at one level of the structure, we search
+     * progressively higher levels.  For example, when we run out of blk_
+     * entries in a subdirectory, we search for the next subdirectory.
+     * And so on.
+     */
+    @Override
+    public ExtendedBlock nextBlock() throws IOException {
+      if (state.atEnd) {
+        return null;
+      }
+      try {
+        while (true) {
+          List<String> entries = getSubdirEntries();
+          if (entries != null) {
+            state.curEntry = nextSorted(entries, state.curEntry);
+            if (state.curEntry == null) {
+              LOG.trace("nextBlock({}, {}): advancing from {} to next " +
+                  "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
+            } else {
+              ExtendedBlock block =
+                  new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
+              LOG.trace("nextBlock({}, {}): advancing to {}",
+                  storageID, bpid, block);
+              return block;
+            }
+          }
+          state.curFinalizedSubDir = getNextFinalizedSubDir();
+          if (state.curFinalizedSubDir == null) {
+            state.curFinalizedDir = getNextFinalizedDir();
+            if (state.curFinalizedDir == null) {
+              state.atEnd = true;
+              return null;
+            }
+          }
+        }
+      } catch (IOException e) {
+        state.atEnd = true;
+        LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
+        throw e;
+      }
+    }
+
+    @Override
+    public boolean atEnd() {
+      return state.atEnd;
+    }
+
+    @Override
+    public void rewind() {
+      cache = null;
+      cacheMs = 0;
+      state = new BlockIteratorState();
+    }
+
+    @Override
+    public void save() throws IOException {
+      state.lastSavedMs = Time.now();
+      boolean success = false;
+      ObjectMapper mapper = new ObjectMapper();
+      // Write the state to a temporary file first, then rename it into place.
+      try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+                new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+        mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state);
+        success = true;
+      } finally {
+        // On failure, remove the partial temp file; log only if that fails.
+        if (!success && !getTempSaveFile().delete()) {
+          LOG.debug("save({}, {}): error deleting temporary file.",
+              storageID, bpid);
+        }
+      }
+      Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("save({}, {}): saved {}", storageID, bpid,
+            mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+      }
+    }
+
+    public void load() throws IOException {
+      ObjectMapper mapper = new ObjectMapper();
+      File file = getSaveFile();
+      this.state = mapper.reader(BlockIteratorState.class).readValue(file);
+      LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
+          bpid, name, file.getAbsoluteFile(),
+          mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+    }
+
+    File getSaveFile() {
+      return new File(bpidDir, name + ".cursor");
+    }
+
+    File getTempSaveFile() {
+      return new File(bpidDir, name + ".cursor.tmp");
+    }
+
+    @Override
+    public void setMaxStalenessMs(long maxStalenessMs) {
+      this.maxStalenessMs = maxStalenessMs;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // No action needed for this volume implementation.
+    }
+
+    @Override
+    public long getIterStartMs() {
+      return state.iterStartMs;
+    }
+
+    @Override
+    public long getLastSavedMs() {
+      return state.lastSavedMs;
+    }
+
+    @Override
+    public String getBlockPoolId() {
+      return bpid;
+    }
+  }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return new BlockIteratorImpl(bpid, name);
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
+    iter.load();
+    return iter;
+  }
+
+  @Override
+  public FsDatasetSpi getDataset() {
+    return dataset;
+  }
+
   /**
    * RBW files. They get moved to the finalized block directory when
    * the block is finalized.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index c837593..ae2f5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -42,11 +43,13 @@ class FsVolumeList {
   private Object checkDirsMutex = new Object();
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+  private final BlockScanner blockScanner;
   private volatile int numFailedVolumes;
 
-  FsVolumeList(int failedVols,
+  FsVolumeList(int failedVols, BlockScanner blockScanner,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
     this.blockChooser = blockChooser;
+    this.blockScanner = blockScanner;
     this.numFailedVolumes = failedVols;
   }
   
@@ -260,13 +263,14 @@ class FsVolumeList {
 
   /**
    * Dynamically add new volumes to the existing volumes that this DN manages.
-   * @param newVolume the instance of new FsVolumeImpl.
+   *
+   * @param ref       a reference to the new FsVolumeImpl instance.
    */
-  void addVolume(FsVolumeImpl newVolume) {
+  void addVolume(FsVolumeReference ref) {
     while (true) {
       final FsVolumeImpl[] curVolumes = volumes.get();
       final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
-      volumeList.add(newVolume);
+      volumeList.add((FsVolumeImpl)ref.getVolume());
       if (volumes.compareAndSet(curVolumes,
           volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
         break;
@@ -274,12 +278,15 @@ class FsVolumeList {
         if (FsDatasetImpl.LOG.isDebugEnabled()) {
           FsDatasetImpl.LOG.debug(
               "The volume list has been changed concurrently, " +
-                  "retry to remove volume: " + newVolume);
+                  "retry to add volume: " + ref.getVolume().getStorageID());
         }
       }
     }
-
-    FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+    if (blockScanner != null) {
+      blockScanner.addVolumeScanner(ref);
+    }
+    FsDatasetImpl.LOG.info("Added new volume: " +
+        ref.getVolume().getStorageID());
   }
 
   /**
@@ -293,6 +300,9 @@ class FsVolumeList {
       if (volumeList.remove(target)) {
         if (volumes.compareAndSet(curVolumes,
             volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+          if (blockScanner != null) {
+            blockScanner.removeVolumeScanner(target);
+          }
           try {
             target.closeAndWait();
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
deleted file mode 100644
index 121127d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-import com.google.common.base.Charsets;
-
-class RollingLogsImpl implements RollingLogs {
-  private static final String CURR_SUFFIX = ".curr";
-  private static final String PREV_SUFFIX = ".prev";
-
-  static boolean isFilePresent(String dir, String filePrefix) {
-    return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
-           new File(dir, filePrefix + PREV_SUFFIX).exists();
-  }
-
-  private final File curr;
-  private final File prev;
-  private PrintWriter out; //require synchronized access
-
-  private final Appender appender = new Appender() {
-    @Override
-    public Appendable append(CharSequence csq) {
-      synchronized(RollingLogsImpl.this) {
-        if (out == null) {
-          throw new IllegalStateException(RollingLogsImpl.this
-              + " is not yet opened.");
-        }
-        out.print(csq);
-        out.flush();
-      }
-      return this;
-    }
-
-    @Override
-    public Appendable append(char c) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Appendable append(CharSequence csq, int start, int end) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-      synchronized(RollingLogsImpl.this) {
-        if (out != null) {
-          out.close();
-          out = null;
-        }
-      }
-    }
-  };
-
-
-  private final AtomicInteger numReaders = new AtomicInteger();
-
-  RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
-    curr = new File(dir, filePrefix + CURR_SUFFIX);
-    prev = new File(dir, filePrefix + PREV_SUFFIX);
-    out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
-        curr, true), Charsets.UTF_8));
-  }
-
-  @Override
-  public Reader iterator(boolean skipPrevFile) throws IOException {
-    numReaders.incrementAndGet(); 
-    return new Reader(skipPrevFile);
-  }
-
-  @Override
-  public Appender appender() {
-    return appender;
-  }
-
-  @Override
-  public boolean roll() throws IOException {
-    if (numReaders.get() > 0) {
-      return false;
-    }
-    if (!prev.delete() && prev.exists()) {
-      throw new IOException("Failed to delete " + prev);
-    }
-
-    synchronized(this) {
-      appender.close();
-      final boolean renamed = curr.renameTo(prev);
-      out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
-          curr, true), Charsets.UTF_8));
-      if (!renamed) {
-        throw new IOException("Failed to rename " + curr + " to " + prev);
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return curr.toString();
-  }
-  
-  /**
-   * This is used to read the lines in order.
-   * If the data is not read completely (i.e, untill hasNext() returns
-   * false), it needs to be explicitly 
-   */
-  private class Reader implements RollingLogs.LineIterator {
-    private File file;
-    private File lastReadFile;
-    private BufferedReader reader;
-    private String line;
-    private boolean closed = false;
-    
-    private Reader(boolean skipPrevFile) throws IOException {
-      reader = null;
-      file = skipPrevFile? curr : prev;
-      readNext();        
-    }
-
-    @Override
-    public boolean isPrevious() {
-      return file == prev;
-    }
-
-    @Override
-    public boolean isLastReadFromPrevious() {
-      return lastReadFile == prev;
-    }
-
-    private boolean openFile() throws IOException {
-
-      for(int i=0; i<2; i++) {
-        if (reader != null || i > 0) {
-          // move to next file
-          file = isPrevious()? curr : null;
-        }
-        if (file == null) {
-          return false;
-        }
-        if (file.exists()) {
-          break;
-        }
-      }
-      
-      if (reader != null ) {
-        reader.close();
-        reader = null;
-      }
-      
-      reader = new BufferedReader(new InputStreamReader(new FileInputStream(
-          file), Charsets.UTF_8));
-      return true;
-    }
-    
-    // read next line if possible.
-    private void readNext() throws IOException {
-      line = null;
-      try {
-        if (reader != null && (line = reader.readLine()) != null) {
-          return;
-        }
-        // move to the next file.
-        if (openFile()) {
-          readNext();
-        }
-      } finally {
-        if (!hasNext()) {
-          close();
-        }
-      }
-    }
-    
-    @Override
-    public boolean hasNext() {
-      return line != null;
-    }
-
-    @Override
-    public String next() {
-      String curLine = line;
-      try {
-        lastReadFile = file;
-        readNext();
-      } catch (IOException e) {
-        DataBlockScanner.LOG.warn("Failed to read next line.", e);
-      }
-      return curLine;
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!closed) {
-        try {
-          if (reader != null) {
-            reader.close();
-          }
-        } finally {
-          file = null;
-          reader = null;
-          closed = true;
-          final int n = numReaders.decrementAndGet();
-          assert(n >= 0);
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4d60792..c24f7be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1005,6 +1005,26 @@
 </property>
 
 <property>
+  <name>dfs.datanode.scan.period.hours</name>
+  <value>0</value>
+  <description>
+        If this is 0 or negative, the DataNode's block scanner will be
+        disabled.  If this is positive, the DataNode will not scan any
+        individual block more than once in the specified scan period.
+  </description>
+</property>
+
+<property>
+  <name>dfs.block.scanner.volume.bytes.per.second</name>
+  <value>1048576</value>
+  <description>
+        If this is 0, the DataNode's block scanner will be disabled.  If this
+        is positive, this is the number of bytes per second that the DataNode's
+        block scanner will try to scan from each volume.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4193404</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 15f5f2e..0eef46f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1637,4 +1637,33 @@ public class DFSTestUtil {
       }
     }, 100, waitTime);
   }
+
+  /**
+   * Change the length of the on-disk block file of a replica stored on the
+   * datanode at index dnIndex.
+   *
+   * @param cluster the mini cluster used to look up the replica's block file
+   * @param blk the block whose replica is resized
+   * @param dnIndex index of the datanode holding the replica
+   * @param lenDelta number of bytes added to the current file length;
+   *                 a negative value shrinks the file
+   * @return true if the block file exists and was resized; false otherwise
+   * @throws IOException if opening, resizing or closing the block file fails
+   */
+  public static boolean changeReplicaLength(MiniDFSCluster cluster,
+      ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
+    File blockFile = cluster.getBlockFile(dnIndex, blk);
+    if (blockFile != null && blockFile.exists()) {
+      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+      try {
+        raFile.setLength(raFile.length() + lenDelta);
+      } finally {
+        // Release the file handle even when length()/setLength() throws.
+        raFile.close();
+      }
+      return true;
+    }
+    LOG.info("failed to change length of block " + blk);
+    return false;
+  }
 }