Posted to hdfs-commits@hadoop.apache.org by ra...@apache.org on 2009/07/14 22:18:08 UTC
svn commit: r794054 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSClient.java
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java
Author: rangadi
Date: Tue Jul 14 20:18:07 2009
New Revision: 794054
URL: http://svn.apache.org/viewvc?rev=794054&view=rev
Log:
HDFS-445. pread() does not pick up changes to block locations. (Kan Zhang via rangadi)
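In short: DFSInputStream caches block locations, and the pread path
(fetchBlockByteRange) used to bound its retries by the number of locations
in that possibly stale cache. If the datanodes holding a block restarted on
different ports, preads kept retrying the dead addresses and failed even
though the namenode already knew the new locations. This commit gives
getBlockAt() an updatePosition flag so it can be called without disturbing
the stream position, folds the old non-caching fetchBlockAt() into the
caching variant, and has fetchBlockByteRange() re-read the cached location
list at the top of every retry. A simplified sketch of the new retry loop
(not the verbatim source: readData() is a hypothetical stand-in for the
socket/BlockReader setup, and checksum and access-token handling are
omitted):

    while (true) {
      // Cached locations may have been refreshed by chooseDataNode()
      // or fetchBlockAt(); always re-read the latest list before
      // picking a replica.
      block = getBlockAt(block.getStartOffset(), false);
      DNAddrPair retval = chooseDataNode(block); // refreshes via openInfo() when all replicas fail
      try {
        readData(retval.addr, block, start, len, buf); // hypothetical helper
        return;
      } catch (IOException e) {
        addToDeadNodes(retval.info); // skip this replica on the next pass
      }
    }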
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jul 14 20:18:07 2009
@@ -77,6 +77,9 @@
HDFS-489. Update TestHDFSCLI for the -skipTrash option in rm. (Jakob Homan
via szetszwo)
+ HDFS-445. pread() does not pick up changes to block locations.
+ (Kan Zhang via rangadi)
+
Release 0.20.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jul 14 20:18:07 2009
@@ -1506,10 +1506,12 @@
* Fetch it from the namenode if not cached.
*
* @param offset
+ * @param updatePosition whether to update current position
* @return located block
* @throws IOException
*/
- private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
+ private synchronized LocatedBlock getBlockAt(long offset,
+ boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1523,14 +1525,16 @@
}
LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
// update current position
- this.pos = offset;
- this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
- this.currentBlock = blk.getBlock();
+ if (updatePosition) {
+ this.pos = offset;
+ this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+ this.currentBlock = blk.getBlock();
+ }
return blk;
}
/** Fetch a block from namenode and cache it */
- private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+ private synchronized void fetchBlockAt(long offset) throws IOException {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -1544,17 +1548,6 @@
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
- /** Fetch a block without caching */
- private LocatedBlock fetchBlockAt(long offset) throws IOException {
- LocatedBlocks newBlocks;
- newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
- if (newBlocks == null) {
- throw new IOException("Could not find target position " + offset);
- }
- int index = newBlocks.findBlock(offset);
- return newBlocks.get(index);
- }
-
/**
* Get blocks in the specified range.
* Fetch them from the namenode if not cached.
@@ -1624,7 +1617,7 @@
//
// Compute desired block
//
- LocatedBlock targetBlock = getBlockAt(target);
+ LocatedBlock targetBlock = getBlockAt(target, true);
assert (target==this.pos) : "Wrong position " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
@@ -1659,7 +1652,7 @@
* access key from its memory since it's considered expired based on
* the estimated expiration date.
*/
- fetchAndCacheBlockAt(target);
+ fetchBlockAt(target);
} else {
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
@@ -1821,13 +1814,16 @@
if (nodes == null || nodes.length == 0) {
LOG.info("No node available for block: " + blockInfo);
}
- LOG.info("Could not obtain block " + block.getBlock() + " from any node: " + ie);
+ LOG.info("Could not obtain block " + block.getBlock()
+ + " from any node: " + ie
+ + ". Will get new block locations from namenode and retry...");
try {
Thread.sleep(3000);
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo();
+ block = getBlockAt(block.getStartOffset(), false);
failures++;
continue;
}
@@ -1840,11 +1836,13 @@
// Connect to best DataNode for desired Block, with potential offset
//
Socket dn = null;
- int numAttempts = block.getLocations().length;
- IOException ioe = null;
int refetchToken = 1; // only need to get a new access token once
- while (dn == null && numAttempts-- > 0 ) {
+ while (true) {
+ // cached block locations may have been updated by chooseDataNode()
+ // or fetchBlockAt(). Always get the latest list of locations at the
+ // start of the loop.
+ block = getBlockAt(block.getStartOffset(), false);
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@@ -1871,21 +1869,18 @@
}
return;
} catch (ChecksumException e) {
- ioe = e;
LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- ioe = e;
if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
LOG.info("Invalid access token when connecting to " + targetAddr
+ " for file " + src + " for block "
+ block.getBlock() + ":"
+ StringUtils.stringifyException(e)
+ ", get a new access token and retry...");
- block = fetchBlockAt(block.getStartOffset());
- numAttempts = block.getLocations().length;
+ fetchBlockAt(block.getStartOffset());
continue;
} else {
LOG.warn("Failed to connect to " + targetAddr + " for file " + src
@@ -1895,12 +1890,10 @@
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
- dn = null;
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
- throw (ioe == null) ? new IOException("Could not read data") : ioe;
}
/**
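The client-facing API is unchanged; positional reads simply become robust
to datanode restarts. A minimal usage sketch (the path is illustrative; any
existing HDFS file works):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadExample {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/user/example/data.dat"));
        try {
          byte[] buf = new byte[4096];
          // Positional read ("pread"): does not move the stream position.
          // It goes through DFSClient.fetchBlockByteRange(), which after
          // this change refreshes block locations from the namenode when
          // the cached datanode addresses turn out to be stale.
          in.readFully(0L, buf);
        } finally {
          in.close();
        }
      }
    }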
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jul 14 20:18:07 2009
@@ -657,6 +657,18 @@
}
/*
+ * Restart all datanodes
+ */
+ public synchronized boolean restartDataNodes() throws IOException {
+ for (int i = dataNodes.size()-1; i >= 0; i--) {
+ System.out.println("Restarting DataNode " + i);
+ if (!restartDataNode(i))
+ return false;
+ }
+ return true;
+ }
+
+ /*
* Shutdown a datanode by name.
*/
public synchronized DataNodeProperties stopDataNode(String name) {
@@ -731,7 +743,7 @@
while(client.datanodeReport(DatanodeReportType.LIVE).length
!= numDataNodes) {
try {
- Thread.sleep(500);
+ Thread.sleep(100);
} catch (Exception e) {
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java?rev=794054&r1=794053&r2=794054&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPread.java Tue Jul 14 20:18:07 2009
@@ -154,6 +154,37 @@
stm.close();
}
+
+ // test pread can survive datanode restarts
+ private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
+ Path name) throws IOException {
+ // skip this test if using simulated storage since simulated blocks
+ // don't survive datanode restarts.
+ if (simulatedStorage) {
+ return;
+ }
+ int numBlocks = 1;
+ assertTrue(numBlocks <= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES);
+ byte[] expected = new byte[numBlocks * blockSize];
+ Random rand = new Random(seed);
+ rand.nextBytes(expected);
+ byte[] actual = new byte[numBlocks * blockSize];
+ FSDataInputStream stm = fileSys.open(name);
+ // read a block and get block locations cached as a result
+ stm.readFully(0, actual);
+ checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
+ // restart all datanodes. it is expected that they will
+ // restart on different ports, hence, cached block locations
+ // will no longer work.
+ assertTrue(cluster.restartDataNodes());
+ cluster.waitActive();
+ // verify the block can be read again using the same InputStream
+ // (via re-fetching of block locations from namenode). there is a
+ // 3 sec sleep in chooseDataNode(), which can be shortened for
+ // this test if configurable.
+ stm.readFully(0, actual);
+ checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
+ }
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
@@ -185,6 +216,7 @@
Path file1 = new Path("preadtest.dat");
writeFile(fileSys, file1);
pReadFile(fileSys, file1);
+ datanodeRestartTest(cluster, fileSys, file1);
cleanupFile(fileSys, file1);
} finally {
fileSys.close();
@@ -195,7 +227,7 @@
public void testPreadDFSSimulated() throws IOException {
simulatedStorage = true;
testPreadDFS();
- simulatedStorage = true;
+ simulatedStorage = false;
}
/**