Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/01/09 03:39:16 UTC

svn commit: r1430662 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/hadoop/hdfs/...

Author: szetszwo
Date: Wed Jan  9 02:39:15 2013
New Revision: 1430662

URL: http://svn.apache.org/viewvc?rev=1430662&view=rev
Log:
svn merge -c -1430507 . for reverting HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
      - copied unchanged from r1430506, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
      - copied unchanged from r1430506, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jan  9 02:39:15 2013
@@ -180,9 +180,6 @@ Trunk (Unreleased)
     HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
     (Colin Patrick McCabe via todd)
 
-    HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes
-    (Colin Patrick McCabe via todd)
-
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Jan  9 02:39:15 2013
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -41,18 +43,7 @@ public interface BlockReader extends Byt
    */
   long skip(long n) throws IOException;
 
-  /**
-   * Close the block reader.
-   *
-   * @param peerCache      The PeerCache to put the Peer we're using back
-   *                       into, or null if we should simply close the Peer
-   *                       we're using (along with its Socket).
-   *                       Some block readers, like BlockReaderLocal, may
-   *                       not make use of this parameter.
-   *
-   * @throws IOException
-   */
-  void close(PeerCache peerCache) throws IOException;
+  void close() throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
@@ -69,4 +60,20 @@ public interface BlockReader extends Byt
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  Socket takeSocket();
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  boolean hasSentStatusCode();
+
+  /**
+   * @return a reference to the streams this block reader is using.
+   */
+  IOStreamPair getStreams();
 }

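For orientation, the interface this revert restores makes the caller responsible for connection caching: a reader that reached EOF and reported a status code back to the datanode may donate its socket for reuse, while anything else is simply closed. Below is a minimal sketch of that contract using toy stand-ins for the Hadoop types (this is illustrative, not the actual HDFS source); the same pattern appears in DFSInputStream#closeBlockReader further down.

    import java.io.IOException;
    import java.net.Socket;

    // Toy stand-ins; names mirror the interface above, but this is a sketch.
    interface StreamsPair {}                     // stands in for IOStreamPair

    interface ToyBlockReader {
      void close() throws IOException;
      Socket takeSocket();                       // null if no socket to give back
      boolean hasSentStatusCode();
      StreamsPair getStreams();
    }

    class ToySocketCache {
      void put(Socket sock, StreamsPair streams) { /* cache for reuse */ }
    }

    class ReaderCloser {
      // Only a reader that finished cleanly (status code sent) may donate
      // its socket back to the cache; otherwise close() tears it down.
      static void closeReader(ToyBlockReader reader, ToySocketCache cache)
          throws IOException {
        if (reader.hasSentStatusCode()) {
          cache.put(reader.takeSocket(), reader.getStreams());
        }
        reader.close();
      }
    }
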
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Jan  9 02:39:15 2013
@@ -19,18 +19,19 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.common.base.Preconditions;
-
 
 /** 
  * Utility class to create BlockReader implementations.
@@ -46,73 +47,18 @@ public class BlockReaderFactory {
   @InterfaceAudience.Private
   public static class Params {
     private final Conf conf;
-    /**
-     * The peer that this BlockReader will be connected to.
-     * You must set this.
-     */
-    private Peer peer = null;
-    
-    /**
-     * The file name that this BlockReader pertains to.
-     * This is optional and only used for display and logging purposes.
-     */
+    private Socket socket = null;
     private String file = null;
-
-    /**
-     * The block that this BlockReader is reading.
-     * You must set this.
-     */
     private ExtendedBlock block = null;
-    
-    /**
-     * The BlockTokenIdentifier to use, or null to use none.
-     */
     private Token<BlockTokenIdentifier> blockToken = null;
-
-    /**
-     * The offset in the block to start reading at.
-     */
     private long startOffset = 0;
-    
-    /**
-     * The total number of bytes we might want to read, or -1 to assume no
-     * limit.
-     */
     private long len = -1;
-    
-    /**
-     * The buffer size to use.
-     *
-     * If this is not set, we will use the default from the Conf.
-     */
     private int bufferSize;
-    
-    /**
-     * Whether or not we should verify the checksum.
-     *
-     * This is used instead of conf.verifyChecksum, because there are some
-     * cases when we may want to explicitly turn off checksum verification,
-     * such as when the caller has explicitly asked for a file to be opened
-     * without checksum verification.
-     */
     private boolean verifyChecksum = true;
-
-    /**
-     * Whether or not we should try to use short circuit local reads.
-     */
     private boolean shortCircuitLocalReads = false;
-
-    /**
-     * The name of the client using this BlockReader, for logging and
-     * debugging purposes.
-     */
     private String clientName = "";
-    
-    /**
-     * The DataNode on which this Block resides.
-     * You must set this.
-     */
-    private DatanodeID datanodeID = null;
+    private DataEncryptionKey encryptionKey = null;
+    private IOStreamPair ioStreamPair = null;
 
     public Params(Conf conf) {
       this.conf = conf;
@@ -121,11 +67,11 @@ public class BlockReaderFactory {
     public Conf getConf() {
       return conf;
     }
-    public Peer getPeer() {
-      return peer;
+    public Socket getSocket() {
+      return socket;
     }
-    public Params setPeer(Peer peer) {
-      this.peer = peer;
+    public Params setSocket(Socket socket) {
+      this.socket = socket;
       return this;
     }
     public String getFile() {
@@ -191,12 +137,19 @@ public class BlockReaderFactory {
       this.clientName = clientName;
       return this;
     }
-    public Params setDatanodeID(DatanodeID datanodeID) {
-      this.datanodeID = datanodeID;
+    public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
+      this.encryptionKey = encryptionKey;
       return this;
     }
-    public DatanodeID getDatanodeID() {
-      return datanodeID;
+    public DataEncryptionKey getEncryptionKey() {
+      return encryptionKey;
+    }
+    public IOStreamPair getIoStreamPair() {
+      return ioStreamPair;
+    }
+    public Params setIoStreamPair(IOStreamPair ioStreamPair) {
+      this.ioStreamPair = ioStreamPair;
+      return this;
     }
   }
 
@@ -211,27 +164,24 @@ public class BlockReaderFactory {
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(Params params) throws IOException {
-    Preconditions.checkNotNull(params.getPeer());
-    Preconditions.checkNotNull(params.getBlock());
-    Preconditions.checkNotNull(params.getDatanodeID());
-    // First, let's set the read and write timeouts appropriately.
-    // This will keep us from blocking forever if something goes wrong during
-    // network communication.
-    Peer peer = params.getPeer();
-    peer.setReadTimeout(params.getConf().socketTimeout);
-    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
-
     if (params.getConf().useLegacyBlockReader) {
-      // The legacy BlockReader doesn't require that the Peers it uses
-      // have associated ReadableByteChannels.  This makes it easier to use 
-      // with some older Socket classes like, say, SocksSocketImpl.
-      //
-      // TODO: create a wrapper class that makes channel-less sockets look like
-      // they have a channel, so that we can finally remove the legacy
-      // RemoteBlockReader.  See HDFS-2534.
+      if (params.getEncryptionKey() != null) {
+        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+      }
       return RemoteBlockReader.newBlockReader(params);
     } else {
-      // The usual block reader.
+      Socket sock = params.getSocket();
+      if (params.getIoStreamPair() == null) {
+        params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
+            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+        if (params.getEncryptionKey() != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  params.getIoStreamPair().out, params.getIoStreamPair().in, 
+                  params.getEncryptionKey());
+          params.setIoStreamPair(encryptedStreams);
+        }
+      }
       return RemoteBlockReader2.newBlockReader(params);
     }
   }
@@ -247,4 +197,4 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
-}
\ No newline at end of file
+}

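In the non-legacy path above, the factory lazily derives an IOStreamPair from the raw socket and, when a DataEncryptionKey is present, swaps in encrypted streams before dispatching to RemoteBlockReader2. A rough, self-contained sketch of that wrap-then-dispatch shape follows; StreamPair and encrypt() are illustrative stand-ins, not Hadoop APIs.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    class StreamPair {
      final InputStream in;
      final OutputStream out;
      StreamPair(InputStream in, OutputStream out) { this.in = in; this.out = out; }
    }

    class StreamSetup {
      // Hypothetical hook standing in for DataTransferEncryptor.getEncryptedStreams.
      static StreamPair encrypt(StreamPair plain, byte[] key) { return plain; }

      // If the caller supplied no streams, derive them from the socket,
      // then optionally layer encryption on top, mirroring the factory above.
      static StreamPair prepare(Socket sock, StreamPair supplied, byte[] key)
          throws IOException {
        StreamPair streams = supplied;
        if (streams == null) {
          streams = new StreamPair(sock.getInputStream(), sock.getOutputStream());
          if (key != null) {
            streams = encrypt(streams, key);
          }
        }
        return streams;
      }
    }
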
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Jan  9 02:39:15 2013
@@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close() throws IOException {
     dataIn.close();
     if (checksumIn != null) {
       checksumIn.close();
@@ -675,4 +675,19 @@ class BlockReaderLocal implements BlockR
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
+
+  @Override
+  public Socket takeSocket() {
+    return null;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return false;
+  }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return null;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Jan  9 02:39:15 2013
@@ -191,7 +191,7 @@ public class DFSClient implements java.i
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
-  final PeerCache peerCache;
+  final SocketCache socketCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
@@ -433,7 +433,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jan  9 02:39:15 2013
@@ -32,15 +32,12 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
@@ -64,7 +60,7 @@ import org.apache.hadoop.security.token.
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
-  private final PeerCache peerCache;
+  private final SocketCache socketCache;
 
   private final DFSClient dfsClient;
   private boolean closed = false;
@@ -114,7 +110,7 @@ public class DFSInputStream extends FSIn
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.peerCache = dfsClient.peerCache;
+    this.socketCache = dfsClient.socketCache;
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -428,7 +424,7 @@ public class DFSInputStream extends FSIn
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      closeBlockReader(blockReader);
       blockReader = null;
     }
 
@@ -510,7 +506,7 @@ public class DFSInputStream extends FSIn
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      closeBlockReader(blockReader);
       blockReader = null;
     }
     super.close();
@@ -837,7 +833,7 @@ public class DFSInputStream extends FSIn
         }
       } finally {
         if (reader != null) {
-          reader.close(peerCache);
+          closeBlockReader(reader);
         }
       }
       // Put chosen node into dead list, continue
@@ -845,30 +841,16 @@ public class DFSInputStream extends FSIn
     }
   }
 
-  private Peer newPeer(InetSocketAddress addr) throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    Socket sock = null;
-    try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocket(sock);
-      
-      // Add encryption if configured.
-      DataEncryptionKey key = dfsClient.getDataEncryptionKey();
-      if (key != null) {
-        peer = new EncryptedPeer(peer, key);
-      }
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        IOUtils.closeQuietly(peer);
-        IOUtils.closeQuietly(sock);
-      }
+  /**
+   * Close the given BlockReader and cache its socket.
+   */
+  private void closeBlockReader(BlockReader reader) throws IOException {
+    if (reader.hasSentStatusCode()) {
+      IOStreamPair ioStreams = reader.getStreams();
+      Socket oldSock = reader.takeSocket();
+      socketCache.put(oldSock, ioStreams);
     }
+    reader.close();
   }
 
   /**
@@ -914,16 +896,40 @@ public class DFSInputStream extends FSIn
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      Peer peer = null;
+      SocketAndStreams sockAndStreams = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
       // want to fail the read.
       if (retries < nCachedConnRetry) {
-        peer = peerCache.get(chosenNode);
+        sockAndStreams = socketCache.get(dnAddr);
       }
-      if (peer == null) {
-        peer = newPeer(dnAddr);
+      Socket sock;
+      if (sockAndStreams == null) {
         fromCache = false;
+
+        sock = dfsClient.socketFactory.createSocket();
+        
+        // TCP_NODELAY is crucial here because of bad interactions between
+        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+        // between the client and DN, the conversation looks like:
+        //   1. Client -> DN: Read block X
+        //   2. DN -> Client: data for block X
+        //   3. Client -> DN: Status OK (successful read)
+        //   4. Client -> DN: Read block Y
+        // The fact that step #3 and #4 are both in the client->DN direction
+        // triggers Nagling. If the DN is using delayed ACKs, this results
+        // in a delay of 40ms or more.
+        //
+        // TCP_NODELAY disables nagling and thus avoids this performance
+        // disaster.
+        sock.setTcpNoDelay(true);
+
+        NetUtils.connect(sock, dnAddr,
+            dfsClient.getRandomLocalInterfaceAddr(),
+            dfsClient.getConf().socketTimeout);
+        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
+      } else {
+        sock = sockAndStreams.sock;
       }
 
       try {
@@ -933,13 +939,19 @@ public class DFSInputStream extends FSIn
                 setFile(file).setBlock(block).setBlockToken(blockToken).
                 setStartOffset(startOffset).setLen(len).
                 setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
-                setClientName(clientName).setDatanodeID(chosenNode).
-                setPeer(peer));
+                setClientName(clientName).
+                setEncryptionKey(dfsClient.getDataEncryptionKey()).
+                setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
+                setSocket(sock));
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
-        IOUtils.closeQuietly(peer);
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+        if (sockAndStreams != null) {
+          sockAndStreams.close();
+        } else {
+          sock.close();
+        }
         err = ex;
       }
     }

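The TCP_NODELAY comment above is the performance heart of this hunk: with connection keepalive, the client's status reply and its next read request both travel client-to-DN, and Nagle's algorithm combined with delayed ACKs can stall the second message by 40ms or more. Here is a self-contained JDK sketch of the same setup order, disabling Nagle before connecting and then bounding reads; host, port, and timeout are illustrative.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class NoDelayClient {
      public static Socket open(String host, int port, int timeoutMs)
          throws IOException {
        Socket sock = new Socket();
        // Disable Nagle's algorithm so small back-to-back writes
        // (status code, then next request) are not coalesced and delayed.
        sock.setTcpNoDelay(true);
        sock.connect(new InetSocketAddress(host, port), timeoutMs);
        sock.setSoTimeout(timeoutMs);  // bound blocking reads as well
        return sock;
      }
    }
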
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Jan  9 02:39:15 2013
@@ -25,20 +25,25 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 
@@ -51,8 +56,7 @@ import org.apache.hadoop.util.DataChecks
 @Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
-  private final Peer peer;
-  private final DatanodeID datanodeID;
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private final DataInputStream in;
   private DataChecksum checksum;
 
@@ -122,9 +126,9 @@ public class RemoteBlockReader extends F
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(peer, Status.SUCCESS);
+        sendReadResult(dnSock, Status.SUCCESS);
       }
     }
     return nRead;
@@ -318,8 +322,7 @@ public class RemoteBlockReader extends F
   
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, 
-      Peer peer, DatanodeID datanodeID) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
     // Path is used only for printing block and file information in debug
     super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
@@ -327,8 +330,7 @@ public class RemoteBlockReader extends F
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
     
-    this.peer = peer;
-    this.datanodeID = datanodeID;
+    this.dnSock = dnSock;
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -365,8 +367,9 @@ public class RemoteBlockReader extends F
   public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
         throws IOException {
     // in and out will be closed when sock is closed (by the caller)
+    Socket sock = params.getSocket();
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        params.getPeer().getOutputStream()));
+          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), 
         params.getClientName(), params.getStartOffset(), params.getLen());
 
@@ -374,13 +377,13 @@ public class RemoteBlockReader extends F
     // Get bytes in block, set streams
     //
     DataInputStream in = new DataInputStream(
-        new BufferedInputStream(params.getPeer().getInputStream(),
+        new BufferedInputStream(NetUtils.getInputStream(sock), 
             params.getBufferSize()));
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, params.getPeer(),
-        params.getBlock(), params.getFile());
+    RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
+        params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -399,20 +402,18 @@ public class RemoteBlockReader extends F
 
     return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(), 
         params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
-        params.getStartOffset(), firstChunkOffset, params.getLen(),
-        params.getPeer(), params.getDatanodeID());
+        params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close() throws IOException {
     startOffset = -1;
     checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
+    if (dnSock != null) {
+      dnSock.close();
     }
-    // in will be closed when its Peer is closed.
+
+    // in will be closed when its Socket is closed.
   }
   
   @Override
@@ -426,21 +427,37 @@ public class RemoteBlockReader extends F
     return readFully(this, buf, offset, len);
   }
 
+  @Override
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
   /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
+  void sendReadResult(Socket sock, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+      RemoteBlockReader2.writeReadResult(
+          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+          statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               datanodeID + ": " + e.getMessage());
+               sock.getInetAddress() + ": " + e.getMessage());
     }
   }
   
@@ -460,4 +477,12 @@ public class RemoteBlockReader extends F
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    // This class doesn't support encryption, which is the only thing this
+    // method is used for. See HDFS-3637.
+    return null;
+  }
+
 }

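Note that takeSocket() above transfers ownership of the socket to the caller and nulls the field, which is why close() guards on dnSock != null rather than closing unconditionally. A minimal sketch of that take-and-null idiom:

    import java.io.IOException;
    import java.net.Socket;

    class SocketHolder {
      private Socket sock;
      SocketHolder(Socket sock) { this.sock = sock; }

      // Hand the socket to the caller exactly once; afterwards this
      // holder no longer owns it, so close() below becomes a no-op.
      Socket take() {
        Socket res = sock;
        sock = null;
        return res;
      }

      void close() throws IOException {
        if (sock != null) {
          sock.close();
        }
      }
    }
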
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Jan  9 02:39:15 2013
@@ -25,15 +25,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -41,11 +42,13 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.net.SocketInputWrapper;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -76,8 +79,11 @@ import com.google.common.annotations.Vis
 public class RemoteBlockReader2  implements BlockReader {
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-  private final DatanodeID datanodeID;
-  private final Peer peer;
+  
+  Socket dnSock;
+  // for now just sending the status code (e.g. checksumOk) after the read.
+  private IOStreamPair ioStreams;
+  private final ReadableByteChannel in;
   private DataChecksum checksum;
   
   private PacketReceiver packetReceiver = new PacketReceiver(true);
@@ -109,11 +115,6 @@ public class RemoteBlockReader2  impleme
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
-  @VisibleForTesting
-  public Peer getPeer() {
-    return peer;
-  }
-  
   @Override
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
@@ -154,7 +155,7 @@ public class RemoteBlockReader2  impleme
 
   private void readNextPacket() throws IOException {
     //Read packet headers.
-    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
+    packetReceiver.receiveNextPacket(in);
 
     PacketHeader curHeader = packetReceiver.getHeader();
     curDataSlice = packetReceiver.getDataSlice();
@@ -235,7 +236,7 @@ public class RemoteBlockReader2  impleme
       LOG.trace("Reading empty packet at end of read");
     }
     
-    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
+    packetReceiver.receiveNextPacket(in);
 
     PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
@@ -246,10 +247,11 @@ public class RemoteBlockReader2  impleme
   }
 
   protected RemoteBlockReader2(BlockReaderFactory.Params params, 
-      DataChecksum checksum, long firstChunkOffset) {
+      DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
     // Path is used only for printing block and file information in debug
-    this.datanodeID = params.getDatanodeID();
-    this.peer = params.getPeer();
+    this.dnSock = params.getSocket();
+    this.ioStreams = params.getIoStreamPair();
+    this.in = in;
     this.checksum = checksum;
     this.verifyChecksum = params.getVerifyChecksum();
     this.startOffset = Math.max( params.getStartOffset(), 0 );
@@ -266,19 +268,38 @@ public class RemoteBlockReader2  impleme
 
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close() throws IOException {
     packetReceiver.close();
     
     startOffset = -1;
     checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
+    if (dnSock != null) {
+      dnSock.close();
     }
 
     // in will be closed when its Socket is closed.
   }
+  
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  @Override
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  @Override
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
 
   /**
    * When the reader reaches end of the read, it sends a status response
@@ -287,14 +308,14 @@ public class RemoteBlockReader2  impleme
    * data correctness.
    */
   void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
+    assert !sentStatusCode : "already sent status code to " + dnSock;
     try {
-      writeReadResult(peer.getOutputStream(), statusCode);
+      writeReadResult(ioStreams.out, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               peer.getRemoteAddressString() + ": " + e.getMessage());
+               dnSock.getInetAddress() + ": " + e.getMessage());
     }
   }
 
@@ -352,20 +373,29 @@ public class RemoteBlockReader2  impleme
    */
   public static BlockReader newBlockReader(BlockReaderFactory.Params params)
                                      throws IOException {
+    IOStreamPair ioStreams = params.getIoStreamPair();
+    ReadableByteChannel ch;
+    if (ioStreams.in instanceof SocketInputWrapper) {
+      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+    } else {
+      ch = (ReadableByteChannel) ioStreams.in;
+    }
+    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          params.getPeer().getOutputStream()));
+          ioStreams.out));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), 
         params.getClientName(), params.getStartOffset(), params.getLen());
 
     //
     // Get bytes in block
     //
-    DataInputStream in = new DataInputStream(params.getPeer().getInputStream());
+    DataInputStream in = new DataInputStream(ioStreams.in);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, params.getPeer(), params.getBlock(), params.getFile());
+    checkSuccess(status, params.getSocket(), params.getBlock(),
+        params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -382,28 +412,33 @@ public class RemoteBlockReader2  impleme
                     params.getStartOffset() + " for file " + params.getFile());
     }
 
-    return new RemoteBlockReader2(params, checksum, firstChunkOffset);
+    return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
   }
 
   static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
+      BlockOpResponseProto status, Socket sock,
       ExtendedBlock block, String file)
       throws IOException {
     if (status.getStatus() != Status.SUCCESS) {
       if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
-                + peer.getLocalAddressString() + ", remote="
-                + peer.getRemoteAddressString() + ", for file " + file
+                + sock.getLocalSocketAddress() + ", remote="
+                + sock.getRemoteSocketAddress() + ", for file " + file
                 + ", for pool " + block.getBlockPoolId() + " block " 
                 + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
-            + peer.getLocalAddressString() + ", remote="
-            + peer.getRemoteAddressString() + ", for file " + file
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
             + ", for pool " + block.getBlockPoolId() + " block " 
             + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return ioStreams;
+  }
 }

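RemoteBlockReader2 reads packets through a ReadableByteChannel, so newBlockReader above must recover a channel from whatever InputStream the IOStreamPair carries, unwrapping Hadoop's SocketInputWrapper when possible. Outside Hadoop, the general JDK adapter is java.nio.channels.Channels; a small runnable illustration (not what the code above does, which casts instead):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    public class ChannelFromStream {
      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("packet".getBytes("UTF-8"));
        // Adapt a plain InputStream to the channel API the reader wants.
        ReadableByteChannel ch = Channels.newChannel(in);
        ByteBuffer buf = ByteBuffer.allocate(16);
        int n = ch.read(buf);
        System.out.println("read " + n + " bytes");
      }
    }
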
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Jan  9 02:39:15 2013
@@ -46,8 +46,6 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -209,14 +207,12 @@ public class JspHelper {
     // Use the block name for file name. 
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
         new BlockReaderFactory.Params(new Conf(conf)).
-          setPeer(TcpPeerServer.peerFromSocketAndKey(s, encryptionKey)).
+          setSocket(s).
           setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
           setLen(amtToRead).
+          setEncryptionKey(encryptionKey).
           setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
-          setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
-          setDatanodeID(new DatanodeID(addr.getAddress().toString(), 
-              addr.getHostName(), poolId, addr.getPort(), 0, 0)));
-    
+          setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
@@ -234,7 +230,8 @@ public class JspHelper {
       amtToRead -= numRead;
       readOffset += numRead;
     }
-    blockReader.close(null);
+    blockReader = null;
+    s.close();
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
   }
 

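The JspHelper loop above keeps issuing reads until amtToRead is exhausted, advancing readOffset by each partial read. The same read-fully bookkeeping in a self-contained form:

    import java.io.IOException;
    import java.io.InputStream;

    class ReadFully {
      // Keep reading until len bytes have arrived or the stream ends;
      // mirrors the offset/remaining accounting in the JspHelper loop.
      static int readFully(InputStream in, byte[] buf, int off, int len)
          throws IOException {
        int total = 0;
        while (total < len) {
          int n = in.read(buf, off + total, len - total);
          if (n < 0) {
            break;  // EOF before the requested amount
          }
          total += n;
        }
        return total;
      }
    }
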
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jan  9 02:39:15 2013
@@ -90,7 +90,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -523,19 +522,24 @@ public class DataNode extends Configured
   
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
-    TcpPeerServer tcpPeerServer;
-    if (secureResources != null) {
-      tcpPeerServer = new TcpPeerServer(secureResources);
+    ServerSocket ss;
+    if (secureResources == null) {
+      InetSocketAddress addr = DataNode.getStreamingAddr(conf);
+      ss = (dnConf.socketWriteTimeout > 0) ? 
+          ServerSocketChannel.open().socket() : new ServerSocket();
+          Server.bind(ss, addr, 0);
     } else {
-      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
-          DataNode.getStreamingAddr(conf));
+      ss = secureResources.getStreamingSocket();
     }
-    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-    streamingAddr = tcpPeerServer.getStreamingAddr();
+    ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); 
+
+    streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
+                                     ss.getLocalPort());
+
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(tcpPeerServer, conf, this));
+        new DataXceiverServer(ss, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
   }
   

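initDataXceiver above chooses a ServerSocketChannel-backed socket when a write timeout is configured, since timed writes require an NIO channel, and a plain blocking ServerSocket otherwise. A runnable JDK sketch of that choice; the port and buffer size are illustrative.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.channels.ServerSocketChannel;

    public class StreamingServer {
      public static ServerSocket open(int port, int writeTimeoutMs)
          throws IOException {
        // A channel-backed socket is required for timed (NIO) writes;
        // otherwise a plain blocking ServerSocket suffices.
        ServerSocket ss = (writeTimeoutMs > 0)
            ? ServerSocketChannel.open().socket()
            : new ServerSocket();
        // Set before bind so window scaling can honor sizes over 64KB.
        ss.setReceiveBufferSize(128 * 1024);
        ss.bind(new InetSocketAddress(port));
        return ss;
      }
    }
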
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jan  9 02:39:15 2013
@@ -39,7 +39,6 @@ import java.nio.channels.ClosedChannelEx
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -79,7 +79,8 @@ class DataXceiver extends Receiver imple
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private final Peer peer;
+  private final Socket s;
+  private final boolean isLocal; //is a local connection?
   private final String remoteAddress; // address of remote side
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
@@ -87,7 +88,7 @@ class DataXceiver extends Receiver imple
   private final DataXceiverServer dataXceiverServer;
   private final boolean connectToDnViaHostname;
   private long opStartTime; //the start time of receiving an Op
-  private final InputStream socketIn;
+  private final SocketInputWrapper socketIn;
   private OutputStream socketOut;
 
   /**
@@ -96,23 +97,25 @@ class DataXceiver extends Receiver imple
    */
   private String previousOpClientName;
   
-  public static DataXceiver create(Peer peer, DataNode dn,
+  public static DataXceiver create(Socket s, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    return new DataXceiver(peer, dn, dataXceiverServer);
+    return new DataXceiver(s, dn, dataXceiverServer);
   }
   
-  private DataXceiver(Peer peer, DataNode datanode,
+  private DataXceiver(Socket s, 
+      DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
 
-    this.peer = peer;
+    this.s = s;
     this.dnConf = datanode.getDnConf();
-    this.socketIn = peer.getInputStream();
-    this.socketOut = peer.getOutputStream();
+    this.socketIn = NetUtils.getInputStream(s);
+    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
+    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
-    remoteAddress = peer.getRemoteAddressString();
-    localAddress = peer.getLocalAddressString();
+    remoteAddress = s.getRemoteSocketAddress().toString();
+    localAddress = s.getLocalSocketAddress().toString();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Number of active connections is: "
@@ -152,10 +155,11 @@ class DataXceiver extends Receiver imple
   public void run() {
     int opsProcessed = 0;
     Op op = null;
-
-    dataXceiverServer.addPeer(peer);
+    
+    dataXceiverServer.childSockets.add(s);
+    
     try {
-      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
+      
       InputStream input = socketIn;
       if (dnConf.encryptDataTransfer) {
         IOStreamPair encryptedStreams = null;
@@ -165,9 +169,8 @@ class DataXceiver extends Receiver imple
               dnConf.encryptionAlgorithm);
         } catch (InvalidMagicNumberException imne) {
           LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
-              "is running an older version of Hadoop which does not support " +
-              "encryption");
+              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
+              "older version of Hadoop which does not support encryption");
           return;
         }
         input = encryptedStreams.in;
@@ -186,9 +189,9 @@ class DataXceiver extends Receiver imple
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
+            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            peer.setReadTimeout(dnConf.socketTimeout);
+            socketIn.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -199,7 +202,7 @@ class DataXceiver extends Receiver imple
           if (opsProcessed > 0 &&
               (err instanceof EOFException || err instanceof ClosedChannelException)) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
+              LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
             }
           } else {
             throw err;
@@ -209,13 +212,13 @@ class DataXceiver extends Receiver imple
 
         // restore normal timeout
         if (opsProcessed != 0) {
-          peer.setReadTimeout(dnConf.socketTimeout);
+          s.setSoTimeout(dnConf.socketTimeout);
         }
 
         opStartTime = now();
         processOp(op);
         ++opsProcessed;
-      } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+      } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
       LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
                 ((op == null) ? "unknown" : op.name()) + " operation " +
@@ -227,8 +230,9 @@ class DataXceiver extends Receiver imple
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
-      dataXceiverServer.closePeer(peer);
       IOUtils.closeStream(in);
+      IOUtils.closeSocket(s);
+      dataXceiverServer.childSockets.remove(s);
     }
   }
 
@@ -282,9 +286,8 @@ class DataXceiver extends Receiver imple
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
               HdfsProtoUtil.vintPrefixed(in));
           if (!stat.hasStatus()) {
-            LOG.warn("Client " + peer.getRemoteAddressString() +
-                " did not send a valid status code after reading. " +
-                "Will close connection.");
+            LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+                     "code after reading. Will close connection.");
             IOUtils.closeStream(out);
           }
         } catch (IOException ioe) {
@@ -317,7 +320,7 @@ class DataXceiver extends Receiver imple
 
     //update metrics
     datanode.metrics.addReadBlockOp(elapsed());
-    datanode.metrics.incrReadsFromClient(peer.isLocal());
+    datanode.metrics.incrReadsFromClient(isLocal);
   }
 
   @Override
@@ -355,8 +358,8 @@ class DataXceiver extends Receiver imple
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
-      LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
-                " tcp no delay " + peer.getTcpNoDelay());
+      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+                " tcp no delay " + s.getTcpNoDelay());
     }
 
     // We later mutate block's generation stamp and length, but we need to
@@ -387,8 +390,8 @@ class DataXceiver extends Receiver imple
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
         blockReceiver = new BlockReceiver(block, in, 
-            peer.getRemoteAddressString(),
-            peer.getLocalAddressString(),
+            s.getRemoteSocketAddress().toString(),
+            s.getLocalSocketAddress().toString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum);
       } else {
@@ -543,7 +546,7 @@ class DataXceiver extends Receiver imple
 
     //update metrics
     datanode.metrics.addWriteBlockOp(elapsed());
-    datanode.metrics.incrWritesFromClient(peer.isLocal());
+    datanode.metrics.incrWritesFromClient(isLocal);
   }
 
   @Override
@@ -551,7 +554,7 @@ class DataXceiver extends Receiver imple
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets) throws IOException {
-    checkAccess(socketOut, true, blk, blockToken,
+    checkAccess(null, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@@ -638,9 +641,8 @@ class DataXceiver extends Receiver imple
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to copy block " + block.getBlockId() + " " +
-          "to " + peer.getRemoteAddressString() + " because threads " +
-          "quota is exceeded.";
+      String msg = "Not able to copy block " + block.getBlockId() + " to " 
+      + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.info(msg);
       sendResponse(ERROR, msg);
       return;
@@ -669,7 +671,7 @@ class DataXceiver extends Receiver imple
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
       
-      LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
+      LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
@@ -714,9 +716,8 @@ class DataXceiver extends Receiver imple
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to receive block " + block.getBlockId() +
-          " from " + peer.getRemoteAddressString() + " because threads " +
-          "quota is exceeded.";
+      String msg = "Not able to receive block " + block.getBlockId() + " from " 
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.warn(msg);
       sendResponse(ERROR, msg);
       return;
@@ -793,7 +794,7 @@ class DataXceiver extends Receiver imple
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, delHint);
 
-      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
+      LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
       
     } catch (IOException ioe) {
       opStatus = ERROR;
@@ -816,7 +817,7 @@ class DataXceiver extends Receiver imple
       try {
         sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
-        LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
+        LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
@@ -870,7 +871,7 @@ class DataXceiver extends Receiver imple
   }
   
 
-  private void checkAccess(OutputStream out, final boolean reply, 
+  private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
@@ -885,6 +886,11 @@ class DataXceiver extends Receiver imple
       } catch(InvalidToken e) {
         try {
           if (reply) {
+            if (out == null) {
+              out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+            }
+            
             BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
               .setStatus(ERROR_ACCESS_TOKEN);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {

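For context on the checkAccess() hunk just above: the revert restores lazy creation of the reply stream, so a DataOutputStream is built only when an InvalidToken error actually has to be reported. A minimal, self-contained sketch of the same pattern (the LazyReply class and its names are illustrative, not part of the patch):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class LazyReply {
        private final OutputStream rawOut;  // e.g. the socket's output stream
        private DataOutputStream out;       // built only on first use

        LazyReply(OutputStream rawOut) {
            this.rawOut = rawOut;
        }

        // Mirrors the null-check in checkAccess(): requests that never hit
        // an access error never pay for wrapping the stream.
        void sendError(String message) throws IOException {
            if (out == null) {
                out = new DataOutputStream(rawOut);
            }
            out.writeUTF(message);
            out.flush();
        }
    }
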
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Jan  9 02:39:15 2013
@@ -18,16 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -43,9 +45,11 @@ import org.apache.hadoop.util.Daemon;
 class DataXceiverServer implements Runnable {
   public static final Log LOG = DataNode.LOG;
   
-  private final PeerServer peerServer;
-  private final DataNode datanode;
-  private final Set<Peer> peers = new HashSet<Peer>();
+  ServerSocket ss;
+  DataNode datanode;
+  // Record all sockets opened for data transfer
+  Set<Socket> childSockets = Collections.synchronizedSet(
+                                       new HashSet<Socket>());
   
   /**
    * Maximal number of concurrent xceivers per node.
@@ -105,10 +109,10 @@ class DataXceiverServer implements Runna
   long estimateBlockSize;
   
   
-  DataXceiverServer(PeerServer peerServer, Configuration conf,
+  DataXceiverServer(ServerSocket ss, Configuration conf, 
       DataNode datanode) {
     
-    this.peerServer = peerServer;
+    this.ss = ss;
     this.datanode = datanode;
     
     this.maxXceiverCount = 
@@ -126,10 +130,12 @@ class DataXceiverServer implements Runna
 
   @Override
   public void run() {
-    Peer peer = null;
     while (datanode.shouldRun) {
+      Socket s = null;
       try {
-        peer = peerServer.accept();
+        s = ss.accept();
+        s.setTcpNoDelay(true);
+        // Timeouts are set within DataXceiver.run()
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -140,7 +146,7 @@ class DataXceiverServer implements Runna
         }
 
         new Daemon(datanode.threadGroup,
-            DataXceiver.create(peer, datanode, this))
+            DataXceiver.create(s, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
@@ -151,10 +157,10 @@ class DataXceiverServer implements Runna
           LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
-        IOUtils.cleanup(null, peer);
+        IOUtils.closeSocket(s);
         LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
       } catch (OutOfMemoryError ie) {
-        IOUtils.cleanup(null, peer);
+        IOUtils.closeSocket(s);
         // DataNode can run out of memory if there are too many transfers.
         // Log the event and sleep for 30 seconds; other transfers may
         // complete by then.
@@ -170,35 +176,33 @@ class DataXceiverServer implements Runna
         datanode.shouldRun = false;
       }
     }
-    synchronized (this) {
-      for (Peer p : peers) {
-        IOUtils.cleanup(LOG, p);
-      }
-    }
     try {
-      peerServer.close();
+      ss.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName()
           + " :DataXceiverServer: close exception", ie);
     }
   }
-
+  
   void kill() {
     assert datanode.shouldRun == false :
       "shoudRun should be set to false before killing";
     try {
-      this.peerServer.close();
+      this.ss.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
     }
-  }
-  
-  synchronized void addPeer(Peer peer) {
-    peers.add(peer);
-  }
 
-  synchronized void closePeer(Peer peer) {
-    peers.remove(peer);
-    IOUtils.cleanup(null, peer);
+    // close all the sockets that were accepted earlier
+    synchronized (childSockets) {
+      for (Iterator<Socket> it = childSockets.iterator();
+           it.hasNext();) {
+        Socket thissock = it.next();
+        try {
+          thissock.close();
+        } catch (IOException e) {
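+          // close errors are ignored here; kill() is best-effort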
+        }
+      }
+    }
   }
 }

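For context on the DataXceiverServer revert above: every accepted data-transfer socket is recorded in the synchronized childSockets set so that kill() can tear down the listener and all children. A condensed, self-contained sketch of that bookkeeping (class and method names here are illustrative):

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class TrackingServer {
        private final ServerSocket ss;
        // Record all sockets opened for data transfer.
        private final Set<Socket> children =
            Collections.synchronizedSet(new HashSet<Socket>());

        TrackingServer(ServerSocket ss) { this.ss = ss; }

        Socket accept() throws IOException {
            Socket s = ss.accept();
            s.setTcpNoDelay(true);  // timeouts are set later, per operation
            children.add(s);        // the xceiver removes it when finished
            return s;
        }

        // Close the listener first, then every accepted child; close
        // failures are ignored, matching the best-effort kill() above.
        void kill() {
            try { ss.close(); } catch (IOException ignored) { }
            synchronized (children) {
                for (Socket s : children) {
                    try { s.close(); } catch (IOException ignored) { }
                }
            }
        }
    }
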
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jan  9 02:39:15 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -560,13 +559,13 @@ public class NamenodeFsck {
         
         blockReader = BlockReaderFactory.newBlockReader(
           new BlockReaderFactory.Params(new Conf(conf)).
-            setPeer(TcpPeerServer.peerFromSocketAndKey(s,
-                namenode.getRpcServer().getDataEncryptionKey())).
-            setBlock(block).
+            setSocket(s).setBlock(block).
             setFile(BlockReaderFactory.getFileName(targetAddr, 
                 block.getBlockPoolId(), block.getBlockId())).
             setBlockToken(lblock.getBlockToken()).
-            setDatanodeID(chosenNode));
+            setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
+            setLen(-1));
+        
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
         LOG.info("Failed to connect to " + targetAddr + ":" + ex);

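With the revert, fsck configures its block reader from a raw Socket plus a DataEncryptionKey rather than a Peer. A sketch of the restored call shape inside NamenodeFsck's scope, reusing its locals (conf, targetAddr, block, lblock, namenode); only setters that appear in the hunks above are used, and the socket setup with the READ_TIMEOUT constant is an assumption:

    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);

    blockReader = BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(new Conf(conf)).
          setSocket(s).                        // raw socket, not a Peer
          setBlock(block).
          setFile(BlockReaderFactory.getFileName(targetAddr,
              block.getBlockPoolId(), block.getBlockId())).
          setBlockToken(lblock.getBlockToken()).
          setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
          setLen(-1));
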
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Wed Jan  9 02:39:15 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -153,13 +152,13 @@ public class BlockReaderTestUtil {
 
     return BlockReaderFactory.newBlockReader(
         new BlockReaderFactory.Params(new Conf(conf)).
-          setPeer(TcpPeerServer.peerFromSocket(sock)).
+          setSocket(sock).
           setFile(targetAddr.toString() + ":" + block.getBlockId()).
           setBlock(block).setBlockToken(testBlock.getBlockToken()).
           setStartOffset(offset).setLen(lenToRead).
           setBufferSize(conf.getInt(
               CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
-          setVerifyChecksum(true).setDatanodeID(nodes[0]));
+          setVerifyChecksum(true));
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Wed Jan  9 02:39:15 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close();
   }
 
   /**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
     verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close();
   }
 
   /**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close();
   }
 
   /**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close(null);
+        reader.close();
       }
     }
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Jan  9 02:39:15 2013
@@ -18,20 +18,28 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-
-import junit.framework.Assert;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
@@ -47,31 +55,59 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
+  final static int CACHE_SIZE = 4;
+  final static long CACHE_EXPIRY_MS = 200;
+  static Configuration conf = null;
+  static MiniDFSCluster cluster = null;
+  static FileSystem fs = null;
+  static SocketCache cache;
+
+  static final Path testFile = new Path("/testConnCache.dat");
+  static byte authenticData[] = null;
+
+  static BlockReaderTestUtil util = null;
+
 
   /**
    * A mock Answer to remember the BlockReader used.
    *
   * It verifies that all invocations of DFSInputStream.getBlockReader()
-   * use the same peer.
+   * use the same socket.
    */
   private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
     public RemoteBlockReader2 reader = null;
-    private Peer peer = null;
+    private Socket sock = null;
 
     @Override
     public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
       RemoteBlockReader2 prevReader = reader;
       reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (peer == null) {
-        peer = reader.getPeer();
+      if (sock == null) {
+        sock = reader.dnSock;
       } else if (prevReader != null) {
-        Assert.assertSame("DFSInputStream should use the same peer",
-                   peer, reader.getPeer());
+        assertSame("DFSInputStream should use the same socket",
+                   sock, reader.dnSock);
       }
       return reader;
     }
   }
 
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 1;
+
+    /* Create a socket cache; there is only one socket cache per JVM. */
+    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    cluster = util.getCluster();
+    conf = util.getConf();
+    fs = cluster.getFileSystem();
+
+    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+  }
+
+
   /**
    * (Optionally) seek to position, read and verify data.
    *
@@ -81,10 +117,9 @@ public class TestConnCache {
                      long pos,
                      byte[] buffer,
                      int offset,
-                     int length,
-                     byte[] authenticData)
+                     int length)
       throws IOException {
-    Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
+    assertTrue("Test buffer too small", buffer.length >= offset + length);
 
     if (pos >= 0)
       in.seek(pos);
@@ -94,7 +129,7 @@ public class TestConnCache {
 
     while (length > 0) {
       int cnt = in.read(buffer, offset, length);
-      Assert.assertTrue("Error in read", cnt > 0);
+      assertTrue("Error in read", cnt > 0);
       offset += cnt;
       length -= cnt;
     }
@@ -110,22 +145,115 @@ public class TestConnCache {
   }
 
   /**
+   * Test the SocketCache itself.
+   */
+  @Test
+  public void testSocketCache() throws Exception {
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+    // Make some sockets to the DN
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+    }
+
+
+    // Insert a socket to the NN
+    Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
+    cache.put(nnSock, null);
+    assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
+    cache.put(nnSock, null);
+
+    // Insert DN socks
+    for (Socket dnSock : dnSockets) {
+      cache.put(dnSock, null);
+    }
+
+    assertEquals("NN socket evicted", null, cache.get(nnAddr));
+    assertTrue("Evicted socket closed", nnSock.isClosed());
+ 
+    // Lookup the DN socks
+    for (Socket dnSock : dnSockets) {
+      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
+      dnSock.close();
+    }
+
+    assertEquals("Cache is empty", 0, cache.size());
+  }
+
+
+  /**
+   * Test the SocketCache expiry.
+   * Verify that socket cache entries expire after the set
+   * expiry time.
+   */
+  @Test
+  public void testSocketCacheExpiry() throws Exception {
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+    // Make some sockets to the DN and put in cache
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+      cache.put(dnSockets[i], null);
+    }
+
+    // Client side still has the sockets cached
+    assertEquals(CACHE_SIZE, client.socketCache.size());
+
+    // sleep past the expiry interval and check that the entry is gone
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+    
+    // Client side has no sockets cached
+    assertEquals(0, client.socketCache.size());
+
+    // sleep for another interval and make sure the expiry
+    // daemon thread runs fine on an empty cache
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+  }
+
+
+  /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
-   * @throws Exception 
+   *
+   * @throws java.io.IOException
    */
   @Test
   @SuppressWarnings("unchecked")
-  public void testReadFromOneDN() throws Exception {
-    BlockReaderTestUtil util = new BlockReaderTestUtil(1,
-        new HdfsConfiguration());
-    final Path testFile = new Path("/testConnCache.dat");
-    byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
+  public void testReadFromOneDN() throws IOException {
+    LOG.info("Starting testReadFromOneDN()");
     DFSClient client = new DFSClient(
-        new InetSocketAddress("localhost",
-            util.getCluster().getNameNodePort()), util.getConf());
-    DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
+        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    DFSInputStream in = spy(client.open(testFile.toString()));
     LOG.info("opened " + testFile.toString());
+
     byte[] dataBuf = new byte[BLOCK_SIZE];
 
     MockGetBlockReader answer = new MockGetBlockReader();
@@ -142,15 +270,18 @@ public class TestConnCache {
                            Matchers.anyString());
 
     // Initial read
-    pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
+    pread(in, 0, dataBuf, 0, dataBuf.length);
     // Read again and verify that the socket is the same
-    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
-        authenticData);
-    pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
-    // No seek; just read
-    pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
-    pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
+    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
+    pread(in, 1024, dataBuf, 0, dataBuf.length);
+    pread(in, -1, dataBuf, 0, dataBuf.length);            // No seek; just read
+    pread(in, 64, dataBuf, 0, dataBuf.length / 2);
 
     in.close();
   }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
 }

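The re-added tests above pin down the SocketCache contract: getInstance() hands back the one JVM-wide cache, put() evicts and closes the oldest entry once capacity is reached, get() returns and removes a cached entry whose socket is exposed as .sock, and idle entries expire after the configured interval. A hedged usage sketch of that contract; the SocketAndStreams entry type name and the redial-on-miss policy are assumptions, since only the .sock field appears in this patch:

    // One cache per JVM: capacity 4, entries expire after 200 ms idle.
    SocketCache cache = SocketCache.getInstance(4, 200);

    Socket s = new Socket(dnAddr.getAddress(), dnAddr.getPort());
    // ... use the connection, then park it for reuse:
    cache.put(s, null);  // null stream pair, as in the tests above

    // get() removes the entry it returns, so a second get() for the
    // same address would miss; the entry type name is assumed here.
    SocketCache.SocketAndStreams entry = cache.get(dnAddr);
    Socket pooled = (entry != null)
        ? entry.sock                                         // cache hit
        : new Socket(dnAddr.getAddress(), dnAddr.getPort()); // miss: redial
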
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Jan  9 02:39:15 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -93,13 +92,13 @@ public class TestDataTransferKeepalive {
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, dfsClient.peerCache.size());
+    assertEquals(0, dfsClient.socketCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, dfsClient.peerCache.size());
+    assertEquals(1, dfsClient.socketCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
@@ -110,13 +109,13 @@ public class TestDataTransferKeepalive {
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, dfsClient.peerCache.size());
+    assertEquals(1, dfsClient.socketCache.size());
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
-    assertNotNull(peer);
-    assertEquals(-1, peer.getInputStream().read());
+    Socket s = dfsClient.socketCache.get(dnAddr).sock;
+    assertNotNull(s);
+    assertEquals(-1, NetUtils.getInputStream(s).read());
   }
 
   /**
@@ -175,14 +174,14 @@ public class TestDataTransferKeepalive {
     }
     
     DFSClient client = ((DistributedFileSystem)fs).dfs;
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, client.socketCache.size());
     
     // Let all the xceivers timeout
     Thread.sleep(1500);
     assertXceiverCount(0);
 
     // Client side still has the sockets cached
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, client.socketCache.size());
 
     // Reading should not throw an exception.
     DFSTestUtil.readFile(fs, TEST_FILE);

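TestDataTransferKeepalive above relies on stale-socket detection being lazy: a cache entry whose datanode side timed out is only noticed when the next read returns EOF. A sketch of that staleness probe under the same assumptions (dfsClient and dnAddr as in the test; on a live idle socket the read would block until the socket timeout, so the timeout branch below conservatively treats that case as unusable too):

    Socket s = dfsClient.socketCache.get(dnAddr).sock;
    boolean stale;
    try {
        s.setSoTimeout(1000);  // don't block forever on a live idle socket
        stale = (NetUtils.getInputStream(s).read() == -1);
    } catch (IOException e) {
        stale = true;          // timeout or reset: treat the socket as dead
    }
    if (stale) {
        IOUtils.closeSocket(s);  // discard; the caller dials a new socket
    }
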
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Wed Jan  9 02:39:15 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.DFSClient.
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -148,10 +147,9 @@ public class TestBlockTokenWithDFS {
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
           new BlockReaderFactory.Params(new Conf(conf)).
-            setPeer(TcpPeerServer.peerFromSocket(s)).
-            setBlock(block).setFile(file).
+            setSocket(s).setBlock(block).setFile(file).
             setBlockToken(lblock.getBlockToken()).setStartOffset(0).
-            setLen(-1).setDatanodeID(nodes[0]));
+            setLen(-1));
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {
         assertFalse("OP_READ_BLOCK: access token is invalid, "

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1430662&r1=1430661&r2=1430662&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Jan  9 02:39:15 2013
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -285,9 +284,8 @@ public class TestDataNodeVolumeFailure {
           setFile(BlockReaderFactory.getFileName(targetAddr, 
               "test-blockpoolid", block.getBlockId())).
           setBlock(block).setBlockToken(lblock.getBlockToken()).
-          setPeer(TcpPeerServer.peerFromSocket(s)).
-          setDatanodeID(datanode));
-    blockReader.close(null);
+          setSocket(s));
+    blockReader.close();
   }
   
   /**