You are viewing a plain text version of this content. The canonical link for it was a hyperlink that did not survive this plain-text rendering.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/13 23:50:14 UTC
hadoop git commit: HDFS-7686. Re-add rapid rescan of possibly corrupt
block feature to the block scanner (cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 875256834 -> 8bb9a5000
HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the block scanner (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8bb9a500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8bb9a500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8bb9a500
Branch: refs/heads/trunk
Commit: 8bb9a5000ed06856abbad268c43ce1d5ad5bdd43
Parents: 8752568
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Feb 13 14:35:49 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Feb 13 14:35:49 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BlockScanner.java | 32 +++++
.../hdfs/server/datanode/BlockSender.java | 3 +
.../hdfs/server/datanode/VolumeScanner.java | 133 ++++++++++++++-----
.../hdfs/server/datanode/TestBlockScanner.java | 131 ++++++++++++++++++
5 files changed, 268 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7705f87..747f54a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -959,6 +959,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
(Xiaoyu Yao via Arpit Agarwal)
+ HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the
+ block scanner (cmccabe)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 7429fff..b0248c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -278,6 +279,37 @@ public class BlockScanner {
}
}
+ /**
+ * Mark a block as "suspect" so that it is rescanned soon.
+ *
+ * The request is forwarded to the VolumeScanner for storageId, which
+ * queues it for a priority rescan and skips blocks it marked recently,
+ * to avoid rescanning the same block over and over. If the scanner is
+ * disabled, or no VolumeScanner exists for storageId, it is only logged.
+ *
+ * @param storageId The ID of the storage where the block replica
+ * is being stored.
+ * @param block The block's ID and block pool id.
+ */
+ synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
+ if (!isEnabled()) {
+ LOG.info("Not scanning suspicious block {} on {}, because the block " +
+ "scanner is disabled.", block, storageId);
+ return;
+ }
+ VolumeScanner scanner = scanners.get(storageId);
+ if (scanner == null) {
+ // This could happen if the volume is in the process of being removed.
+ // The removal process shuts down the VolumeScanner, but the volume
+ // object stays around as long as there are references to it (which
+ // should not be long).
+ LOG.info("Not scanning suspicious block {} on {}, because there is no " +
+ "volume scanner for that storageId.", block, storageId);
+ return;
+ }
+ scanner.markSuspectBlock(block);
+ }
+
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index c016e62..f4cde11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -601,6 +601,9 @@ class BlockSender implements java.io.Closeable {
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
+ datanode.getBlockScanner().markSuspectBlock(
+ volumeRef.getVolume().getStorageID(),
+ block);
}
throw ioeToSocketException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index ce0a875..615abe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -22,12 +22,15 @@ import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
@@ -117,6 +120,21 @@ public class VolumeScanner extends Thread {
new LinkedList<BlockIterator>();
/**
+ * Blocks which are suspect, in the order in which they were marked.
+ * The scanner prioritizes scanning these blocks.
+ */
+ private final LinkedHashSet<ExtendedBlock> suspectBlocks =
+ new LinkedHashSet<ExtendedBlock>();
+
+ /**
+ * Blocks recently marked as suspect, recorded when they are queued (not
+ * after the rescan). Used to avoid scanning the same block over and over.
+ */
+ private final Cache<ExtendedBlock, Boolean> recentSuspectBlocks =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+ /**
* The current block iterator, or null if there is none.
*/
private BlockIterator curBlockIter = null;
@@ -458,10 +476,13 @@ public class VolumeScanner extends Thread {
/**
* Run an iteration of the VolumeScanner loop.
*
+ * @param suspectBlock A suspect block which we should scan, or null to
+ * scan the next regularly scheduled block.
+ *
* @return The number of milliseconds to delay before running the loop
* again, or 0 to re-run the loop immediately.
*/
- private long runLoop() {
+ private long runLoop(ExtendedBlock suspectBlock) {
long bytesScanned = -1;
boolean scanError = false;
ExtendedBlock block = null;
@@ -477,40 +498,48 @@ public class VolumeScanner extends Thread {
}
// Find a usable block pool to scan.
- if ((curBlockIter == null) || curBlockIter.atEnd()) {
- long timeout = findNextUsableBlockIter();
- if (timeout > 0) {
- LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
- "{} ms.", this, timeout);
+ // A suspect block queued via markSuspectBlock takes priority over the
+ // next block from the regular block iterator.
+ if (suspectBlock != null) {
+ block = suspectBlock;
+ } else {
+ if ((curBlockIter == null) || curBlockIter.atEnd()) {
+ long timeout = findNextUsableBlockIter();
+ if (timeout > 0) {
+ LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
+ "{} ms.", this, timeout);
+ synchronized (stats) {
+ stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+ }
+ return timeout;
+ }
synchronized (stats) {
- stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+ stats.scansSinceRestart++;
+ stats.blocksScannedInCurrentPeriod = 0;
+ stats.nextBlockPoolScanStartMs = -1;
}
- return timeout;
+ return 0L;
}
- synchronized (stats) {
- stats.scansSinceRestart++;
- stats.blocksScannedInCurrentPeriod = 0;
- stats.nextBlockPoolScanStartMs = -1;
+ try {
+ block = curBlockIter.nextBlock();
+ } catch (IOException e) {
+ // There was an error listing the next block in the volume. This is a
+ // serious issue.
+ LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
+ // On the next loop iteration, curBlockIter#eof will be set to true, and
+ // we will pick a different block iterator.
+ return 0L;
+ }
+ if (block == null) {
+ // The BlockIterator is at EOF.
+ LOG.info("{}: finished scanning block pool {}",
+ this, curBlockIter.getBlockPoolId());
+ saveBlockIterator(curBlockIter);
+ return 0;
+ }
- return 0L;
- }
-
- try {
- block = curBlockIter.nextBlock();
- } catch (IOException e) {
- // There was an error listing the next block in the volume. This is a
- // serious issue.
- LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
- // On the next loop iteration, curBlockIter#eof will be set to true, and
- // we will pick a different block iterator.
- return 0L;
- }
- if (block == null) {
- // The BlockIterator is at EOF.
- LOG.info("{}: finished scanning block pool {}",
- this, curBlockIter.getBlockPoolId());
- saveBlockIterator(curBlockIter);
- return 0;
}
- long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
- if (saveDelta >= conf.cursorSaveMs) {
+ // curBlockIter is still null when the first block this scanner handles
+ // is a suspect block, so only consult the cursor if an iterator exists.
+ long saveDelta = (curBlockIter == null) ? 0L :
+ monotonicMs - curBlockIter.getLastSavedMs();
+ if ((curBlockIter != null) && (saveDelta >= conf.cursorSaveMs)) {
@@ -529,7 +553,7 @@ public class VolumeScanner extends Thread {
} finally {
synchronized (stats) {
stats.bytesScannedInPastHour = scannedBytesSum;
- if (bytesScanned >= 0) {
+ if (bytesScanned > 0) {
stats.blocksScannedInCurrentPeriod++;
stats.blocksScannedSinceRestart++;
}
@@ -551,6 +575,20 @@ public class VolumeScanner extends Thread {
}
}
+ /**
+ * Removes and returns the oldest queued entry of suspectBlocks (a
+ * LinkedHashSet iterates in insertion order), or null if it is empty.
+ */
+ private synchronized ExtendedBlock popNextSuspectBlock() {
+ Iterator<ExtendedBlock> iter = suspectBlocks.iterator();
+ if (!iter.hasNext()) {
+ return null;
+ }
+ ExtendedBlock block = iter.next();
+ iter.remove();
+ return block;
+ }
+
@Override
public void run() {
// Record the minute on which the scanner started.
@@ -563,7 +601,9 @@ public class VolumeScanner extends Thread {
try {
long timeout = 0;
while (true) {
- // Take the lock to check if we should stop.
+ ExtendedBlock suspectBlock = null;
+ // Take the lock to check if we should stop, and access the
+ // suspect block list.
synchronized (this) {
if (stopping) {
break;
@@ -574,8 +614,9 @@ public class VolumeScanner extends Thread {
break;
}
}
+ suspectBlock = popNextSuspectBlock();
}
- timeout = runLoop();
+ timeout = runLoop(suspectBlock);
}
} catch (InterruptedException e) {
// We are exiting because of an InterruptedException,
@@ -612,6 +653,30 @@ public class VolumeScanner extends Thread {
this.interrupt();
}
+ /** Queue block for a priority rescan unless stopping, recently marked, or already queued. */
+ public synchronized void markSuspectBlock(ExtendedBlock block) {
+ if (stopping) {
+ LOG.info("{}: Not scheduling suspect block {} for " +
+ "rescanning, because this volume scanner is stopping.", this, block);
+ return;
+ }
+ Boolean recent = recentSuspectBlocks.getIfPresent(block);
+ if (recent != null) {
+ LOG.info("{}: Not scheduling suspect block {} for " +
+ "rescanning, because we rescanned it recently.", this, block);
+ return;
+ }
+ if (suspectBlocks.contains(block)) { // normally caught above; reached if the cache entry was evicted
+ LOG.info("{}: suspect block {} is already queued for " +
+ "rescanning.", this, block);
+ return;
+ }
+ suspectBlocks.add(block);
+ recentSuspectBlocks.put(block, true); // recorded at enqueue time, before the rescan
+ LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
+ notify(); // wake scanner thread.
+ }
+
/**
* Allow the scanner to scan the given block pool.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bb9a500/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index b727263..735e9a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -268,6 +268,20 @@ public class TestBlockScanner {
final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
long blocksScanned = 0;
Semaphore sem = null;
+
+ @Override
+ public String toString() {
+ final StringBuilder bld = new StringBuilder();
+ bld.append("ScanResultHandler.Info{");
+ bld.append("shouldRun=").append(shouldRun).append(", ");
+ bld.append("blocksScanned=").append(blocksScanned).append(", ");
+ bld.append("sem#availablePermits=").append( // sem is null until a test sets it
+ sem == null ? "null" : String.valueOf(sem.availablePermits())).append(", ");
+ bld.append("badBlocks=").append(badBlocks).append(", ");
+ bld.append("goodBlocks=").append(goodBlocks);
+ bld.append("}");
+ return bld.toString();
+ }
}
private VolumeScanner scanner;
@@ -681,4 +695,121 @@ public class TestBlockScanner {
Assert.assertFalse(VolumeScanner.
calculateShouldScan("test", 100000L, 365000000L, 0, 60));
}
+
+ /**
+ * Test that a block marked as suspect is rescanned promptly, and that
+ * re-marking it soon afterwards is rate-limited. See HDFS-7686, HDFS-7548.
+ */
+ @Test(timeout=120000)
+ public void testMarkSuspectBlock() throws Exception {
+ Configuration conf = new Configuration();
+ // Set a really long scan period.
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+ final TestContext ctx = new TestContext(conf, 1);
+ final int NUM_EXPECTED_BLOCKS = 10;
+ ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ String storageID = ctx.datanode.getFSDataset().
+ getVolumes().get(0).getStorageID();
+ synchronized (info) {
+ info.sem = new Semaphore(4);
+ info.shouldRun = true;
+ info.notify();
+ }
+ // Scan the first 4 blocks (the semaphore starts with 4 permits).
+ LOG.info("Waiting for the first 4 blocks to be scanned.");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ if (info.blocksScanned >= 4) {
+ LOG.info("info = {}. blockScanned has now reached 4.", info);
+ return true;
+ } else {
+ LOG.info("info = {}. Waiting for blockScanned to reach 4.", info);
+ return false;
+ }
+ }
+ }
+ }, 50, 30000);
+ // We should have scanned 4 blocks
+ synchronized (info) {
+ assertEquals("Expected 4 good blocks.", 4, info.goodBlocks.size());
+ info.goodBlocks.clear();
+ assertEquals("Expected 4 blocksScanned", 4, info.blocksScanned);
+ assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+ info.blocksScanned = 0;
+ }
+ ExtendedBlock first = ctx.getFileBlock(0, 0);
+ ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+
+ // Releasing the first permit lets the TestScanResultHandler finish
+ // adding the block that it was scanning previously (the 5th block).
+ // Releasing a second permit gives the handler a chance to also see the
+ // suspect block which we just requested the VolumeScanner to process;
+ // it must be one of the next two blocks scanned.
+ info.sem.release(2);
+
+ LOG.info("Waiting for 2 more blocks to be scanned.");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ if (info.blocksScanned >= 2) {
+ LOG.info("info = {}. blockScanned has now reached 2.", info);
+ return true;
+ } else {
+ LOG.info("info = {}. Waiting for blockScanned to reach 2.", info);
+ return false;
+ }
+ }
+ }
+ }, 50, 30000);
+
+ synchronized (info) {
+ assertTrue("Expected block " + first + " to have been scanned.",
+ info.goodBlocks.contains(first));
+ assertEquals(2, info.goodBlocks.size());
+ info.goodBlocks.clear();
+ assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+ assertEquals(2, info.blocksScanned);
+ info.blocksScanned = 0;
+ }
+
+ // Re-mark the same block as suspect; this should be rate-limited.
+ ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+ info.sem.release(10);
+
+ LOG.info("Waiting for 5 more blocks to be scanned.");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ if (info.blocksScanned >= 5) {
+ LOG.info("info = {}. blockScanned has now reached 5.", info);
+ return true;
+ } else {
+ LOG.info("info = {}. Waiting for blockScanned to reach 5.", info);
+ return false;
+ }
+ }
+ }
+ }, 50, 30000);
+ synchronized (info) {
+ assertEquals(5, info.goodBlocks.size());
+ assertEquals(0, info.badBlocks.size());
+ assertEquals(5, info.blocksScanned);
+ // We should not have rescanned the "suspect block",
+ // because it was recently rescanned by the suspect block system.
+ // This is a test of the "suspect block" rate limiting.
+ Assert.assertFalse("We should not " +
+ "have rescanned block " + first + ", because it should have been " +
+ "in recentSuspectBlocks.", info.goodBlocks.contains(first));
+ info.blocksScanned = 0;
+ }
+ }
}