Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/17 00:57:36 UTC
svn commit: r496897 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Jan 16 15:57:34 2007
New Revision: 496897
URL: http://svn.apache.org/viewvc?view=rev&rev=496897
Log:
HADOOP-855. In HDFS, try to repair files with checksum errors.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
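
Taken together, the changes below wire up one path: the checksum Checker in
FSDataInputStream detects a CRC mismatch, DistributedFileSystem reports the
blocks behind both the data and checksum streams, and the namenode schedules
the reported replicas for deletion when other copies exist. A minimal sketch
of that call chain, assuming an in-progress read that has just failed its
CRC32 comparison (the method and its arguments are illustrative, not part of
the patch):

    // Sketch only: the call chain this commit introduces.
    void onChecksumMismatch(DistributedFileSystem dfs, Path f,
                            FSInputStream data, long dataPos,
                            FSInputStream sums, long sumsPos) {
      // DistributedFileSystem pairs the current block and datanode of each
      // stream into a LocatedBlock[], hands it to DFSClient.reportBadBlocks,
      // which forwards it to NameNode.reportBadBlocks over the client RPC;
      // FSNamesystem.invalidateBlock then schedules each replica's deletion.
      dfs.reportChecksumFailure(f, data, dataPos, sums, sumsPos);
    }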
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 15:57:34 2007
@@ -28,6 +28,10 @@
7. HADOOP-801. Add to jobtracker a log of task completion events.
(Sanjay Dahiya via cutting)
+ 8. HADOOP-855. In HDFS, try to repair files with checksum errors.
+ An exception is still thrown, but corrupt blocks are now removed
+ when they have replicas. (Wendy Chien via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Jan 16 15:57:34 2007
@@ -29,7 +29,7 @@
**********************************************************************/
interface ClientProtocol extends VersionedProtocol {
- public static final long versionID = 5L; // open() takes a new parameter
+ public static final long versionID = 6L; // reportBadBlocks added
///////////////////////////////////////
// File contents
@@ -142,6 +142,13 @@
* times before succeeding.
*/
public boolean complete(String src, String clientName) throws IOException;
+
+ /**
+ * The client wants to report corrupted blocks (blocks with specified
+ * locations on datanodes).
+ * @param blocks Array of located blocks to report
+ */
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
///////////////////////////////////////
// Namespace management
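
For context, a caller holding the location of a corrupt replica would invoke
the new RPC as below. This is a hedged sketch: `namenode` is assumed to be an
RPC proxy implementing ClientProtocol, and `blk`/`dn` the block and datanode
that failed verification (the LocatedBlock constructor used here appears in
the DistributedFileSystem change later in this commit):

    // Sketch only: report a single corrupt replica to the namenode.
    LocatedBlock bad = new LocatedBlock(blk, new DatanodeInfo[] { dn });
    namenode.reportBadBlocks(new LocatedBlock[] { bad });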
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Jan 16 15:57:34 2007
@@ -179,7 +179,14 @@
}
}
}
-
+
+ /**
+ * Report corrupt blocks that were discovered by the client.
+ */
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+ namenode.reportBadBlocks(blocks);
+ }
+
public short getDefaultReplication() {
return defaultReplication;
}
@@ -496,10 +503,12 @@
private DataInputStream blockStream;
private Block blocks[] = null;
private DatanodeInfo nodes[][] = null;
+ private DatanodeInfo currentNode = null;
+ private Block currentBlock = null;
private long pos = 0;
private long filelen = 0;
private long blockEnd = -1;
-
+
/**
*/
public DFSInputStream(String src) throws IOException {
@@ -538,8 +547,24 @@
}
this.blocks = newBlocks;
this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+ this.currentNode = null;
+ }
+
+ /**
+ * Returns the datanode from which the stream is currently reading.
+ */
+ public DatanodeInfo getCurrentDatanode() {
+ return currentNode;
+ }
+
+ /**
+ * Returns the block containing the target position.
+ */
+ public Block getCurrentBlock() {
+ return currentBlock;
}
+
/**
* Used by the automatic tests to determine block locations of a
* file
@@ -623,6 +648,7 @@
this.pos = target;
this.blockEnd = targetBlockEnd;
+ this.currentBlock = blocks[targetBlock];
this.blockStream = in;
return chosenNode;
} catch (IOException ex) {
@@ -671,7 +697,7 @@
int result = -1;
if (pos < filelen) {
if (pos > blockEnd) {
- blockSeekTo(pos, new TreeSet());
+ currentNode = blockSeekTo(pos, new TreeSet());
}
result = blockStream.read();
if (result >= 0) {
@@ -691,7 +717,6 @@
}
if (pos < filelen) {
int retries = 2;
- DatanodeInfo chosenNode = null;
TreeSet deadNodes = null;
while (retries > 0) {
try {
@@ -699,7 +724,7 @@
if (deadNodes == null) {
deadNodes = new TreeSet();
}
- chosenNode = blockSeekTo(pos, deadNodes);
+ currentNode = blockSeekTo(pos, deadNodes);
}
int realLen = Math.min(len, (int) (blockEnd - pos + 1));
int result = blockStream.read(buf, off, realLen);
@@ -711,7 +736,7 @@
LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
blockEnd = -1;
if (deadNodes == null) { deadNodes = new TreeSet(); }
- if (chosenNode != null) { deadNodes.add(chosenNode); }
+ if (currentNode != null) { deadNodes.add(currentNode); }
if (--retries == 0) {
throw e;
}
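
The new currentNode/currentBlock bookkeeping lets a caller recover exactly
which replica served the failing bytes. A hypothetical helper, built only
from the accessors added above, shows the intended pairing:

    // Illustrative helper (not part of the patch): pair the block a
    // stream is positioned in with the datanode currently serving it.
    static LocatedBlock locateCurrent(DFSClient.DFSInputStream in)
        throws IOException {
      Block blk = in.getCurrentBlock();
      if (blk == null) {
        throw new IOException("Stream has no current block"); // not positioned yet
      }
      DatanodeInfo[] nodes = { in.getCurrentDatanode() };
      return new LocatedBlock(blk, nodes);
    }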
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -24,7 +24,7 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
@@ -248,15 +248,51 @@
return dfs;
}
- public void reportChecksumFailure(Path f, FSInputStream in,
- long start, long length, int crc) {
+
+ /**
+ * We need to find the blocks that didn't match. Likely only one
+ * is corrupt but we will report both to the namenode. In the future,
+ * we can consider figuring out exactly which block is corrupt.
+ */
+ public void reportChecksumFailure(Path f,
+ FSInputStream in, long inPos,
+ FSInputStream sums, long sumsPos) {
- // ignore for now, causing task to fail, and hope that when task is
- // retried it gets a different copy of the block that is not corrupt.
+ LocatedBlock lblocks[] = new LocatedBlock[2];
+
+ try {
+ // Find block in data stream.
+ DFSClient.DFSInputStream dfsIn = (DFSClient.DFSInputStream) in;
+ Block dataBlock = dfsIn.getCurrentBlock();
+ if (dataBlock == null) {
+ throw new IOException("Error: Current block in data stream is null! ");
+ }
+ DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
+ lblocks[0] = new LocatedBlock(dataBlock, dataNode);
+ LOG.info("Found checksum error in data stream at block=" + dataBlock.getBlockName() +
+ " on datanode=" + dataNode[0].getName());
+
+ // Find block in checksum stream
+ DFSClient.DFSInputStream dfsSums = (DFSClient.DFSInputStream) sums;
+ Block sumsBlock = dfsSums.getCurrentBlock();
+ if (sumsBlock == null) {
+ throw new IOException("Error: Current block in checksum stream is null! ");
+ }
+ DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
+ lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
+ LOG.info("Found checksum error in checksum stream at block=" + sumsBlock.getBlockName() +
+ " on datanode=" + sumsNode[0].getName());
+
+ // Ask client to delete blocks.
+ dfs.reportBadBlocks(lblocks);
+
+ } catch (IOException ie) {
+ LOG.info("Found corruption while reading "
+ + f.toString()
+ + ". Error repairing corrupt blocks. Bad blocks remain. "
+ + StringUtils.stringifyException(ie));
+ }
- // FIXME: we should move the bad block(s) involved to a bad block
- // directory on their datanode, and then re-replicate the blocks, so that
- // no data is lost. a task may fail, but on retry it should succeed.
}
/** Return the total raw capacity of the filesystem, disregarding
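
Note that a CRC mismatch cannot distinguish a corrupt data block from a
corrupt checksum block, so both are reported. Over-reporting is safe here:
invalidateBlock in FSNamesystem (below) refuses to delete the last remaining
copy, and the ChecksumException raised by the Checker still fails the read
either way.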
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 15:57:34 2007
@@ -766,6 +766,52 @@
return true;
}
+ /**
+ * Adds block to list of blocks which will be invalidated on
+ * specified datanode.
+ */
+ private void addToInvalidates(Block b, DatanodeInfo n) {
+ Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+ if (invalidateSet == null) {
+ invalidateSet = new ArrayList<Block>();
+ recentInvalidateSets.put(n.getStorageID(), invalidateSet);
+ }
+ invalidateSet.add(b);
+ }
+
+ /**
+ * Invalidates the given block on the given datanode.
+ */
+ public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+ throws IOException {
+ NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+ + blk.getBlockName() + " on "
+ + dn.getName());
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+ }
+
+ List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+
+ // Check how many copies we have of the block. If we have at least one
+ // copy on a live node, then we can delete it.
+ if (containingNodes != null ) {
+ if ((countContainingNodes(containingNodes) > 1) ||
+ ((countContainingNodes(containingNodes) == 1) &&
+ (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+ addToInvalidates(blk, dn);
+ removeStoredBlock(blk, getDatanode(dn));
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ + blk.getBlockName() + " on "
+ + dn.getName() + " listed for deletion.");
+ } else {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ + blk.getBlockName() + " on "
+ + dn.getName() + " is the only copy and was not deleted.");
+ }
+ }
+ }
+
////////////////////////////////////////////////////////////////
// Here's how to handle block-copy failure during client write:
// -- As usual, the client's write should result in a streaming
@@ -807,12 +853,7 @@
if (containingNodes != null) {
for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
DatanodeDescriptor node = it.next();
- Collection<Block> invalidateSet = recentInvalidateSets.get(node.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new ArrayList<Block>();
- recentInvalidateSets.put(node.getStorageID(), invalidateSet);
- }
- invalidateSet.add(b);
+ addToInvalidates(b, node);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
+ b.getBlockName() + " is added to invalidSet of " + node.getName() );
}
@@ -1732,7 +1773,7 @@
// be-replicated list.
//
FSDirectory.INode fileINode = dir.getFileByBlock(block);
- if( fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
+ if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
synchronized (neededReplications) {
neededReplications.add(block);
}
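
The guard in invalidateBlock reduces to: delete a reported replica only when
it is not the last usable copy. A standalone distillation, hypothetical and
for illustration only:

    // Illustrative distillation of invalidateBlock's guard: a replica may
    // be scheduled for deletion if another live copy exists, or if its
    // node is leaving the cluster anyway.
    static boolean safeToInvalidate(int liveCopies, boolean nodeLeaving) {
      return liveCopies > 1 || (liveCopies == 1 && nodeLeaving);
    }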
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jan 16 15:57:34 2007
@@ -314,6 +314,25 @@
throw new IOException("Could not complete write to file " + src + " by " + clientName);
}
}
+
+ /**
+ * The client has detected an error on the specified located blocks
+ * and is reporting them to the server. For now, the namenode will
+ * delete the blocks from the datanodes. In the future we might
+ * check the blocks are actually corrupt.
+ */
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+ stateChangeLog.debug("*DIR* NameNode.reportBadBlocks");
+ for (int i = 0; i < blocks.length; i++) {
+ Block blk = blocks[i].getBlock();
+ DatanodeInfo[] nodes = blocks[i].getLocations();
+ for (int j = 0; j < nodes.length; j++) {
+ DatanodeInfo dn = nodes[j];
+ namesystem.invalidateBlock(blk, dn);
+ }
+ }
+ }
+
/**
*/
public String[][] getHints(String src, long start, long len) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Tue Jan 16 15:57:34 2007
@@ -46,6 +46,7 @@
private FSDataInputStream sums;
private Checksum sum = new CRC32();
private int inSum;
+ private FSInputStream sumsIn;
public Checker(FileSystem fs, Path file, Configuration conf)
throws IOException {
@@ -55,7 +56,8 @@
this.file = file;
Path sumFile = FileSystem.getChecksumFile(file);
try {
- this.sums = new FSDataInputStream(fs.openRaw(sumFile), conf);
+ sumsIn = fs.openRaw(sumFile);
+ this.sums = new FSDataInputStream(sumsIn, conf);
byte[] version = new byte[VERSION.length];
sums.readFully(version);
if (!Arrays.equals(version, VERSION))
@@ -134,7 +136,7 @@
if (crc != sumValue) {
long pos = getPos() - delta;
fs.reportChecksumFailure(file, (FSInputStream)in,
- pos, bytesPerSum, crc);
+ pos, sumsIn, pos/bytesPerSum) ;
throw new ChecksumException("Checksum error: "+file+" at "+pos);
}
}
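
The second position argument passed here is pos/bytesPerSum, i.e. the index
of the failing checksum chunk (the sums file stores one CRC32 entry per
bytesPerSum data bytes, after a version header) rather than a byte offset.
A hypothetical helper making the arithmetic explicit:

    // Illustrative helper mirroring the call-site arithmetic: map a
    // data-file position to the index of its CRC32 entry.
    static long checksumChunk(long dataPos, int bytesPerSum) {
      return dataPos / bytesPerSum; // one CRC32 per bytesPerSum data bytes
    }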
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Jan 16 15:57:34 2007
@@ -879,13 +879,13 @@
* Report a checksum error to the file system.
* @param f the file name containing the error
* @param in the stream open on the file
- * @param start the position of the beginning of the bad data in the file
- * @param length the length of the bad data in the file
- * @param crc the expected CRC32 of the data
+ * @param inPos the position of the beginning of the bad data in the file
+ * @param sums the stream open on the checksum file
+ * @param sumsPos the position of the beginning of the bad data in the checksum file
*/
- public abstract void reportChecksumFailure(Path f, FSInputStream in,
- long start, long length,
- int crc);
+ public abstract void reportChecksumFailure(Path f,
+ FSInputStream in, long inPos,
+ FSInputStream sums, long sumsPos);
/**
* Get the size for a particular file.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Jan 16 15:57:34 2007
@@ -367,7 +367,8 @@
/** Moves files to a bad file directory on the same device, so that their
* storage will not be reused. */
public void reportChecksumFailure(Path p, FSInputStream in,
- long start, long length, int crc) {
+ long inPos,
+ FSInputStream sums, long sumsPos) {
try {
// canonicalize f
File f = pathToFile(p).getCanonicalFile();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Jan 16 15:57:34 2007
@@ -289,8 +289,9 @@
}
@Override
- public void reportChecksumFailure(Path path, FSInputStream in,
- long start, long length, int crc) {
+ public void reportChecksumFailure(Path f,
+ FSInputStream in, long inPos,
+ FSInputStream sums, long sumsPos) {
// TODO: What to do here?
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -388,9 +388,11 @@
}
@Override
- public void reportChecksumFailure(
- Path f, FSInputStream in, long start, long length, int crc) {
- baseFS.reportChecksumFailure(f, in, start, length, crc);
+
+ public void reportChecksumFailure(Path f,
+ FSInputStream in, long inPos,
+ FSInputStream sums, long sumsPos) {
+ baseFS.reportChecksumFailure(f, in, inPos, sums, sumsPos);
}
@Override