You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/26 22:49:39 UTC
svn commit: r500370 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/fs/s3/
Author: cutting
Date: Fri Jan 26 13:49:38 2007
New Revision: 500370
URL: http://svn.apache.org/viewvc?view=rev&rev=500370
Log:
HADOOP-731. When a checksum error is encountered on a file stored in HDFS, try to find another replica. Contributed by Wendy Chien.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 13:49:38 2007
@@ -63,6 +63,10 @@
19. HADOOP-909. Fix the 'du' command to correctly compute the size of
FileSystem directory trees. (Hairong Kuang via cutting)
+20. HADOOP-731. When a checksum error is encountered on a file stored
+ in HDFS, try another replica of the data, if any.
+ (Wendy Chien via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jan 26 13:49:38 2007
@@ -618,7 +618,7 @@
DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
-
+
try {
s = new Socket();
s.connect(targetAddr, READ_TIMEOUT);
@@ -764,7 +764,7 @@
if (nodes[blockId] == null || nodes[blockId].length == 0) {
LOG.info("No node available for block: " + blockInfo);
}
- LOG.info("Could not obtain block from any node: " + ie);
+ LOG.info("Could not obtain block " + blockId + " from any node: " + ie);
try {
Thread.sleep(3000);
} catch (InterruptedException iex) {
@@ -889,6 +889,24 @@
blockEnd = -1;
}
+ /**
+ * Seek to given position on a node other than the current node. If
+ * a node other than the current node is found, then returns true.
+ * If another node could not be found, then returns false.
+ */
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ TreeSet excludeNodes = new TreeSet();
+ excludeNodes.add(currentNode);
+ String oldNodeID = currentNode.getStorageID();
+ DatanodeInfo newNode = blockSeekTo(targetPos, excludeNodes);
+ if (!oldNodeID.equals(newNode.getStorageID())) {
+ currentNode = newNode;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/**
*/
public synchronized long getPos() throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Jan 26 13:49:38 2007
@@ -90,34 +90,61 @@
}
public int read(byte b[], int off, int len) throws IOException {
- int read = in.read(b, off, len);
+ int read;
+ boolean retry;
+ int retriesLeft = 3;
+ long oldPos = getPos();
+ do {
+ retriesLeft--;
+ retry = false;
- if (sums != null) {
- int summed = 0;
- while (summed < read) {
-
- int goal = bytesPerSum - inSum;
- int inBuf = read - summed;
- int toSum = inBuf <= goal ? inBuf : goal;
-
+ read = in.read(b, off, len);
+
+ if (sums != null) {
+ long oldSumsPos = sums.getPos();
try {
- sum.update(b, off+summed, toSum);
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new RuntimeException("Summer buffer overflow b.len=" +
- b.length + ", off=" + off +
- ", summed=" + summed + ", read=" +
- read + ", bytesPerSum=" + bytesPerSum +
- ", inSum=" + inSum, e);
- }
- summed += toSum;
+ int summed = 0;
+ while (summed < read) {
+ int goal = bytesPerSum - inSum;
+ int inBuf = read - summed;
+ int toSum = inBuf <= goal ? inBuf : goal;
- inSum += toSum;
- if (inSum == bytesPerSum) {
- verifySum(read-(summed-bytesPerSum));
+ try {
+ sum.update(b, off+summed, toSum);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new RuntimeException("Summer buffer overflow b.len=" +
+ b.length + ", off=" + off +
+ ", summed=" + summed + ", read=" +
+ read + ", bytesPerSum=" + bytesPerSum +
+ ", inSum=" + inSum, e);
+ }
+ summed += toSum;
+
+ inSum += toSum;
+ if (inSum == bytesPerSum) {
+ verifySum(read-(summed-bytesPerSum));
+ }
+ }
+ } catch (ChecksumException ce) {
+ LOG.info("Found checksum error: " + StringUtils.stringifyException(ce));
+ if (retriesLeft == 0) {
+ throw ce;
+ }
+ sums.seek(oldSumsPos);
+ if (!((FSInputStream)in).seekToNewSource(oldPos) ||
+ !((FSInputStream)sumsIn).seekToNewSource(oldSumsPos)) {
+ // Neither the data stream nor the checksum stream are being read from
+ // different sources, meaning we'll still get a checksum error if we
+ // try to do the read again. We throw an exception instead.
+ throw ce;
+ } else {
+ // Since at least one of the sources is different, the read might succeed,
+ // so we'll retry.
+ retry = true;
+ }
}
}
- }
-
+ } while (retry);
return read;
}
@@ -270,7 +297,11 @@
public FSDataInputStream(FileSystem fs, Path file, int bufferSize, Configuration conf)
throws IOException {
super(null);
- this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+ Checker chkr = new Checker(fs, file, conf); // sets bytesPerSum
+ if (bufferSize % bytesPerSum != 0) {
+ throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+ }
+ this.in = new Buffer(new PositionCache(chkr), bufferSize);
}
@@ -278,7 +309,11 @@
throws IOException {
super(null);
int bufferSize = conf.getInt("io.file.buffer.size", 4096);
- this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+ Checker chkr = new Checker(fs, file, conf);
+ if (bufferSize % bytesPerSum != 0) {
+ throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+ }
+ this.in = new Buffer(new PositionCache(chkr), bufferSize);
}
/** Construct without checksums. */
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Fri Jan 26 13:49:38 2007
@@ -38,7 +38,13 @@
* Return the current offset from the start of the file
*/
public abstract long getPos() throws IOException;
-
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ */
+ public abstract boolean seekToNewSource(long targetPos) throws IOException;
+
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
synchronized (this) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 26 13:49:38 2007
@@ -114,6 +114,10 @@
din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
}
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
public int available() throws IOException {
return din.available();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Fri Jan 26 13:49:38 2007
@@ -95,6 +95,10 @@
return fis.getChannel().position();
}
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
/*
* Just forward to the fis
*/
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Fri Jan 26 13:49:38 2007
@@ -61,6 +61,11 @@
}
@Override
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");