Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/06 05:18:29 UTC

svn commit: r634153 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/net/

Author: rangadi
Date: Wed Mar  5 20:18:26 2008
New Revision: 634153

URL: http://svn.apache.org/viewvc?rev=634153&view=rev
Log:
HADOOP-2346. Utilities to support timeout while writing to sockets.
DFSClient and DataNode sockets have 10min write timeout. (rangadi)
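
For context, the usage pattern this change introduces looks roughly like the
sketch below. This is illustrative only, not code from the patch: the class
name, host and port are placeholders, and 10 * 60 * 1000 ms mirrors the new
FSConstants.WRITE_TIMEOUT default.

  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.net.InetSocketAddress;
  import java.net.Socket;
  import java.nio.channels.SocketChannel;

  import org.apache.hadoop.net.NetUtils;

  // Sketch only: pairs a channel-backed socket with NetUtils.getOutputStream()
  // so that a stuck write fails with SocketTimeoutException instead of hanging.
  public class WriteTimeoutSketch {
    public static void main(String[] args) throws IOException {
      // The socket must come from SocketChannel.open() (or a Hadoop socket
      // factory) so that it has an associated channel.
      Socket sock = SocketChannel.open().socket();
      sock.connect(new InetSocketAddress("datanode.example.com", 50010),
                   60 * 1000);                        // connect timeout
      sock.setSoTimeout(60 * 1000);                   // read timeout
      int writeTimeout = 10 * 60 * 1000;              // mirrors FSConstants.WRITE_TIMEOUT
      DataOutputStream out = new DataOutputStream(
          NetUtils.getOutputStream(sock, writeTimeout));
      out.writeShort(0);                              // placeholder payload
      out.flush();
      sock.close();
    }
  }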

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
    hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java
Removed:
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java
    hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar  5 20:18:26 2008
@@ -42,6 +42,9 @@
     config params to map records to different output files.
     (Runping Qi via cdouglas)
 
+    HADOOP-2346. Utilities to support timeout while writing to sockets.
+    DFSClient and DataNode sockets have 10min write timeout. (rangadi)
+    
   IMPROVEMENTS
 
     HADOOP-2655. Copy on write for data and metadata files in the 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Wed Mar  5 20:18:26 2008
@@ -36,6 +36,7 @@
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
 import java.util.Arrays;
 import java.util.zip.CRC32;
 import java.util.concurrent.*;
@@ -181,7 +182,7 @@
       Socket dnSock = null;
       try {
         InetSocketAddress dnAddr = NetUtils.createSocketAddr(dn.getName());
-        dnSock = new Socket();
+        dnSock = SocketChannel.open().socket();
         dnSock.connect(dnAddr, FSConstants.READ_TIMEOUT);
         dnSock.setSoTimeout(FSConstants.READ_TIMEOUT);
         DFSClient.BlockReader reader = DFSClient.BlockReader.newBlockReader

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar  5 20:18:26 2008
@@ -862,22 +862,27 @@
                                        throws IOException {
       // in and out will be closed when sock is closed (by the caller)
       DataOutputStream out = new DataOutputStream(
-                       new BufferedOutputStream(sock.getOutputStream()));
-
-      //write the header.
-      out.writeShort( DATA_TRANSFER_VERSION );
-      out.write( OP_READ_BLOCK );
-      out.writeLong( blockId );
-      out.writeLong( startOffset );
-      out.writeLong( len );
-      out.flush();
+        new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
 
+      try {
+        //write the header.
+        out.writeShort( DATA_TRANSFER_VERSION );
+        out.write( OP_READ_BLOCK );
+        out.writeLong( blockId );
+        out.writeLong( startOffset );
+        out.writeLong( len );
+        out.flush();
+      } finally {
+        IOUtils.closeStream(out);
+      }
+      
       //
       // Get bytes in block, set streams
       //
 
       DataInputStream in = new DataInputStream(
-                   new BufferedInputStream(sock.getInputStream(), bufferSize));
+          new BufferedInputStream(NetUtils.getInputStream(sock), 
+                                  bufferSize));
       
       if ( in.readShort() != OP_STATUS_SUCCESS ) {
         throw new IOException("Got error in response to OP_READ_BLOCK " +
@@ -921,7 +926,7 @@
      */ 
     void checksumOk(Socket sock) {
       try {
-        OutputStream out = sock.getOutputStream();
+        OutputStream out = NetUtils.getOutputStream(sock, WRITE_TIMEOUT);
         byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
                        (OP_STATUS_CHECKSUM_OK) & 0xff };
         out.write(buf);
@@ -2065,12 +2070,16 @@
         s.setSoTimeout(timeoutValue);
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
         LOG.debug("Send buf size " + s.getSendBufferSize());
+        long writeTimeout = WRITE_TIMEOUT_EXTENSION * nodes.length +
+                            WRITE_TIMEOUT;
 
         //
         // Xmit header info to datanode
         //
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
-        blockReplyStream = new DataInputStream(s.getInputStream());
+        DataOutputStream out = new DataOutputStream(
+            new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
+                                     buffersize));
+        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         out.writeShort( DATA_TRANSFER_VERSION );
         out.write( OP_WRITE_BLOCK );

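The write timeout in the DFSClient change above grows with the pipeline: with
the new FSConstants defaults (WRITE_TIMEOUT = 10 min, WRITE_TIMEOUT_EXTENSION =
5 s), a pipeline of three datanodes gets 5000 * 3 + 600000 = 615000 ms, i.e.
10 min 15 s. A small sketch of the same arithmetic with the constant values
inlined (the class and method names are illustrative):

  // Sketch only: reproduces the pipeline write-timeout arithmetic from the
  // DFSClient change above, with the new FSConstants values inlined.
  public class PipelineWriteTimeoutSketch {
    static final int WRITE_TIMEOUT = 10 * 60 * 1000;      // 10 minutes
    static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000;  // 5 seconds per node

    static long writeTimeout(int pipelineLength) {
      return (long) WRITE_TIMEOUT_EXTENSION * pipelineLength + WRITE_TIMEOUT;
    }

    public static void main(String[] args) {
      System.out.println(writeTimeout(3));  // prints 615000 (10 min 15 s)
    }
  }
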
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Mar  5 20:18:26 2008
@@ -26,6 +26,8 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -38,6 +40,8 @@
 import java.io.*;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
@@ -249,7 +253,8 @@
 
       
     // find free port
-    ServerSocket ss = new ServerSocket(tmpPort, 0, socAddr.getAddress());
+    ServerSocket ss = ServerSocketChannel.open().socket();
+    Server.bind(ss, socAddr, 0);
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
     tmpPort = ss.getLocalPort();
@@ -812,7 +817,7 @@
   private static void receiveResponse(Socket s, int numTargets) throws IOException {
     // check the response
     DataInputStream reply = new DataInputStream(new BufferedInputStream(
-        s.getInputStream(), BUFFER_SIZE));
+        new SocketInputStream(s), BUFFER_SIZE));
     try {
       for (int i = 0; i < numTargets; i++) {
         short opStatus = reply.readShort();
@@ -828,7 +833,8 @@
 
   /* utility function for sending a response */
   private static void sendResponse(Socket s, short opStatus) throws IOException {
-    DataOutputStream reply = new DataOutputStream(s.getOutputStream());
+    DataOutputStream reply = 
+      new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
     try {
       reply.writeShort(opStatus);
       reply.flush();
@@ -926,7 +932,7 @@
       DataInputStream in=null; 
       try {
         in = new DataInputStream(
-            new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+            new BufferedInputStream(new SocketInputStream(s), BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -994,7 +1000,8 @@
 
       // send the block
       DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
+            new BufferedOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT),
+                                     SMALL_BUFFER_SIZE));
       BlockSender blockSender = null;
       try {
         try {
@@ -1084,7 +1091,8 @@
             s.getInetAddress().toString(), isRecovery, client);
 
         // get a connection back to the previous target
-        replyOut = new DataOutputStream(s.getOutputStream());
+        replyOut = new DataOutputStream(
+                       new SocketOutputStream(s, WRITE_TIMEOUT));
 
         //
         // Open network conn to backup machine, if 
@@ -1095,16 +1103,19 @@
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
-          mirrorSock = new Socket();
+          mirrorSock = SocketChannel.open().socket();
           try {
             int timeoutValue = numTargets * socketTimeout;
+            int writeTimeout = WRITE_TIMEOUT + 
+                               (WRITE_TIMEOUT_EXTENSION * numTargets);
             mirrorSock.connect(mirrorTarget, timeoutValue);
             mirrorSock.setSoTimeout(timeoutValue);
             mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
             mirrorOut = new DataOutputStream(
-                          new BufferedOutputStream(mirrorSock.getOutputStream(),
-                                                   BUFFER_SIZE));
-            mirrorIn = new DataInputStream(mirrorSock.getInputStream());
+               new BufferedOutputStream(
+                           new SocketOutputStream(mirrorSock, writeTimeout),
+                           BUFFER_SIZE));
+            mirrorIn = new DataInputStream(new SocketInputStream(mirrorSock));
 
             // Write header: Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANSFER_VERSION );
@@ -1214,7 +1225,7 @@
         byte [] buf = new byte[(int)fileSize];
         IOUtils.readFully(checksumIn, buf, 0, buf.length);
         
-        out = new DataOutputStream(s.getOutputStream());
+        out = new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
         
         out.writeByte(OP_STATUS_SUCCESS);
         out.writeInt(buf.length);
@@ -1224,6 +1235,7 @@
         out.writeInt(0);
       } finally {
         xceiverCount.decr();
+        IOUtils.closeStream(out);
         IOUtils.closeStream(checksumIn);
       }
     }
@@ -1255,12 +1267,13 @@
 
         // get the output stream to the target
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
-        targetSock = new Socket();
+        targetSock = SocketChannel.open().socket();
         targetSock.connect(targetAddr, socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
-            targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
+                        new SocketOutputStream(targetSock, WRITE_TIMEOUT),
+                        SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // first write header info
@@ -2541,12 +2554,14 @@
       try {
         InetSocketAddress curTarget = 
           NetUtils.createSocketAddr(targets[0].getName());
-        sock = new Socket();  
+        sock = SocketChannel.open().socket();
         sock.connect(curTarget, socketTimeout);
         sock.setSoTimeout(targets.length * socketTimeout);
 
+        long writeTimeout = WRITE_TIMEOUT + 
+                            WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         out = new DataOutputStream(new BufferedOutputStream(
-                       sock.getOutputStream(), SMALL_BUFFER_SIZE));
+               new SocketOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Mar  5 20:18:26 2008
@@ -125,6 +125,8 @@
   public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
   public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
   public static int READ_TIMEOUT = 60 * 1000;
+  public static int WRITE_TIMEOUT = 10 * 60 * 1000;  
+  public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
 
   // We need to limit the length and depth of a path in the filesystem.  HADOOP-438
   // Currently we set the maximum length to 8k characters and the maximum depth to 1k.  

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Mar  5 20:18:26 2008
@@ -185,7 +185,7 @@
       socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
-         (new FilterInputStream(socket.getInputStream()) {
+         (new FilterInputStream(NetUtils.getInputStream(socket)) {
              public int read(byte[] buf, int off, int len) throws IOException {
                int value = super.read(buf, off, len);
                if (readingCall != null) {
@@ -196,7 +196,7 @@
            }));
       this.out = new DataOutputStream
         (new BufferedOutputStream
-         (new FilterOutputStream(socket.getOutputStream()) {
+         (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
              public void write(byte[] buf, int o, int len) throws IOException {
                out.write(buf, o, len);
                if (writingCall != null) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Mar  5 20:18:26 2008
@@ -176,8 +176,8 @@
    * @throws UnknownHostException if the address isn't a valid host name
    * @throws IOException other random errors from bind
    */
-  static void bind(ServerSocket socket, InetSocketAddress address, 
-                   int backlog) throws IOException {
+  public static void bind(ServerSocket socket, InetSocketAddress address, 
+                          int backlog) throws IOException {
     try {
       socket.bind(address, backlog);
     } catch (BindException e) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java Wed Mar  5 20:18:26 2008
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.net;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.net.URI;
 import java.util.Map.Entry;
 import java.util.*;
@@ -223,5 +227,106 @@
       }
     return l;
     }
+  }
+  
+  /**
+   * Same as getInputStream(socket, socket.getSoTimeout()).<br><br>
+   * 
+   * From documentation for {@link #getInputStream(Socket, long)}:<br>
+   * Returns InputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a 
+   * {@link SocketInputStream} with the given timeout. If the socket does not
+   * have a channel, {@link Socket#getInputStream()} is returned. In the latter
+   * case, the timeout argument is ignored and the timeout set with 
+   * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
+   *
+   * Any socket created using socket factories returned by {@link NetUtils}
+   * must use this interface instead of {@link Socket#getInputStream()}.
+   *     
+   * @see #getInputStream(Socket, long)
+   * 
+   * @param socket
+   * @return InputStream for reading from the socket.
+   * @throws IOException
+   */
+  public static InputStream getInputStream(Socket socket) 
+                                           throws IOException {
+    return getInputStream(socket, socket.getSoTimeout());
+  }
+  
+  /**
+   * Returns InputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a 
+   * {@link SocketInputStream} with the given timeout. If the socket does not
+   * have a channel, {@link Socket#getInputStream()} is returned. In the latter
+   * case, the timeout argument is ignored and the timeout set with 
+   * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils}
+   * must use this interface instead of {@link Socket#getInputStream()}.
+   *     
+   * @see Socket#getChannel()
+   * 
+   * @param socket
+   * @param timeout timeout in milliseconds. This may not always apply. zero
+   *        for waiting as long as necessary.
+   * @return InputStream for reading from the socket.
+   * @throws IOException
+   */
+  public static InputStream getInputStream(Socket socket, long timeout) 
+                                           throws IOException {
+    return (socket.getChannel() == null) ? 
+          socket.getInputStream() : new SocketInputStream(socket, timeout);
+  }
+  
+  /**
+   * Same as getOutputStream(socket, 0). A timeout of zero implies the write
+   * will block for as long as necessary.<br><br>
+   * 
+   * From documentation for {@link #getOutputStream(Socket, long)} : <br>
+   * Returns OutputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a 
+   * {@link SocketOutputStream} with the given timeout. If the socket does not
+   * have a channel, {@link Socket#getOutputStream()} is returned. In the latter
+   * case, the timeout argument is ignored and the write blocks for as long
+   * as necessary.<br><br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils}
+   * must use this interface instead of {@link Socket#getOutputStream()}.
+   * 
+   * @see #getOutputStream(Socket, long)
+   * 
+   * @param socket
+   * @return OutputStream for writing to the socket.
+   * @throws IOException
+   */  
+  public static OutputStream getOutputStream(Socket socket) 
+                                             throws IOException {
+    return getOutputStream(socket, 0);
+  }
+  
+  /**
+   * Returns OutputStream for the socket. If the socket has an associated
+   * SocketChannel then it returns a 
+   * {@link SocketOutputStream} with the given timeout. If the socket does not
+   * have a channel, {@link Socket#getOutputStream()} is returned. In the latter
+   * case, the timeout argument is ignored and the write blocks for as long
+   * as necessary.<br><br>
+   * 
+   * Any socket created using socket factories returned by {@link NetUtils}
+   * must use this interface instead of {@link Socket#getOutputStream()}.
+   * 
+   * @see Socket#getChannel()
+   * 
+   * @param socket
+   * @param timeout timeout in milliseconds. This may not always apply. zero
+   *        for waiting as long as necessary.
+   * @return OutputStream for writing to the socket.
+   * @throws IOException   
+   */
+  public static OutputStream getOutputStream(Socket socket, long timeout) 
+                                             throws IOException {
+    return (socket.getChannel() == null) ? 
+            socket.getOutputStream() : new SocketOutputStream(socket, timeout);            
   }
 }

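A minimal usage sketch for the two new NetUtils helpers above (illustrative;
the socket is assumed to be already connected and the timeout values are
placeholders). When the socket has an associated channel the timed
SocketInputStream/SocketOutputStream wrappers are returned; otherwise the plain
socket streams are used and only setSoTimeout() applies:

  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.net.Socket;

  import org.apache.hadoop.net.NetUtils;

  // Sketch only: prefer the NetUtils accessors over Socket.getInputStream()
  // and Socket.getOutputStream() for sockets created through Hadoop factories.
  public class NetUtilsStreamSketch {
    public static short exchange(Socket sock) throws IOException {
      DataInputStream in = new DataInputStream(
          NetUtils.getInputStream(sock, 60 * 1000));       // timed reads
      DataOutputStream out = new DataOutputStream(
          NetUtils.getOutputStream(sock, 10 * 60 * 1000)); // timed writes

      out.writeShort(1);        // placeholder request
      out.flush();
      return in.readShort();    // placeholder response
    }
  }
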
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java Wed Mar  5 20:18:26 2008
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.Pipe;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This supports input and output streams for socket channels. 
+ * These streams can have a timeout.
+ */
+abstract class SocketIOWithTimeout {
+  // This is intentionally package private.
+
+  static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);    
+  
+  private SelectableChannel channel;
+  private long timeout;
+  private boolean closed = false;
+  
+  private static SelectorPool selector = new SelectorPool();
+  
+  /* A timeout value of 0 implies waiting forever. 
+   * We should have a timeout value that implies zero wait, i.e. 
+   * read or write returns immediately.
+   * 
+   * This will set channel to non-blocking.
+   */
+  SocketIOWithTimeout(SelectableChannel channel, long timeout) 
+                                                 throws IOException {
+    checkChannelValidity(channel);
+    
+    this.channel = channel;
+    this.timeout = timeout;
+    // Set non-blocking
+    channel.configureBlocking(false);
+  }
+  
+  void close() {
+    closed = true;
+  }
+
+  boolean isOpen() {
+    return !closed && channel.isOpen();
+  }
+
+  SelectableChannel getChannel() {
+    return channel;
+  }
+  
+  /** 
+   * Utility function to check if channel is ok.
+   * Mainly to throw IOException instead of runtime exception
+   * in case of mismatch. This mismatch can occur for many runtime
+   * reasons.
+   */
+  static void checkChannelValidity(Object channel) throws IOException {
+    if (channel == null) {
+      /* Most common reason is that original socket does not have a channel.
+       * So making this an IOException rather than a RuntimeException.
+       */
+      throw new IOException("Channel is null. Check " +
+                            "how the channel or socket is created.");
+    }
+    
+    if (!(channel instanceof SelectableChannel)) {
+      throw new IOException("Channel should be a SelectableChannel");
+    }    
+  }
+  
+  /**
+   * Performs actual IO operations. This is not expected to block.
+   *  
+   * @param buf
+   * @return number of bytes (or some equivalent). 0 implies the underlying
+   *         channel could not transfer any data right now. We will wait if 
+   *         more IO is required.
+   * @throws IOException
+   */
+  abstract int performIO(ByteBuffer buf) throws IOException;  
+  
+  /**
+   * Performs one IO and returns number of bytes read or written.
+   * It waits up to the specified timeout. If the channel does not become
+   * ready before the timeout, SocketTimeoutException is thrown.
+   * 
+   * @param buf buffer for IO
+   * @param ops Selection Ops used for waiting. Suggested values: 
+   *        SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
+   *        writing. 
+   *        
+   * @return number of bytes read or written. negative implies end of stream.
+   * @throws IOException
+   */
+  int doIO(ByteBuffer buf, int ops) throws IOException {
+    
+    /* For now only one thread is allowed. If the user wants to read or write
+     * from multiple threads, multiple streams could be created. In that
+     * case multiple threads work as well as the underlying channel supports it.
+     */
+    if (!buf.hasRemaining()) {
+      throw new IllegalArgumentException("Buffer has no data left.");
+      //or should we just return 0?
+    }
+
+    while (buf.hasRemaining()) {
+      if (closed) {
+        return -1;
+      }
+
+      try {
+        int n = performIO(buf);
+        if (n != 0) {
+          // successful io or an error.
+          return n;
+        }
+      } catch (IOException e) {
+        if (!channel.isOpen()) {
+          closed = true;
+        }
+        throw e;
+      }
+
+      //now wait for socket to be ready.
+      int count = 0;
+      try {
+        count = selector.select(channel, ops, timeout);  
+      } catch (IOException e) { //unexpected IOException.
+        closed = true;
+        throw e;
+      } 
+
+      if (count == 0) {
+        String channelStr = "Unknown Channel";
+        if (channel instanceof SocketChannel) {
+          Socket sock = ((SocketChannel)channel).socket();
+          SocketAddress remote = sock.getRemoteSocketAddress();
+          SocketAddress local = sock.getLocalSocketAddress();
+          channelStr =  (remote == null ? "Unknown Addr" : remote) +
+                         " (local: " + 
+                         (local == null ? "Unknown Addr" : local) + ")";
+        } else if (channel instanceof Pipe.SinkChannel ||
+                   channel instanceof Pipe.SourceChannel) {
+          channelStr = "pipe";
+        } else if (channel instanceof DatagramChannel) {
+          channelStr = "datagram channel";
+        }
+        
+        String waitingFor = ""+ops;
+        if (ops == SelectionKey.OP_READ) {
+          waitingFor = "read";
+        } else if (ops == SelectionKey.OP_WRITE) {
+          waitingFor = "write";
+        }
+        
+        throw new SocketTimeoutException(timeout + " millis timeout while " +
+                                         "waiting for " + channelStr +
+                                         " to be ready for " + waitingFor);
+      }
+      // otherwise the socket should be ready for io.
+    }
+    
+    return 0; // does not reach here.
+  }
+  
+  /**
+   * This maintains a pool of selectors. These selectors are closed
+   * once they are idle (unused) for a few seconds.
+   */
+  private static class SelectorPool {
+    
+    private static class SelectorInfo {
+      Selector              selector;
+      long                  lastActivityTime;
+      LinkedList<SelectorInfo> queue; 
+      
+      void close() {
+        if (selector != null) {
+          try {
+            selector.close();
+          } catch (IOException e) {
+            LOG.warn("Unexpected exception while closing selector : " +
+                     StringUtils.stringifyException(e));
+          }
+        }
+      }    
+    }
+    
+    private static class ProviderInfo {
+      SelectorProvider provider;
+      LinkedList<SelectorInfo> queue; // lifo
+      ProviderInfo next;
+    }
+    
+    private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
+    
+    private ProviderInfo providerList = null;
+    
+    /**
+     * Waits on the channel with the given timeout using one of the 
+     * cached selectors. It also removes any cached selectors that are
+     * idle for a few seconds.
+     * 
+     * @param channel
+     * @param ops
+     * @param timeout
+     * @return
+     * @throws IOException
+     */
+    int select(SelectableChannel channel, int ops, long timeout) 
+                                                   throws IOException {
+     
+      SelectorInfo info = get(channel);
+      
+      SelectionKey key = null;
+      int ret = -1;
+      
+      try {
+        key = channel.register(info.selector, ops);
+        ret = info.selector.select(timeout);
+      } finally {
+        if (key != null) {
+          key.cancel();
+        }
+        
+        //clear the canceled key.
+        try {
+          info.selector.selectNow();
+        } catch (IOException e) {
+          LOG.info("Unexpected Exception while clearing selector : " +
+                   StringUtils.stringifyException(e));
+          // don't put the selector back.
+          info.close();
+          return ret; 
+        }
+        
+        release(info);
+      }
+      
+      return ret;
+    }
+    
+    /**
+     * Takes one selector from the end of the LRU list of free selectors.
+     * If there are no selectors available, it creates a new selector.
+     * Also invokes trimIdleSelectors(). 
+     * 
+     * @param channel
+     * @return 
+     * @throws IOException
+     */
+    private synchronized SelectorInfo get(SelectableChannel channel) 
+                                                         throws IOException {
+      SelectorInfo selInfo = null;
+      
+      SelectorProvider provider = channel.provider();
+      
+      // pick the list : rarely there is more than one provider in use.
+      ProviderInfo pList = providerList;
+      while (pList != null && pList.provider != provider) {
+        pList = pList.next;
+      }      
+      if (pList == null) {
+        //LOG.info("Creating new ProviderInfo : " + provider.toString());
+        pList = new ProviderInfo();
+        pList.provider = provider;
+        pList.queue = new LinkedList<SelectorInfo>();
+        pList.next = providerList;
+        providerList = pList;
+      }
+      
+      LinkedList<SelectorInfo> queue = pList.queue;
+      
+      if (queue.isEmpty()) {
+        Selector selector = provider.openSelector();
+        selInfo = new SelectorInfo();
+        selInfo.selector = selector;
+        selInfo.queue = queue;
+      } else {
+        selInfo = queue.removeLast();
+      }
+      
+      trimIdleSelectors(System.currentTimeMillis());
+      return selInfo;
+    }
+    
+    /**
+     * Puts the selector back at the end of the LRU list of free selectors.
+     * Also invokes trimIdleSelectors().
+     * 
+     * @param info
+     */
+    private synchronized void release(SelectorInfo info) {
+      long now = System.currentTimeMillis();
+      trimIdleSelectors(now);
+      info.lastActivityTime = now;
+      info.queue.addLast(info);
+    }
+    
+    /**
+     * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
+     * traverse the whole list, just the ones that have crossed 
+     * the timeout.
+     */
+    private void trimIdleSelectors(long now) {
+      long cutoff = now - IDLE_TIMEOUT;
+      
+      for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
+        if (pList.queue.isEmpty()) {
+          continue;
+        }
+        for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
+          SelectorInfo info = it.next();
+          if (info.lastActivityTime > cutoff) {
+            break;
+          }
+          it.remove();
+          info.close();
+        }
+      }
+    }
+  }
+}

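To make the performIO()/doIO() contract above concrete, here is a sketch of a
subclass in the style of the Reader and Writer classes in the next two files.
It is illustrative only (package-private, not part of the patch): performIO()
attempts a single non-blocking transfer, and doIO() loops, parking the thread
on a pooled selector whenever performIO() reports 0 bytes.

  package org.apache.hadoop.net;

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.nio.channels.ReadableByteChannel;
  import java.nio.channels.SelectableChannel;
  import java.nio.channels.SelectionKey;

  // Sketch only: mirrors SocketInputStream.Reader to show how performIO()
  // (one non-blocking transfer) and doIO() (timed wait loop) fit together.
  class TimedReaderSketch extends SocketIOWithTimeout {
    private final ReadableByteChannel channel;

    TimedReaderSketch(ReadableByteChannel channel, long timeout)
                                                   throws IOException {
      super((SelectableChannel) channel, timeout); // flips channel to non-blocking
      this.channel = channel;
    }

    int performIO(ByteBuffer buf) throws IOException {
      return channel.read(buf);  // 0 means "not ready"; doIO() then selects
    }

    int readWithTimeout(ByteBuffer buf) throws IOException {
      // Throws SocketTimeoutException if the channel stays unreadable too long.
      return doIO(buf, SelectionKey.OP_READ);
    }
  }
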
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java Wed Mar  5 20:18:26 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+
+/**
+ * This implements an input stream that can have a timeout while reading.
+ * This sets non-blocking flag on the socket channel.
+ * So after creating this object, read() on 
+ * {@link Socket#getInputStream()} and write() on 
+ * {@link Socket#getOutputStream()} for the associated socket will throw 
+ * IllegalBlockingModeException. 
+ * Please use {@link SocketOutputStream} for writing.
+ */
+public class SocketInputStream extends InputStream
+                               implements ReadableByteChannel {
+
+  private SocketIOWithTimeout reader;
+
+  private static class Reader extends SocketIOWithTimeout {
+    ReadableByteChannel channel;
+    
+    Reader(ReadableByteChannel channel, long timeout) throws IOException {
+      super((SelectableChannel)channel, timeout);
+      this.channel = channel;
+    }
+    
+    int performIO(ByteBuffer buf) throws IOException {
+      return channel.read(buf);
+    }
+  }
+  
+  /**
+   * Create a new input stream with the given timeout. If the timeout
+   * is zero, it will be treated as infinite timeout. The socket's
+   * channel will be configured to be non-blocking.
+   * 
+   * @param channel 
+   *        Channel for reading, should also be a {@link SelectableChannel}.
+   *        The channel will be configured to be non-blocking.
+   * @param timeout timeout in milliseconds. must not be negative.
+   * @throws IOException
+   */
+  public SocketInputStream(ReadableByteChannel channel, long timeout)
+                                                        throws IOException {
+    SocketIOWithTimeout.checkChannelValidity(channel);
+    reader = new Reader(channel, timeout);
+  }
+
+  /**
+   * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
+   * 
+   * Create a new input stream with the given timeout. If the timeout
+   * is zero, it will be treated as infinite timeout. The socket's
+   * channel will be configured to be non-blocking.
+   * 
+   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
+   *  
+   * @param socket should have a channel associated with it.
+   * @param timeout timeout in milliseconds. must not be negative.
+   * @throws IOException
+   */
+  public SocketInputStream(Socket socket, long timeout) 
+                                         throws IOException {
+    this(socket.getChannel(), timeout);
+  }
+  
+  /**
+   * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
+   * :<br><br>
+   * 
+   * Create a new input stream with the given timeout. If the timeout
+   * is zero, it will be treated as infinite timeout. The socket's
+   * channel will be configured to be non-blocking.
+   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
+   *  
+   * @param socket should have a channel associated with it.
+   * @throws IOException
+   */
+  public SocketInputStream(Socket socket) throws IOException {
+    this(socket.getChannel(), socket.getSoTimeout());
+  }
+  
+  @Override
+  public int read() throws IOException {
+    /* Allocation can be removed if required.
+     * probably no need to optimize or encourage single byte read.
+     */
+    byte[] buf = new byte[1];
+    if (read(buf, 0, 1) > 0) {
+      return (byte)buf[0];
+    }
+    throw new IOException("Could not read from stream");
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  //ReadableByteChannel interface
+    
+  public boolean isOpen() {
+    return reader.isOpen();
+  }
+    
+  public int read(ByteBuffer dst) throws IOException {
+    return reader.doIO(dst, SelectionKey.OP_READ);
+  }
+}

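A usage sketch for the class above (illustrative; assumes the socket has an
associated channel, e.g. one obtained from SocketChannel.open() or a Hadoop
socket factory). SocketInputStream can be used either as a plain InputStream
or, as here, through its ReadableByteChannel interface:

  import java.io.IOException;
  import java.net.Socket;
  import java.nio.ByteBuffer;

  import org.apache.hadoop.net.SocketInputStream;

  // Sketch only: a timed read through the ReadableByteChannel interface.
  public class SocketInputStreamSketch {
    public static int readSome(Socket channelBackedSocket) throws IOException {
      SocketInputStream in =
          new SocketInputStream(channelBackedSocket, 60 * 1000); // 60 s timeout
      ByteBuffer buf = ByteBuffer.allocate(4096);
      // Throws SocketTimeoutException if no data arrives within the timeout;
      // returns -1 at end of stream.
      return in.read(buf);
    }
  }
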
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java Wed Mar  5 20:18:26 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This implements an output stream that can have a timeout while writing.
+ * This sets non-blocking flag on the socket channel.
+ * So after creating this object, read() on 
+ * {@link Socket#getInputStream()} and write() on 
+ * {@link Socket#getOutputStream()} on the associated socket will throw 
+ * IllegalBlockingModeException.
+ * Please use {@link SocketInputStream} for reading.
+ */
+public class SocketOutputStream extends OutputStream 
+                                implements WritableByteChannel {                                
+  
+  private SocketIOWithTimeout writer;
+  
+  private static class Writer extends SocketIOWithTimeout {
+    WritableByteChannel channel;
+    
+    Writer(WritableByteChannel channel, long timeout) throws IOException {
+      super((SelectableChannel)channel, timeout);
+      this.channel = channel;
+    }
+    
+    int performIO(ByteBuffer buf) throws IOException {
+      return channel.write(buf);
+    }
+  }
+  
+  /**
+   * Create a new output stream with the given timeout. If the timeout
+   * is zero, it will be treated as infinite timeout. The socket's
+   * channel will be configured to be non-blocking.
+   * 
+   * @param channel 
+   *        Channel for writing, should also be a {@link SelectableChannel}.  
+   *        The channel will be configured to be non-blocking.
+   * @param timeout timeout in milliseconds. must not be negative.
+   * @throws IOException
+   */
+  public SocketOutputStream(WritableByteChannel channel, long timeout) 
+                                                         throws IOException {
+    SocketIOWithTimeout.checkChannelValidity(channel);
+    writer = new Writer(channel, timeout);
+  }
+  
+  /**
+   * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
+   * 
+   * Create a new output stream with the given timeout. If the timeout
+   * is zero, it will be treated as infinite timeout. The socket's
+   * channel will be configured to be non-blocking.
+   * 
+   * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
+   *  
+   * @param socket should have a channel associated with it.
+   * @param timeout timeout in milliseconds. must not be negative.
+   * @throws IOException
+   */
+  public SocketOutputStream(Socket socket, long timeout) 
+                                         throws IOException {
+    this(socket.getChannel(), timeout);
+  }
+  
+  public void write(int b) throws IOException {
+    /* If we need to, we can optimize this allocation.
+     * probably no need to optimize or encourage single byte writes.
+     */
+    byte[] buf = new byte[1];
+    buf[0] = (byte)b;
+    write(buf, 0, 1);
+  }
+  
+  public void write(byte[] b, int off, int len) throws IOException {
+    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+    while (buf.hasRemaining()) {
+      try {
+        if (write(buf) < 0) {
+          throw new IOException("The stream is closed");
+        }
+      } catch (IOException e) {
+        /* Unlike read, write cannot inform the user of partial writes.
+         * So we close this stream if there was a partial write.
+         */
+        if (buf.capacity() > buf.remaining()) {
+          writer.close();
+        }
+        throw e;
+      }
+    }
+  }
+
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  //WritableByteChannel interface 
+  
+  public boolean isOpen() {
+    return writer.isOpen();
+  }
+
+  public int write(ByteBuffer src) throws IOException {
+    return writer.doIO(src, SelectionKey.OP_WRITE);
+  }
+}

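And the matching write-side sketch (illustrative; same assumption that the
socket is channel-backed). Note the behaviour documented in
write(byte[], int, int) above: if a timeout interrupts a partial write, the
stream closes itself because it cannot report how many bytes went out.

  import java.io.IOException;
  import java.net.Socket;
  import java.nio.ByteBuffer;

  import org.apache.hadoop.net.SocketOutputStream;

  // Sketch only: a timed write through the WritableByteChannel interface.
  public class SocketOutputStreamSketch {
    public static void writeAll(Socket channelBackedSocket, byte[] data)
                                                       throws IOException {
      SocketOutputStream out =
          new SocketOutputStream(channelBackedSocket, 10 * 60 * 1000);
      ByteBuffer buf = ByteBuffer.wrap(data);
      while (buf.hasRemaining()) {
        // Throws SocketTimeoutException if the peer stops reading for 10 min.
        out.write(buf);
      }
    }
  }
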
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java Wed Mar  5 20:18:26 2008
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
@@ -39,7 +40,22 @@
   /* @inheritDoc */
   @Override
   public Socket createSocket() throws IOException {
-    return new Socket();
+    /*
+     * NOTE: This returns an NIO socket so that it has an associated 
+     * SocketChannel. As of now, this unfortunately makes streams returned
+     * by Socket.getInputStream() and Socket.getOutputStream() unusable
+     * (because a blocking read on input stream blocks write on output stream
+     * and vice versa).
+     * 
+     * So users of these socket factories should use 
+     * NetUtils.getInputStream(socket) and 
+     * NetUtils.getOutputStream(socket) instead.
+     * 
+     * A solution for hiding this from the user is to write a 
+     * 'FilterSocket' along the lines of FilterInputStream and extend it by
+     * overriding getInputStream() and getOutputStream().
+     */
+    return SocketChannel.open().socket();
   }
 
   /* @inheritDoc */

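Putting the factory note above together with the NetUtils helpers, a caller
that uses the factory would look roughly like the sketch below. This is
illustrative only: the direct new StandardSocketFactory() call, the host and
the port are placeholders, since Hadoop normally obtains the factory through
its configuration.

  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.net.InetSocketAddress;
  import java.net.Socket;

  import org.apache.hadoop.net.NetUtils;
  import org.apache.hadoop.net.StandardSocketFactory;

  // Sketch only: sockets from the factory are channel-backed, so callers must
  // use NetUtils.getInputStream()/getOutputStream() rather than the Socket ones.
  public class FactorySocketSketch {
    public static void connect() throws IOException {
      Socket sock = new StandardSocketFactory().createSocket();
      sock.connect(new InetSocketAddress("host.example.com", 9000), 60 * 1000);
      try {
        InputStream in = NetUtils.getInputStream(sock, 60 * 1000);
        OutputStream out = NetUtils.getOutputStream(sock, 10 * 60 * 1000);
        out.write(0);              // placeholder request byte
        out.flush();
        int firstByte = in.read(); // placeholder response byte
      } finally {
        sock.close();
      }
    }
  }
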
Added: hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java Wed Mar  5 20:18:26 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.channels.Pipe;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+/**
+ * This tests timeouts from SocketInputStream and
+ * SocketOutputStream using pipes.
+ * 
+ * Normal read and write using these streams are tested by pretty much
+ * every DFS unit test.
+ */
+public class TestSocketIOWithTimeout extends TestCase {
+
+  Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
+  
+  private static int TIMEOUT = 1*1000; 
+  private static String TEST_STRING = "1234567890";
+  
+  private void doIO(InputStream in, OutputStream out) throws IOException {
+    /* Keep on writing or reading until we get SocketTimeoutException.
+     * It expects this exception to occur within 100 millis of TIMEOUT.
+     */
+    byte buf[] = new byte[4192];
+    
+    while (true) {
+      long start = System.currentTimeMillis();
+      try {
+        if (in != null) {
+          in.read(buf);
+        } else {
+          out.write(buf);
+        }
+      } catch (SocketTimeoutException e) {
+        long diff = System.currentTimeMillis() - start;
+        LOG.info("Got SocketTimeoutException as expected after " + 
+                 diff + " millis : " + e.getMessage());
+        assertTrue(Math.abs(TIMEOUT - diff) <= 100);
+        break;
+      }
+    }
+  }
+  
+  public void testSocketIOWithTimeout() throws IOException {
+    
+    // first open pipe:
+    Pipe pipe = Pipe.open();
+    Pipe.SourceChannel source = pipe.source();
+    Pipe.SinkChannel sink = pipe.sink();
+    
+    try {
+      InputStream in = new SocketInputStream(source, TIMEOUT);
+      OutputStream out = new SocketOutputStream(sink, TIMEOUT);
+      
+      byte[] writeBytes = TEST_STRING.getBytes();
+      byte[] readBytes = new byte[writeBytes.length];
+      
+      out.write(writeBytes);
+      doIO(null, out);
+      
+      in.read(readBytes);
+      assertTrue(Arrays.equals(writeBytes, readBytes));
+      doIO(in, null);
+    } finally {
+      if (source != null) {
+        source.close();
+      }
+      if (sink != null) {
+        sink.close();
+      }
+    }
+  }
+}