You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/26 18:44:24 UTC
[09/50] [abbrv] hadoop git commit: HDFS-7548. Corrupt block reporting
delayed until datablock scanner thread detects it. Contributed by Rushabh
Shah.
HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8c59ba0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8c59ba0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8c59ba0
Branch: refs/heads/HDFS-EC
Commit: a8c59ba0cef5b904b6f499c8e073203abb91d2b4
Parents: ce16949
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Jan 21 14:41:31 2015 -0600
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Jan 26 09:43:26 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/datanode/BlockPoolSliceScanner.java | 49 ++++++++++++++--
.../hdfs/server/datanode/BlockSender.java | 3 +
.../hdfs/server/datanode/DataBlockScanner.java | 15 ++++-
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 4 +-
.../hadoop/hdfs/TestDatanodeBlockScanner.java | 60 +++++++++++++++++++-
7 files changed, 125 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0a301f8..25ad33b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -756,6 +756,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
Colin P. McCabe)
+ HDFS-7548. Corrupt block reporting delayed until datablock scanner thread
+ detects it (Rushabh Shah via kihwal)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index 61f1e7e..f36fea1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -105,6 +105,7 @@ class BlockPoolSliceScanner {
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
private boolean isNewPeriod = true;
+ private int lastScanTimeDifference = 5*60*1000;
private final LogFileHandler verificationLog;
@@ -112,6 +113,7 @@ class BlockPoolSliceScanner {
200, MAX_SCAN_RATE);
private static enum ScanType {
+ IMMEDIATE_SCAN,
VERIFICATION_SCAN, // scanned as part of periodic verification
NONE,
}
@@ -129,12 +131,17 @@ class BlockPoolSliceScanner {
@Override
public int compare(BlockScanInfo left, BlockScanInfo right) {
+ final ScanType leftNextScanType = left.nextScanType;
+ final ScanType rightNextScanType = right.nextScanType;
final long l = left.lastScanTime;
final long r = right.lastScanTime;
+ // Compare by nextScanType if they are same then compare by
+ // lastScanTimes
// compare blocks itself if scantimes are same to avoid.
// because TreeMap uses comparator if available to check existence of
// the object.
- return l < r? -1: l > r? 1: left.compareTo(right);
+ int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
+ return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right);
}
};
@@ -142,6 +149,7 @@ class BlockPoolSliceScanner {
ScanType lastScanType = ScanType.NONE;
boolean lastScanOk = true;
private LinkedElement next;
+ ScanType nextScanType = ScanType.VERIFICATION_SCAN;
BlockScanInfo(Block block) {
super(block);
@@ -265,10 +273,12 @@ class BlockPoolSliceScanner {
private synchronized void updateBlockInfo(LogEntry e) {
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
- if(info != null && e.verificationTime > 0 &&
+ if (info != null && e.verificationTime > 0 &&
info.lastScanTime < e.verificationTime) {
delBlockInfo(info);
- info.lastScanTime = e.verificationTime;
+ if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
+ info.lastScanTime = e.verificationTime;
+ }
info.lastScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info, false);
}
@@ -285,9 +295,23 @@ class BlockPoolSliceScanner {
DFSUtil.getRandom().nextInt(periodInt);
}
- /** Adds block to list of blocks */
- synchronized void addBlock(ExtendedBlock block) {
+ /** Adds block to list of blocks
+ * @param scanNow - true if we want to make that particular block a high
+ * priority one to scan immediately
+ **/
+ synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
BlockScanInfo info = blockMap.get(block.getLocalBlock());
+ long lastScanTime = 0;
+ if (info != null) {
+ lastScanTime = info.lastScanTime;
+ }
+ // If the particular block was scanned in the last 5 minutes, then there is
+ // no need to verify that block again
+ if (scanNow && Time.monotonicNow() - lastScanTime <
+ lastScanTimeDifference) {
+ return;
+ }
+
if ( info != null ) {
LOG.warn("Adding an already existing block " + block);
delBlockInfo(info);
@@ -295,6 +319,12 @@ class BlockPoolSliceScanner {
info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
+ if (scanNow) {
+ // Create a new BlockScanInfo object and set the lastScanTime to 0
+ // which will make it the high priority block
+ LOG.info("Adding block for immediate verification " + block);
+ info.nextScanType = ScanType.IMMEDIATE_SCAN;
+ }
addBlockInfo(info, true);
adjustThrottler();
@@ -340,6 +370,7 @@ class BlockPoolSliceScanner {
info.lastScanType = type;
info.lastScanTime = now;
info.lastScanOk = scanOk;
+ info.nextScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info, false);
// Don't update meta data if the verification failed.
@@ -363,6 +394,11 @@ class BlockPoolSliceScanner {
}
}
+ @VisibleForTesting
+ synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
+ this.lastScanTimeDifference = lastScanTimeDifference;
+ }
+
static private class LogEntry {
long blockId = -1;
@@ -502,6 +538,9 @@ class BlockPoolSliceScanner {
private synchronized boolean isFirstBlockProcessed() {
if (!blockInfoSet.isEmpty()) {
+ if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
+ return false;
+ }
long blockId = blockInfoSet.first().getBlockId();
if ((processedBlocks.get(blockId) != null)
&& (processedBlocks.get(blockId) == 1)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 182b366..2d312d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,6 +600,9 @@ class BlockSender implements java.io.Closeable {
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
+ //Something might be wrong with the block. Make this block the high
+ //priority block for verification.
+ datanode.blockScanner.addBlock(block, true);
}
}
throw ioeToSocketException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
index bee3625..450c2b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
@@ -186,10 +186,10 @@ public class DataBlockScanner implements Runnable {
new String[blockPoolScannerMap.keySet().size()]);
}
- public void addBlock(ExtendedBlock block) {
+ public void addBlock(ExtendedBlock block, boolean scanNow) {
BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
if (bpScanner != null) {
- bpScanner.addBlock(block);
+ bpScanner.addBlock(block, scanNow);
} else {
LOG.warn("No block pool scanner found for block pool id: "
+ block.getBlockPoolId());
@@ -293,6 +293,17 @@ public class DataBlockScanner implements Runnable {
}
}
+ @VisibleForTesting
+ public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
+ BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
+ if (bpScanner != null) {
+ bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
+ } else {
+ LOG.warn("No block pool scanner found for block pool id: "
+ + block.getBlockPoolId());
+ }
+ }
+
public void start() {
blockScannerThread = new Thread(this);
blockScannerThread.setDaemon(true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 84528e7..12df9d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2170,7 +2170,7 @@ public class DataNode extends ReconfigurableBase
}
FsVolumeSpi volume = getFSDataset().getVolume(block);
if (blockScanner != null && !volume.isTransientStorage()) {
- blockScanner.addBlock(block);
+ blockScanner.addBlock(block, false);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d8cc287..f990faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -771,7 +771,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
// Replace the old block if any to reschedule the scanning.
- datanode.getBlockScanner().addBlock(block);
+ datanode.getBlockScanner().addBlock(block, false);
return replicaInfo;
}
@@ -2035,7 +2035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final DataBlockScanner blockScanner = datanode.getBlockScanner();
if (!vol.isTransientStorage()) {
if (blockScanner != null) {
- blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+ blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
}
} else {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c59ba0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
index bf0182b..9e78c10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -28,7 +27,10 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -490,4 +493,59 @@ public class TestDatanodeBlockScanner {
cluster.shutdown();
}
}
+
+/**
+ * This test verifies whether block is added to the first location of
+ * BlockPoolSliceScanner#blockInfoSet
+ */
+ @Test
+ public void testAddBlockInfoToFirstLocation() throws Exception {
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
+ .numDataNodes(1).build();
+ FileSystem fs = null;
+ try {
+ fs = cluster.getFileSystem();
+ DataNode dataNode = cluster.getDataNodes().get(0);
+ // Creating a bunch of blocks
+ for (int i = 1; i < 10; i++) {
+ Path fileName = new Path("/test" + i);
+ DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
+ }
+ // Get block of the first file created (file1)
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
+ dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
+ // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
+ // scan the first set of blocks
+ Thread.sleep(10000);
+ Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
+ dataNode, block);
+ // Create another set of blocks
+ for (int i = 10; i < 20; i++) {
+ Path fileName = new Path("/test" + i);
+ DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
+ }
+ dataNode.getBlockScanner().addBlock(block, true);
+ // Sleep so that BlockPoolSliceScanner can scan the second set of blocks
+ // and one block which we scheduled to rescan
+ Thread.sleep(10000);
+ // Get the lastScanTime of all of the second set of blocks
+ Set<Long> lastScanTimeSet = new HashSet<Long>();
+ for (int i = 10; i < 20; i++) {
+ long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
+ DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
+ lastScanTimeSet.add(lastScanTime);
+ }
+ Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
+ dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
+ Long minimumLastScanTime = Collections.min(lastScanTimeSet);
+ assertTrue("The second scanTime for test1 block should be greater than "
+ + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
+ assertTrue("The second scanTime for test1 block should be less than or"
+ + " equal to minimum of the lastScanTime of second set of blocks",
+ scanTime2Fortest1Block <= minimumLastScanTime);
+ } finally {
+ IOUtils.closeStream(fs);
+ cluster.shutdown();
+ }
+ }
}