You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/13 23:50:14 UTC

hadoop git commit: HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the block scanner (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 875256834 -> 8bb9a5000


HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the block scanner (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8bb9a500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8bb9a500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8bb9a500

Branch: refs/heads/trunk
Commit: 8bb9a5000ed06856abbad268c43ce1d5ad5bdd43
Parents: 8752568
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Feb 13 14:35:49 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Feb 13 14:35:49 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/BlockScanner.java      |  32 +++++
 .../hdfs/server/datanode/BlockSender.java       |   3 +
 .../hdfs/server/datanode/VolumeScanner.java     | 133 ++++++++++++++-----
 .../hdfs/server/datanode/TestBlockScanner.java  | 131 ++++++++++++++++++
 5 files changed, 268 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7705f87..747f54a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -959,6 +959,9 @@ Release 2.7.0 - UNRELEASED
       HDFS-7776. Adding additional unit tests for Quota By Storage Type.
       (Xiaoyu Yao via Arpit Agarwal)
 
+      HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the
+      block scanner (cmccabe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 7429fff..b0248c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -278,6 +279,37 @@ public class BlockScanner {
     }
   }
 
+  /**
+   * Mark a block as "suspect."
+   *
+   * This means that we should try to rescan it soon.  Note that the
+   * VolumeScanner keeps a list of recently suspicious blocks, which
+   * it uses to avoid rescanning the same block over and over in a short
+   * time frame.
+   *
+   * @param storageId     The ID of the storage where the block replica
+   *                      is being stored.
+   * @param block         The block's ID and block pool id.
+   */
+  synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
+    if (!isEnabled()) {
+      LOG.info("Not scanning suspicious block {} on {}, because the block " +
+          "scanner is disabled.", block, storageId);
+      return;
+    }
+    VolumeScanner scanner = scanners.get(storageId);
+    if (scanner == null) {
+      // This could happen if the volume is in the process of being removed.
+      // The removal process shuts down the VolumeScanner, but the volume
+      // object stays around as long as there are references to it (which
+      // should not be that long.)
+      LOG.info("Not scanning suspicious block {} on {}, because there is no " +
+          "volume scanner for that storageId.", block, storageId);
+      return;
+    }
+    scanner.markSuspectBlock(block);
+  }
+
   @InterfaceAudience.Private
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index c016e62..f4cde11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -601,6 +601,9 @@ class BlockSender implements java.io.Closeable {
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
           LOG.error("BlockSender.sendChunks() exception: ", e);
         }
+        datanode.getBlockScanner().markSuspectBlock(
+              volumeRef.getVolume().getStorageID(),
+              block);
       }
       throw ioeToSocketException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index ce0a875..615abe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -22,12 +22,15 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
@@ -117,6 +120,21 @@ public class VolumeScanner extends Thread {
       new LinkedList<BlockIterator>();
 
   /**
+   * Blocks which are suspect.
+   * The scanner prioritizes scanning these blocks.
+   */
+  private final LinkedHashSet<ExtendedBlock> suspectBlocks =
+      new LinkedHashSet<ExtendedBlock>();
+
+  /**
+   * Blocks which were suspect which we have scanned.
+   * This is used to avoid scanning the same suspect block over and over.
+   */
+  private final Cache<ExtendedBlock, Boolean> recentSuspectBlocks =
+      CacheBuilder.newBuilder().maximumSize(1000)
+        .expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+  /**
    * The current block iterator, or null if there is none.
    */
   private BlockIterator curBlockIter = null;
@@ -458,10 +476,13 @@ public class VolumeScanner extends Thread {
   /**
    * Run an iteration of the VolumeScanner loop.
    *
+   * @param suspectBlock   A suspect block which we should scan, or null to
+   *                       scan the next regularly scheduled block.
+   *
    * @return     The number of milliseconds to delay before running the loop
    *               again, or 0 to re-run the loop immediately.
    */
-  private long runLoop() {
+  private long runLoop(ExtendedBlock suspectBlock) {
     long bytesScanned = -1;
     boolean scanError = false;
     ExtendedBlock block = null;
@@ -477,40 +498,43 @@ public class VolumeScanner extends Thread {
       }
 
       // Find a usable block pool to scan.
-      if ((curBlockIter == null) || curBlockIter.atEnd()) {
-        long timeout = findNextUsableBlockIter();
-        if (timeout > 0) {
-          LOG.trace("{}: no block pools are ready to scan yet.  Waiting " +
-              "{} ms.", this, timeout);
+      if (suspectBlock != null) {
+        block = suspectBlock;
+      } else {
+        if ((curBlockIter == null) || curBlockIter.atEnd()) {
+          long timeout = findNextUsableBlockIter();
+          if (timeout > 0) {
+            LOG.trace("{}: no block pools are ready to scan yet.  Waiting " +
+                "{} ms.", this, timeout);
+            synchronized (stats) {
+              stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+            }
+            return timeout;
+          }
           synchronized (stats) {
-            stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+            stats.scansSinceRestart++;
+            stats.blocksScannedInCurrentPeriod = 0;
+            stats.nextBlockPoolScanStartMs = -1;
           }
-          return timeout;
+          return 0L;
         }
-        synchronized (stats) {
-          stats.scansSinceRestart++;
-          stats.blocksScannedInCurrentPeriod = 0;
-          stats.nextBlockPoolScanStartMs = -1;
+        try {
+          block = curBlockIter.nextBlock();
+        } catch (IOException e) {
+          // There was an error listing the next block in the volume.  This is a
+          // serious issue.
+          LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
+          // On the next loop iteration, curBlockIter#eof will be set to true, and
+          // we will pick a different block iterator.
+          return 0L;
+        }
+        if (block == null) {
+          // The BlockIterator is at EOF.
+          LOG.info("{}: finished scanning block pool {}",
+              this, curBlockIter.getBlockPoolId());
+          saveBlockIterator(curBlockIter);
+          return 0;
         }
-        return 0L;
-      }
-
-      try {
-        block = curBlockIter.nextBlock();
-      } catch (IOException e) {
-        // There was an error listing the next block in the volume.  This is a
-        // serious issue.
-        LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
-        // On the next loop iteration, curBlockIter#eof will be set to true, and
-        // we will pick a different block iterator.
-        return 0L;
-      }
-      if (block == null) {
-        // The BlockIterator is at EOF.
-        LOG.info("{}: finished scanning block pool {}",
-            this, curBlockIter.getBlockPoolId());
-        saveBlockIterator(curBlockIter);
-        return 0;
       }
       long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
       if (saveDelta >= conf.cursorSaveMs) {
@@ -529,7 +553,7 @@ public class VolumeScanner extends Thread {
     } finally {
       synchronized (stats) {
         stats.bytesScannedInPastHour = scannedBytesSum;
-        if (bytesScanned >= 0) {
+        if (bytesScanned > 0) {
           stats.blocksScannedInCurrentPeriod++;
           stats.blocksScannedSinceRestart++;
         }
@@ -551,6 +575,20 @@ public class VolumeScanner extends Thread {
     }
   }
 
+  /**
+   * If there are elements in the suspectBlocks list, removes
+   * and returns the first one.  Otherwise, returns null.
+   */
+  private synchronized ExtendedBlock popNextSuspectBlock() {
+    Iterator<ExtendedBlock> iter = suspectBlocks.iterator();
+    if (!iter.hasNext()) {
+      return null;
+    }
+    ExtendedBlock block = iter.next();
+    iter.remove();
+    return block;
+  }
+
   @Override
   public void run() {
     // Record the minute on which the scanner started.
@@ -563,7 +601,9 @@ public class VolumeScanner extends Thread {
       try {
         long timeout = 0;
         while (true) {
-          // Take the lock to check if we should stop.
+          ExtendedBlock suspectBlock = null;
+          // Take the lock to check if we should stop, and access the
+          // suspect block list.
           synchronized (this) {
             if (stopping) {
               break;
@@ -574,8 +614,9 @@ public class VolumeScanner extends Thread {
                 break;
               }
             }
+            suspectBlock = popNextSuspectBlock();
           }
-          timeout = runLoop();
+          timeout = runLoop(suspectBlock);
         }
       } catch (InterruptedException e) {
         // We are exiting because of an InterruptedException,
@@ -612,6 +653,30 @@ public class VolumeScanner extends Thread {
     this.interrupt();
   }
 
+
+  public synchronized void markSuspectBlock(ExtendedBlock block) {
+    if (stopping) {
+      LOG.info("{}: Not scheduling suspect block {} for " +
+          "rescanning, because this volume scanner is stopping.", this, block);
+      return;
+    }
+    Boolean recent = recentSuspectBlocks.getIfPresent(block);
+    if (recent != null) {
+      LOG.info("{}: Not scheduling suspect block {} for " +
+          "rescanning, because we rescanned it recently.", this, block);
+      return;
+    }
+    if (suspectBlocks.contains(block)) {
+      LOG.info("{}: suspect block {} is already queued for " +
+          "rescanning.", this, block);
+      return;
+    }
+    suspectBlocks.add(block);
+    recentSuspectBlocks.put(block, true);
+    LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
+    notify(); // wake scanner thread.
+  }
+
   /**
    * Allow the scanner to scan the given block pool.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index b727263..735e9a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -268,6 +268,20 @@ public class TestBlockScanner {
       final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
       long blocksScanned = 0;
       Semaphore sem = null;
+
+      @Override
+      public String toString() {
+        final StringBuilder bld = new StringBuilder();
+        bld.append("ScanResultHandler.Info{");
+        bld.append("shouldRun=").append(shouldRun).append(", ");
+        bld.append("blocksScanned=").append(blocksScanned).append(", ");
+        bld.append("sem#availablePermits=").append(sem.availablePermits()).
+            append(", ");
+        bld.append("badBlocks=").append(badBlocks).append(", ");
+        bld.append("goodBlocks=").append(goodBlocks);
+        bld.append("}");
+        return bld.toString();
+      }
     }
 
     private VolumeScanner scanner;
@@ -681,4 +695,121 @@ public class TestBlockScanner {
     Assert.assertFalse(VolumeScanner.
         calculateShouldScan("test", 100000L, 365000000L, 0, 60));
   }
+
+  /**
+   * Test that we can mark certain blocks as suspect, and get them quickly
+   * rescanned that way.  See HDFS-7686 and HDFS-7548.
+   */
+  @Test(timeout=120000)
+  public void testMarkSuspectBlock() throws Exception {
+    Configuration conf = new Configuration();
+    // Set a really long scan period.
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final TestContext ctx = new TestContext(conf, 1);
+    final int NUM_EXPECTED_BLOCKS = 10;
+    ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    String storageID = ctx.datanode.getFSDataset().
+        getVolumes().get(0).getStorageID();
+    synchronized (info) {
+      info.sem = new Semaphore(4);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // Scan the first 4 blocks
+    LOG.info("Waiting for the first 4 blocks to be scanned.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= 4) {
+            LOG.info("info = {}.  blockScanned has now reached 4.", info);
+            return true;
+          } else {
+            LOG.info("info = {}.  Waiting for blockScanned to reach 4.", info);
+            return false;
+          }
+        }
+      }
+    }, 50, 30000);
+    // We should have scanned 4 blocks
+    synchronized (info) {
+      assertEquals("Expected 4 good blocks.", 4, info.goodBlocks.size());
+      info.goodBlocks.clear();
+      assertEquals("Expected 4 blocksScanned", 4, info.blocksScanned);
+      assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+      info.blocksScanned = 0;
+    }
+    ExtendedBlock first = ctx.getFileBlock(0, 0);
+    ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+
+    // When we increment the semaphore, the TestScanResultHandler will finish
+    // adding the block that it was scanning previously (the 5th block).
+    // We increment the semaphore twice so that the handler will also
+    // get a chance to see the suspect block which we just requested the
+    // VolumeScanner to process.
+    info.sem.release(2);
+
+    LOG.info("Waiting for 2 more blocks to be scanned.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= 2) {
+            LOG.info("info = {}.  blockScanned has now reached 2.", info);
+            return true;
+          } else {
+            LOG.info("info = {}.  Waiting for blockScanned to reach 2.", info);
+            return false;
+          }
+        }
+      }
+    }, 50, 30000);
+
+    synchronized (info) {
+      assertTrue("Expected block " + first + " to have been scanned.",
+          info.goodBlocks.contains(first));
+      assertEquals(2, info.goodBlocks.size());
+      info.goodBlocks.clear();
+      assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+      assertEquals(2, info.blocksScanned);
+      info.blocksScanned = 0;
+    }
+
+    // Re-mark the same block as suspect.
+    ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+    info.sem.release(10);
+
+    LOG.info("Waiting for 5 more blocks to be scanned.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= 5) {
+            LOG.info("info = {}.  blockScanned has now reached 5.", info);
+            return true;
+          } else {
+            LOG.info("info = {}.  Waiting for blockScanned to reach 5.", info);
+            return false;
+          }
+        }
+      }
+    }, 50, 30000);
+    synchronized (info) {
+      assertEquals(5, info.goodBlocks.size());
+      assertEquals(0, info.badBlocks.size());
+      assertEquals(5, info.blocksScanned);
+      // We should not have rescanned the "suspect block",
+      // because it was recently rescanned by the suspect block system.
+      // This is a test of the "suspect block" rate limiting.
+      Assert.assertFalse("We should not " +
+          "have rescanned block " + first + ", because it should have been " +
+          "in recentSuspectBlocks.", info.goodBlocks.contains(first));
+      info.blocksScanned = 0;
+    }
+  }
 }