You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/06/16 22:53:12 UTC
svn commit: r955377 - in /hadoop/common/branches/branch-0.20-append:
CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
src/test/org/apache/hadoop/hdfs/TestPread.java
Author: dhruba
Date: Wed Jun 16 20:53:12 2010
New Revision: 955377
URL: http://svn.apache.org/viewvc?rev=955377&view=rev
Log:
HDFS-445. pread should refetch block locations when necessary.
(Todd Lipcon via dhruba)
Modified:
hadoop/common/branches/branch-0.20-append/CHANGES.txt
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Wed Jun 16 20:53:12 2010
@@ -28,6 +28,9 @@ Release 0.20-append - Unreleased
HDFS-1054. remove sleep before retry for allocating a block.
(Todd Lipcon via dhruba)
+ HDFS-445. pread should refetch block locations when necessary.
+ (Todd Lipcon via dhruba)
+
IMPROVEMENTS
BUG FIXES
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed Jun 16 20:53:12 2010
@@ -1584,10 +1584,12 @@ public class DFSClient implements FSCons
* Fetch it from the namenode if not cached.
*
* @param offset
+ * @param updatePosition whether to update current position
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ private synchronized LocatedBlock getBlockAt(long offset,
+ boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1601,9 +1603,11 @@ public class DFSClient implements FSCons
}
LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
// update current position
- this.pos = offset;
- this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
- this.currentBlock = blk.getBlock();
+ if (updatePosition) {
+ this.pos = offset;
+ this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+ this.currentBlock = blk.getBlock();
+ }
return blk;
}
@@ -1670,7 +1674,7 @@ public class DFSClient implements FSCons
//
// Compute desired block
//
- LocatedBlock targetBlock = getBlockAt(target);
+ LocatedBlock targetBlock = getBlockAt(target, true);
assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
@@ -1859,13 +1863,16 @@ public class DFSClient implements FSCons
if (nodes == null || nodes.length == 0) {
LOG.info("No node available for block: " + blockInfo);
}
- LOG.info("Could not obtain block " + block.getBlock() + " from any node: " + ie);
+ LOG.info("Could not obtain block " + block.getBlock()
+ + " from any node: " + ie
+ + ". Will get new block locations from namenode and retry...");
try {
Thread.sleep(3000);
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo();
+ block = getBlockAt(block.getStartOffset(), false);
failures++;
continue;
}
@@ -1878,10 +1885,13 @@ public class DFSClient implements FSCons
// Connect to best DataNode for desired Block, with potential offset
//
Socket dn = null;
- int numAttempts = block.getLocations().length;
- IOException ioe = null;
-
- while (dn == null && numAttempts-- > 0 ) {
+
+ while (true) {
+ // cached block locations may have been updated by chooseDataNode()
+ // or fetchBlockAt(). Always get the latest list of locations at the
+ // start of the loop.
+ block = getBlockAt(block.getStartOffset(), false);
+
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@@ -1906,13 +1916,11 @@ public class DFSClient implements FSCons
}
return;
} catch (ChecksumException e) {
- ioe = e;
LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- ioe = e;
LOG.warn("Failed to connect to " + targetAddr +
" for file " + src +
" for block " + block.getBlock().getBlockId() + ":" +
@@ -1920,12 +1928,10 @@ public class DFSClient implements FSCons
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
- dn = null;
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
- throw (ioe == null) ? new IOException("Could not read data") : ioe;
}
/**
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Jun 16 20:53:12 2010
@@ -652,6 +652,18 @@ public class MiniDFSCluster {
}
/*
+ * Restart all datanodes
+ */
+ public synchronized boolean restartDataNodes() throws IOException {
+ for (int i = dataNodes.size()-1; i >= 0; i--) {
+ System.out.println("Restarting DataNode " + i);
+ if (!restartDataNode(i))
+ return false;
+ }
+ return true;
+ }
+
+ /*
* Shutdown a datanode by name.
*/
public synchronized DataNodeProperties stopDataNode(String name) {
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java?rev=955377&r1=955376&r2=955377&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestPread.java Wed Jun 16 20:53:12 2010
@@ -151,6 +151,37 @@ public class TestPread extends TestCase
stm.close();
}
+
+ // test pread can survive datanode restarts
+ private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
+ Path name) throws IOException {
+ // skip this test if using simulated storage since simulated blocks
+ // don't survive datanode restarts.
+ if (simulatedStorage) {
+ return;
+ }
+ int numBlocks = 1;
+ assertTrue(numBlocks <= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES);
+ byte[] expected = new byte[numBlocks * blockSize];
+ Random rand = new Random(seed);
+ rand.nextBytes(expected);
+ byte[] actual = new byte[numBlocks * blockSize];
+ FSDataInputStream stm = fileSys.open(name);
+ // read a block and get block locations cached as a result
+ stm.readFully(0, actual);
+ checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
+ // restart all datanodes. it is expected that they will
+ // restart on different ports, hence, cached block locations
+ // will no longer work.
+ assertTrue(cluster.restartDataNodes());
+ cluster.waitActive();
+ // verify the block can be read again using the same InputStream
+ // (via re-fetching of block locations from namenode). there is a
+ // 3 sec sleep in chooseDataNode(), which could be shortened for
+ // this test if it were made configurable.
+ stm.readFully(0, actual);
+ checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
+ }
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
@@ -182,6 +213,7 @@ public class TestPread extends TestCase
Path file1 = new Path("preadtest.dat");
writeFile(fileSys, file1);
pReadFile(fileSys, file1);
+ datanodeRestartTest(cluster, fileSys, file1);
cleanupFile(fileSys, file1);
} finally {
fileSys.close();
@@ -192,7 +224,7 @@ public class TestPread extends TestCase
public void testPreadDFSSimulated() throws IOException {
simulatedStorage = true;
testPreadDFS();
- simulatedStorage = true;
+ simulatedStorage = false;
}
/**