You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/27 05:51:51 UTC

svn commit: r641705 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DFSClient.java

Author: rangadi
Date: Wed Mar 26 21:51:49 2008
New Revision: 641705

URL: http://svn.apache.org/viewvc?rev=641705&view=rev
Log:
HADOOP-3067. DFSInputStream's position read does not close the sockets. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641705&r1=641704&r2=641705&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 21:51:49 2008
@@ -377,6 +377,9 @@
     HADOOP-3094. Fix BytesWritable.toString to avoid extending the sign bit
     (Owen O'Malley via cdouglas)
 
+    HADOOP-3067. DFSInputStream's position read does not close the sockets.
+    (rangadi)
+
 Release 0.16.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=641705&r1=641704&r2=641705&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar 26 21:51:49 2008
@@ -696,6 +696,7 @@
    */
   static class BlockReader extends FSInputChecker {
 
+    private Socket dnSock; //for now just sending checksumOk.
     private DataInputStream in;
     private DataChecksum checksum;
     private long lastChunkOffset = -1;
@@ -739,7 +740,12 @@
         }
       }
       
-      return super.read(buf, off, len);
+      int nRead = super.read(buf, off, len);
+      if (nRead >= 0 && gotEOS && needChecksum()) {
+        //checksum is verified and there are no errors.
+        checksumOk(dnSock);
+      }
+      return nRead;
     }
 
     @Override
@@ -892,13 +898,15 @@
     
     private BlockReader( String file, long blockId, DataInputStream in, 
                          DataChecksum checksum, boolean verifyChecksum,
-                         long startOffset, long firstChunkOffset ) {
+                         long startOffset, long firstChunkOffset, 
+                         Socket dnSock ) {
       super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
             1, verifyChecksum,
             checksum.getChecksumSize() > 0? checksum : null, 
             checksum.getBytesPerChecksum(),
             checksum.getChecksumSize());
       
+      this.dnSock = dnSock;
       this.in = in;
       this.checksum = checksum;
       this.startOffset = Math.max( startOffset, 0 );
@@ -965,7 +973,7 @@
       }
 
       return new BlockReader( file, blockId, in, checksum, verifyChecksum,
-                              startOffset, firstChunkOffset );
+                              startOffset, firstChunkOffset, sock );
     }
 
     @Override
@@ -986,7 +994,7 @@
      * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
      * checksum was verified and there was no error.
      */ 
-    void checksumOk(Socket sock) {
+    private void checksumOk(Socket sock) {
       try {
         OutputStream out = NetUtils.getOutputStream(sock, WRITE_TIMEOUT);
         byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
@@ -1297,9 +1305,6 @@
             
             if (result >= 0) {
               pos += result;
-              if ( pos > blockEnd ) {
-                blockReader.checksumOk(s);
-              }
             } else {
               // got a EOS from reader though we expect more data on it.
               throw new IOException("Unexpected EOS from the reader");
@@ -1368,6 +1373,7 @@
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
+        BlockReader reader = null;
             
         try {
           dn = socketFactory.createSocket();
@@ -1376,9 +1382,10 @@
               
           int len = (int) (end - start + 1);
               
-          BlockReader reader = 
-            BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(),
-                                       start, len, buffersize, verifyChecksum);
+          reader = BlockReader.newBlockReader(dn, src, 
+                                              block.getBlock().getBlockId(),
+                                              start, len, buffersize, 
+                                              verifyChecksum);
           int nread = reader.readAll(buf, offset, len);
           if (nread != len) {
             throw new IOException("truncated return from reader.read(): " +
@@ -1397,16 +1404,13 @@
                    " for file " + src + 
                    " for block " + block.getBlock().getBlockId() + ":"  +
                    StringUtils.stringifyException(e));
-        } 
-        // Put chosen node into dead list, continue
-        addToDeadNodes(chosenNode);
-        if (dn != null) {
-          try {
-            dn.close();
-          } catch (IOException iex) {
-          }
+        } finally {
+          IOUtils.closeStream(reader);
+          IOUtils.closeSocket(dn);
           dn = null;
         }
+        // Put chosen node into dead list, continue
+        addToDeadNodes(chosenNode);
       }
       throw (ioe == null) ? new IOException("Could not read data") : ioe;
     }