You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/27 05:51:51 UTC
svn commit: r641705 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/DFSClient.java
Author: rangadi
Date: Wed Mar 26 21:51:49 2008
New Revision: 641705
URL: http://svn.apache.org/viewvc?rev=641705&view=rev
Log:
HADOOP-3067. DFSInputStream's position read does not close the sockets. (rangadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641705&r1=641704&r2=641705&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 21:51:49 2008
@@ -377,6 +377,9 @@
HADOOP-3094. Fix BytesWritable.toString to avoid extending the sign bit
(Owen O'Malley via cdouglas)
+ HADOOP-3067. DFSInputStream's position read does not close the sockets.
+ (rangadi)
+
Release 0.16.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=641705&r1=641704&r2=641705&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar 26 21:51:49 2008
@@ -696,6 +696,7 @@
*/
static class BlockReader extends FSInputChecker {
+ private Socket dnSock; //for now just sending checksumOk.
private DataInputStream in;
private DataChecksum checksum;
private long lastChunkOffset = -1;
@@ -739,7 +740,12 @@
}
}
- return super.read(buf, off, len);
+ int nRead = super.read(buf, off, len);
+ if (nRead >= 0 && gotEOS && needChecksum()) {
+ //checksum is verified and there are no errors.
+ checksumOk(dnSock);
+ }
+ return nRead;
}
@Override
@@ -892,13 +898,15 @@
private BlockReader( String file, long blockId, DataInputStream in,
DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset ) {
+ long startOffset, long firstChunkOffset,
+ Socket dnSock ) {
super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
1, verifyChecksum,
checksum.getChecksumSize() > 0? checksum : null,
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
+ this.dnSock = dnSock;
this.in = in;
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
@@ -965,7 +973,7 @@
}
return new BlockReader( file, blockId, in, checksum, verifyChecksum,
- startOffset, firstChunkOffset );
+ startOffset, firstChunkOffset, sock );
}
@Override
@@ -986,7 +994,7 @@
* errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
* checksum was verified and there was no error.
*/
- void checksumOk(Socket sock) {
+ private void checksumOk(Socket sock) {
try {
OutputStream out = NetUtils.getOutputStream(sock, WRITE_TIMEOUT);
byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
@@ -1297,9 +1305,6 @@
if (result >= 0) {
pos += result;
- if ( pos > blockEnd ) {
- blockReader.checksumOk(s);
- }
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
@@ -1368,6 +1373,7 @@
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
+ BlockReader reader = null;
try {
dn = socketFactory.createSocket();
@@ -1376,9 +1382,10 @@
int len = (int) (end - start + 1);
- BlockReader reader =
- BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(),
- start, len, buffersize, verifyChecksum);
+ reader = BlockReader.newBlockReader(dn, src,
+ block.getBlock().getBlockId(),
+ start, len, buffersize,
+ verifyChecksum);
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -1397,16 +1404,13 @@
" for file " + src +
" for block " + block.getBlock().getBlockId() + ":" +
StringUtils.stringifyException(e));
- }
- // Put chosen node into dead list, continue
- addToDeadNodes(chosenNode);
- if (dn != null) {
- try {
- dn.close();
- } catch (IOException iex) {
- }
+ } finally {
+ IOUtils.closeStream(reader);
+ IOUtils.closeSocket(dn);
dn = null;
}
+ // Put chosen node into dead list, continue
+ addToDeadNodes(chosenNode);
}
throw (ioe == null) ? new IOException("Could not read data") : ioe;
}