You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/27 06:07:39 UTC

svn commit: r641706 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/net/

Author: rangadi
Date: Wed Mar 26 22:07:31 2008
New Revision: 641706

URL: http://svn.apache.org/viewvc?rev=641706&view=rev
Log:
HADOOP-3073. close() on SocketInputStream or SocketOutputStream should close the underlying channel. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
    hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641706&r1=641705&r2=641706&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 22:07:31 2008
@@ -380,6 +380,9 @@
     HADOOP-3067. DFSInputStream's position read does not close the sockets.
     (rangadi)
 
+    HADOOP-3073. close() on SocketInputStream or SocketOutputStream should
+    close the underlying channel. (rangadi)
+
 Release 0.16.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=641706&r1=641705&r2=641706&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar 26 22:07:31 2008
@@ -934,17 +934,13 @@
       DataOutputStream out = new DataOutputStream(
         new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
 
-      try {
-        //write the header.
-        out.writeShort( DATA_TRANSFER_VERSION );
-        out.write( OP_READ_BLOCK );
-        out.writeLong( blockId );
-        out.writeLong( startOffset );
-        out.writeLong( len );
-        out.flush();
-      } finally {
-        IOUtils.closeStream(out);
-      }
+      //write the header.
+      out.writeShort( DATA_TRANSFER_VERSION );
+      out.write( OP_READ_BLOCK );
+      out.writeLong( blockId );
+      out.writeLong( startOffset );
+      out.writeLong( len );
+      out.flush();
       
       //
       // Get bytes in block, set streams

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java?rev=641706&r1=641705&r2=641706&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java Wed Mar 26 22:07:31 2008
@@ -39,7 +39,7 @@
 public class SocketInputStream extends InputStream
                                implements ReadableByteChannel {
 
-  private SocketIOWithTimeout reader;
+  private Reader reader;
 
   private static class Reader extends SocketIOWithTimeout {
     ReadableByteChannel channel;
@@ -121,10 +121,23 @@
     return read(ByteBuffer.wrap(b, off, len));
   }
 
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    /* close the channel since Socket.getInputStream().close()
+     * closes the socket.
+     */
+    reader.channel.close();
     reader.close();
   }
 
+  /**
+   * Returns the underlying channel used by this input stream.
+   * This is useful when a channel is required, for example as the source for
+   * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
+   */
+  public ReadableByteChannel getChannel() {
+    return reader.channel; 
+  }
+  
   //ReadableByteChannel interface
     
   public boolean isOpen() {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java?rev=641706&r1=641705&r2=641706&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java Wed Mar 26 22:07:31 2008
@@ -38,7 +38,7 @@
 public class SocketOutputStream extends OutputStream 
                                 implements WritableByteChannel {                                
   
-  private SocketIOWithTimeout writer;
+  private Writer writer;
   
   private static class Writer extends SocketIOWithTimeout {
     WritableByteChannel channel;
@@ -116,8 +116,21 @@
     }
   }
 
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    /* close the channel since Socket.getOutputStream().close() 
+     * closes the socket.
+     */
+    writer.channel.close();
     writer.close();
+  }
+
+  /**
+   * Returns the underlying channel used by this output stream.
+   * This is useful when a channel is required, for example as the target for
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   */
+  public WritableByteChannel getChannel() {
+    return writer.channel; 
   }
 
   //WritableByteChannle interface 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java?rev=641706&r1=641705&r2=641706&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java Wed Mar 26 22:07:31 2008
@@ -129,6 +129,16 @@
         throw new IOException("Unexpected InterruptedException : " + e);
       }
       
+      //make sure the channels are still open
+      assertTrue(source.isOpen());
+      assertTrue(sink.isOpen());
+      
+      // make sure close() closes the underlying channel.
+      in.close();
+      assertFalse(source.isOpen());
+      out.close();
+      assertFalse(sink.isOpen());
+      
     } finally {
       if (source != null) {
         source.close();