Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/12 00:52:24 UTC

svn commit: r1432335 [1/2] - in /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer...

Author: todd
Date: Fri Jan 11 23:52:22 2013
New Revision: 1432335

URL: http://svn.apache.org/viewvc?rev=1432335&view=rev
Log:
HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths. Contributed by Colin Patrick McCabe.

Added:
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
Removed:
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
Modified:
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt Fri Jan 11 23:52:22 2013
@@ -7,3 +7,6 @@ HDFS-4353. Encapsulate connections to pe
 
 HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
 (Colin Patrick McCabe via todd)
+
+HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+(Colin Patrick McCabe via todd)

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Fri Jan 11 23:52:22 2013
@@ -290,6 +290,14 @@
        <Method name="persistPaxosData" />
        <Bug pattern="OS_OPEN_STREAM" />
      </Match>
+
+     <!-- getShortCircuitFdsForRead is supposed to return open streams. -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
+       <Method name="getShortCircuitFdsForRead" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
      <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Fri Jan 11 23:52:22 2013
@@ -42,17 +42,28 @@ public interface BlockReader extends Byt
   long skip(long n) throws IOException;
 
   /**
+   * Returns an estimate of the number of bytes that can be read
+   * (or skipped over) from this input stream without performing
+   * network I/O.
+   */
+  int available() throws IOException;
+
+  /**
    * Close the block reader.
    *
    * @param peerCache      The PeerCache to put the Peer we're using back
    *                       into, or null if we should simply close the Peer
    *                       we're using (along with its Socket).
-   *                       Some block readers, like BlockReaderLocal, may
-   *                       not make use of this parameter.
+   *                       Ignored by Readers that don't maintain Peers.
+   * @param fisCache       The FileInputStreamCache to put our FileInputStreams
+   *                       back into, or null if we should simply close them.
+   *                       Ignored by Readers that don't maintain
+   *                       FileInputStreams.
    *
    * @throws IOException
    */
-  void close(PeerCache peerCache) throws IOException;
+  void close(PeerCache peerCache, FileInputStreamCache fisCache)
+      throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception

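For orientation, here is a minimal caller-side sketch of the revised interface, assuming the reader was obtained from BlockReaderFactory and that the caller owns a PeerCache and a FileInputStreamCache. The helper class and variable names are illustrative only, not part of this patch.

    package org.apache.hadoop.hdfs;   // PeerCache/FileInputStreamCache live in this package

    import java.io.IOException;

    class BlockReaderUsageSketch {
      // Drain one block through the updated API, then hand resources back to the caches.
      static void drain(BlockReader reader, PeerCache peerCache,
          FileInputStreamCache fisCache, long blockLen) throws IOException {
        byte[] buf = new byte[64 * 1024];
        long remaining = blockLen;
        while (remaining > 0) {
          int toRead = (int) Math.min(buf.length, remaining);
          reader.readFully(buf, 0, toRead);
          remaining -= toRead;
        }
        // close() now takes both caches; passing null for either simply closes the
        // underlying Peer or FileInputStreams instead of caching them.
        reader.close(peerCache, fisCache);
      }
    }

The new available() method lets DFSInputStream decide whether a short forward seek can be satisfied by skipping data that is cheap to read, rather than using the fixed TCP_WINDOW_SIZE heuristic (see the final DFSInputStream.java hunk below).
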
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Jan 11 23:52:22 2013
@@ -17,22 +17,26 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -58,6 +62,12 @@ public class BlockReaderFactory {
    * @param clientName  Client name.  Used for log messages.
    * @param peer  The peer
    * @param datanodeID  The datanode that the Peer is connected to
+   * @param domainSocketFactory  The DomainSocketFactory to notify if the Peer
+   *                             is a DomainPeer which turns out to be faulty.
+   *                             If null, no factory will be notified in this
+   *                             case.
+   * @param allowShortCircuitLocalReads  True if short-circuit local reads
+   *                                     should be allowed.
    * @return New BlockReader instance, or null on error.
    */
   @SuppressWarnings("deprecation")
@@ -70,11 +80,44 @@ public class BlockReaderFactory {
                                      boolean verifyChecksum,
                                      String clientName,
                                      Peer peer,
-                                     DatanodeID datanodeID)
-                                     throws IOException {
+                                     DatanodeID datanodeID,
+                                     DomainSocketFactory domSockFactory,
+                                     boolean allowShortCircuitLocalReads)
+  throws IOException {
     peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT));
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+    if (peer.getDomainSocket() != null) {
+      if (allowShortCircuitLocalReads) {
+        // If this is a domain socket, and short-circuit local reads are 
+        // enabled, try to set up a BlockReaderLocal.
+        BlockReader reader = newShortCircuitBlockReader(conf, file,
+            block, blockToken, startOffset, len, peer, datanodeID,
+            domSockFactory, verifyChecksum);
+        if (reader != null) {
+          // Once we've constructed the short-circuit block reader, we don't
+          // need the socket any more.  So let's return it to the cache.
+          PeerCache peerCache = PeerCache.getInstance(
+              conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+              conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+          peerCache.put(datanodeID, peer);
+          return reader;
+        }
+      }
+      // If this is a domain socket and we couldn't (or didn't want to) set
+      // up a BlockReaderLocal, check that we are allowed to pass data traffic
+      // over the socket before proceeding.
+      if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+        throw new IOException("Because we can't do short-circuit access, " +
+          "and data traffic over domain sockets is disabled, " +
+          "we cannot use this socket to talk to " + datanodeID);
+      }
+    }
+
     if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
         DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
       return RemoteBlockReader.newBlockReader(file,
@@ -88,7 +131,94 @@ public class BlockReaderFactory {
           verifyChecksum, clientName, peer, datanodeID);
     }
   }
-  
+
+  /**
+   * Create a new short-circuit BlockReader.
+   * 
+   * Here, we ask the DataNode to pass us file descriptors over our
+   * DomainSocket.  If the DataNode declines to do so, we'll return null here;
+   * otherwise, we'll return the BlockReaderLocal.  If the DataNode declines,
+   * this function will inform the DomainSocketFactory that short-circuit local
+   * reads are disabled for this DataNode, so that we don't ask again.
+   * 
+   * @param conf               the configuration.
+   * @param file               the file name. Used in log messages.
+   * @param block              The block object.
+   * @param blockToken         The block token for security.
+   * @param startOffset        The read offset, relative to block head.
+   * @param len                The number of bytes to read, or -1 to read 
+   *                           as many as possible.
+   * @param peer               The peer to use.
+   * @param datanodeID         The datanode that the Peer is connected to.
+   * @param domSockFactory     The DomainSocketFactory to notify if the Peer
+   *                           is a DomainPeer which turns out to be faulty.
+   *                           If null, no factory will be notified in this
+   *                           case.
+   * @param verifyChecksum     True if we should verify the checksums.
+   *                           Note: even if this is true, when
+   *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
+   *                           set, we will skip checksums.
+   *
+   * @return                   The BlockReaderLocal, or null if the
+   *                           DataNode declined to provide short-circuit
+   *                           access.
+   * @throws IOException       If there was a communication error.
+   */
+  private static BlockReaderLocal newShortCircuitBlockReader(
+      Configuration conf, String file, ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken, long startOffset,
+      long len, Peer peer, DatanodeID datanodeID,
+      DomainSocketFactory domSockFactory, boolean verifyChecksum)
+          throws IOException {
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(
+          peer.getOutputStream()));
+    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+    DataInputStream in =
+        new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        HdfsProtoUtil.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      BlockReaderLocal reader = null;
+      byte buf[] = new byte[1];
+      FileInputStream fis[] = new FileInputStream[2];
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      try {
+        reader = new BlockReaderLocal(conf, file, block,
+            startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+      } finally {
+        if (reader == null) {
+          IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
+        }
+      }
+      return reader;
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        DFSClient.LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanodeID + ".  reason: " + resp.getMessage());
+        domSockFactory.disableShortCircuitForPath(sock.getPath());
+      } else {
+        DFSClient.LOG.warn("short-circuit read access for the file " +
+            file + " is disabled for DataNode " + datanodeID +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          file + resp.getMessage();
+      DFSClient.LOG.debug(msg);
+      throw new InvalidBlockTokenException(msg);
+    default:
+      DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+          "access to " + file + ": " + resp.getMessage());
+      domSockFactory.disableShortCircuitForPath(sock.getPath());
+      return null;
+    }
+  }
+
   /**
    * File name to print when accessing a block directly (from servlets)
    * @param s Address of the block location

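The factory's short-circuit path ultimately hinges on DomainSocket#recvFileInputStreams (introduced by HDFS-4354). Below is a stripped-down sketch of just the descriptor-receiving step, with the protobuf request/response handling from the hunk above omitted; the class and method names are illustrative.

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.net.unix.DomainSocket;

    class FdReceiveSketch {
      // After a successful requestShortCircuitFds, the DataNode attaches two file
      // descriptors to its reply: the block data file and the block metadata file.
      static FileInputStream[] receiveBlockFds(DomainSocket sock) throws IOException {
        byte[] buf = new byte[1];                        // one byte of in-band data
        FileInputStream[] fis = new FileInputStream[2];  // fis[0] = data, fis[1] = metadata
        sock.recvFileInputStreams(fis, buf, 0, buf.length);
        if (fis[0] == null || fis[1] == null) {
          // Defensive check for a reply that did not carry both descriptors.
          IOUtils.cleanup(null, fis);
          throw new IOException("expected two file descriptors from the DataNode");
        }
        return fis;
      }
    }
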
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Jan 11 23:52:22 2013
@@ -18,30 +18,18 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -53,74 +41,19 @@ import org.apache.hadoop.util.DataChecks
  * <ul>
  * <li>The client performing short circuit reads must be configured at the
  * datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data 
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
  * </ul>
  */
 class BlockReaderLocal implements BlockReader {
-  private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
-  //Stores the cache and proxy for a local datanode.
-  private static class LocalDatanodeInfo {
-    private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
-    LocalDatanodeInfo() {
-      final int cacheSize = 10000;
-      final float hashTableLoadFactor = 0.75f;
-      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
-      cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
-              hashTableCapacity, hashTableLoadFactor, true) {
-            private static final long serialVersionUID = 1;
-
-            @Override
-            protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
-              return size() > cacheSize;
-            }
-          });
-    }
-
-    private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        DatanodeInfo node, Configuration conf, int socketTimeout,
-        boolean connectToDnViaHostname) throws IOException {
-      if (proxy == null) {
-        proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-            socketTimeout, connectToDnViaHostname);
-      }
-      return proxy;
-    }
-    
-    private synchronized void resetDatanodeProxy() {
-      if (null != proxy) {
-        RPC.stopProxy(proxy);
-        proxy = null;
-      }
-    }
-
-    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
-      return cache.get(b);
-    }
-
-    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
-      cache.put(b, info);
-    }
-
-    private void removeBlockLocalPathInfo(ExtendedBlock b) {
-      cache.remove(b);
-    }
-  }
-  
-  // Multiple datanodes could be running on the local machine. Store proxies in
-  // a map keyed by the ipc port of the datanode.
-  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+  static final Log LOG = LogFactory.getLog(DFSClient.class);
 
   private final FileInputStream dataIn; // reader for the data file
   private final FileInputStream checksumIn;   // reader for the checksum file
+  private final boolean verifyChecksum;
 
   /**
    * Offset from the most recent chunk boundary at which the next read should
@@ -140,7 +73,6 @@ class BlockReaderLocal implements BlockR
   private ByteBuffer slowReadBuff = null;
   private ByteBuffer checksumBuff = null;
   private DataChecksum checksum;
-  private final boolean verifyChecksum;
 
   private static DirectBufferPool bufferPool = new DirectBufferPool();
 
@@ -150,186 +82,90 @@ class BlockReaderLocal implements BlockR
   /** offset in block where reader wants to actually read */
   private long startOffset;
   private final String filename;
-  
-  /**
-   * The only way this object can be instantiated.
-   */
-  static BlockReaderLocal newBlockReader(Configuration conf, String file,
-      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
-      int socketTimeout, long startOffset, long length,
-      boolean connectToDnViaHostname) throws IOException {
-
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
-        .getIpcPort());
-    // check the cache first
-    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
-    if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
-          connectToDnViaHostname);
-    }
-
-    // check to see if the file exists. It may so happen that the
-    // HDFS file has been deleted and this block-lookup is occurring
-    // on behalf of a new HDFS file. This time, the block file could
-    // be residing in a different portion of the fs.data.dir directory.
-    // In this case, we remove this entry from the cache. The next
-    // call to this method will re-populate the cache.
-    FileInputStream dataIn = null;
-    FileInputStream checksumIn = null;
-    BlockReaderLocal localBlockReader = null;
-    boolean skipChecksumCheck = skipChecksumCheck(conf);
-    try {
-      // get a local file system
-      File blkfile = new File(pathinfo.getBlockPath());
-      dataIn = new FileInputStream(blkfile);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
-            + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + !skipChecksumCheck);
-      }
-
-      if (!skipChecksumCheck) {
-        // get the metadata file
-        File metafile = new File(pathinfo.getMetaPath());
-        checksumIn = new FileInputStream(metafile);
-
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader
-            .readHeader(new DataInputStream(checksumIn));
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + blk + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
-        long firstChunkOffset = startOffset
-            - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, checksum, true, dataIn,
-            firstChunkOffset, checksumIn);
-      } else {
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, dataIn);
-      }
-    } catch (IOException e) {
-      // remove from cache
-      localDatanodeInfo.removeBlockLocalPathInfo(blk);
-      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
-          + " from cache because local file " + pathinfo.getBlockPath()
-          + " could not be opened.");
-      throw e;
-    } finally {
-      if (localBlockReader == null) {
-        if (dataIn != null) {
-          dataIn.close();
-        }
-        if (checksumIn != null) {
-          checksumIn.close();
-        }
-      }
-    }
-    return localBlockReader;
-  }
-  
-  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
-    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
-    if (ldInfo == null) {
-      ldInfo = new LocalDatanodeInfo();
-      localDatanodeInfoMap.put(port, ldInfo);
-    }
-    return ldInfo;
-  }
-  
-  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
-      DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-          throws IOException {
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
-    BlockLocalPathInfo pathinfo = null;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
-        conf, timeout, connectToDnViaHostname);
-    try {
-      // make RPC to local datanode to find local pathnames of blocks
-      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      if (pathinfo != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
-        }
-        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
-      }
-    } catch (IOException e) {
-      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
-      throw e;
-    }
-    return pathinfo;
-  }
-  
-  private static boolean skipChecksumCheck(Configuration conf) {
-    return conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-  }
+  private final DatanodeID datanodeID;
+  private final ExtendedBlock block;
   
-  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
-    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
-    if (bufferSizeBytes < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
-          "is not large enough to hold a single chunk (" + bytesPerChecksum +  "). Please configure " +
+  private static int getSlowReadBufferNumChunks(Configuration conf,
+      int bytesPerChecksum) {
+
+    int bufSize =
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+    if (bufSize < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+          bufSize + ") is not large enough to hold a single chunk (" +
+          bytesPerChecksum +  "). Please configure " +
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
     }
 
     // Round down to nearest chunk size
-    return bufferSizeBytes / bytesPerChecksum;
-  }
-
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
-      throws IOException {
-    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
-        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
-        dataIn, startOffset, null);
+    return bufSize / bytesPerChecksum;
   }
 
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
-      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
-      FileInputStream checksumIn) throws IOException {
-    this.filename = hdfsfile;
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-
+  public BlockReaderLocal(Configuration conf, String filename,
+      ExtendedBlock block, long startOffset, long length,
+      FileInputStream dataIn, FileInputStream checksumIn,
+      DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
-    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
-    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
-    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-    // Initially the buffers have nothing to read.
-    slowReadBuff.flip();
-    checksumBuff.flip();
+    this.startOffset = Math.max(startOffset, 0);
+    this.filename = filename;
+    this.datanodeID = datanodeID;
+    this.block = block;
+
+    // read and handle the common header here. For now just a version
+    checksumIn.getChannel().position(0);
+    BlockMetadataHeader header = BlockMetadataHeader
+        .readHeader(new DataInputStream(checksumIn));
+    short version = header.getVersion();
+    if (version != BlockMetadataHeader.VERSION) {
+      throw new IOException("Wrong version (" + version + ") of the " +
+          "metadata file for " + filename + ".");
+    }
+    if (!verifyChecksum) {
+      this.verifyChecksum = false; 
+    } else {
+      this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+          DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+    }
+    long firstChunkOffset;
+    if (this.verifyChecksum) {
+      this.checksum = header.getChecksum();
+      this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+      this.checksumSize = this.checksum.getChecksumSize();
+      firstChunkOffset = startOffset
+          - (startOffset % checksum.getBytesPerChecksum());
+      this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+      int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+      slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+      checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+      // Initially the buffers have nothing to read.
+      slowReadBuff.flip();
+      checksumBuff.flip();
+      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      IOUtils.skipFully(checksumIn, checkSumOffset);
+    } else {
+      firstChunkOffset = startOffset;
+      this.checksum = null;
+      this.bytesPerChecksum = 0;
+      this.checksumSize = 0;
+      this.offsetFromChunkBoundary = 0;
+    }
+    
     boolean success = false;
     try {
-      // Skip both input streams to beginning of the chunk containing startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
-      if (checksumIn != null) {
-        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        IOUtils.skipFully(checksumIn, checkSumOffset);
-      }
+      // Reposition both input streams to the beginning of the chunk
+      // containing startOffset
+      this.dataIn.getChannel().position(firstChunkOffset);
       success = true;
     } finally {
       if (!success) {
-        bufferPool.returnBuffer(slowReadBuff);
-        bufferPool.returnBuffer(checksumBuff);
+        if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+        if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
       }
     }
   }
@@ -649,9 +485,17 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
-    dataIn.close();
-    if (checksumIn != null) {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
+    if (fisCache != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("putting FileInputStream for " + filename +
+            " back into FileInputStreamCache");
+      }
+      fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+    } else {
+      LOG.debug("closing FileInputStream for " + filename);
+      dataIn.close();
       checksumIn.close();
     }
     if (slowReadBuff != null) {
@@ -675,4 +519,10 @@ class BlockReaderLocal implements BlockR
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
+
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
+  }
 }

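To make the offset arithmetic in the rewritten constructor concrete, here is a small worked example using typical values of 512-byte checksum chunks and 4-byte CRC32 checksums (the numbers are chosen purely for illustration):

    public class ShortCircuitOffsetMath {
      public static void main(String[] args) {
        long startOffset = 1300;       // where the caller wants to start reading
        int bytesPerChecksum = 512;    // from the metadata header's DataChecksum
        int checksumSize = 4;          // CRC32 checksums are four bytes each

        // Same expressions as the constructor above.
        long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
        int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;

        // Prints: firstChunkOffset=1024 offsetFromChunkBoundary=276 checkSumOffset=8
        System.out.println("firstChunkOffset=" + firstChunkOffset
            + " offsetFromChunkBoundary=" + offsetFromChunkBoundary
            + " checkSumOffset=" + checkSumOffset);
      }
    }

So dataIn is positioned at byte 1024 of the block file, and 8 bytes of checksum data (two 4-byte checksums, following the BlockMetadataHeader) are skipped so that the checksum stream lines up with the same chunk.
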
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 11 23:52:22 2013
@@ -128,7 +128,6 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -227,6 +226,11 @@ public class DFSClient implements java.i
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
+    final String domainSocketPath;
+    final boolean skipShortCircuitChecksums;
+    final int shortCircuitBufferSize;
+    final boolean shortCircuitLocalReads;
+    final boolean domainSocketDataTraffic;
 
     Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
@@ -288,6 +292,19 @@ public class DFSClient implements java.i
       getFileBlockStorageLocationsTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+      domainSocketPath = conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      skipShortCircuitChecksums = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+      shortCircuitBufferSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+      shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+      domainSocketDataTraffic = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -345,7 +362,7 @@ public class DFSClient implements java.i
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
 
-  private boolean shortCircuitLocalReads;
+  private final DomainSocketFactory domainSocketFactory;
   
   /**
    * Same as this(NameNode.getAddress(conf), conf);
@@ -417,12 +434,8 @@ public class DFSClient implements java.i
     }
 
     // read directly from the block file if configured.
-    this.shortCircuitLocalReads = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
-    }
+    this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
     String localInterfaces[] =
       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -787,28 +800,11 @@ public class DFSClient implements java.i
                                      AccessControlException.class);
     }
   }
-
-  /**
-   * Get {@link BlockReader} for short circuited local reads.
-   */
-  static BlockReader getLocalBlockReader(Configuration conf,
-      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
-      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
-      boolean connectToDnViaHostname) throws InvalidToken, IOException {
-    try {
-      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
-          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-              - offsetIntoBlock, connectToDnViaHostname);
-    } catch (RemoteException re) {
-      throw re.unwrapRemoteException(InvalidToken.class,
-          AccessControlException.class);
-    }
-  }
   
   private static Map<String, Boolean> localAddrMap = Collections
       .synchronizedMap(new HashMap<String, Boolean>());
   
-  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+  static boolean isLocalAddress(InetSocketAddress targetAddr) {
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
     if (cached != null) {
@@ -2108,10 +2104,6 @@ public class DFSClient implements java.i
       super(in);
     }
   }
-  
-  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    return shortCircuitLocalReads && isLocalAddress(targetAddr);
-  }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
     DatanodeInfo [] dnArr = { dn };
@@ -2135,7 +2127,7 @@ public class DFSClient implements java.i
         + ", ugi=" + ugi + "]"; 
   }
 
-  void disableShortCircuit() {
-    shortCircuitLocalReads = false;
+  public DomainSocketFactory getDomainSocketFactory() {
+    return domainSocketFactory;
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 11 23:52:22 2013
@@ -342,7 +342,13 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 10;
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 60000;
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+  public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+  public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -393,6 +399,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
   
   public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+  public static final String DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY = "dfs.datanode.domain.socket.path";
 
   // HA related configuration
   public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";

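Taken together, a client that wants file-descriptor-based short-circuit reads would set the new keys roughly as sketched below, using the DFSConfigKeys constants from this patch. The socket path is an arbitrary example, and dfs.client.read.shortcircuit is the pre-existing enable switch referenced from DFSClient.Conf above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ShortCircuitClientConf {
      public static Configuration create() {
        Configuration conf = new Configuration();
        // Clients and the DataNode must agree on the UNIX domain socket path.
        conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
            "/var/run/hdfs/dn_socket");                       // example path
        // Existing switch that turns short-circuit local reads on.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        // Whether ordinary read traffic may also flow over the domain socket.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
        // Client-side cache of passed file descriptors: size and expiry.
        conf.setInt(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 10);
        conf.setLong(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
            60000L);
        return conf;
      }
    }
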
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jan 11 23:52:22 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -46,17 +47,16 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.FileInputStreamCache;
 
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles 
@@ -80,6 +80,8 @@ public class DFSInputStream extends FSIn
   private long pos = 0;
   private long blockEnd = -1;
 
+  private final FileInputStreamCache fileInputStreamCache;
+
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -115,6 +117,13 @@ public class DFSInputStream extends FSIn
     this.buffersize = buffersize;
     this.src = src;
     this.peerCache = dfsClient.peerCache;
+    this.fileInputStreamCache = new FileInputStreamCache(
+      dfsClient.conf.getInt(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+      dfsClient.conf.getLong(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -247,7 +256,9 @@ public class DFSInputStream extends FSIn
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
-  private synchronized boolean blockUnderConstruction() {
+  // Short circuit local reads are forbidden for files that are
+  // under construction.  See HDFS-2757.
+  synchronized boolean shortCircuitForbidden() {
     return locatedBlocks.isUnderConstruction();
   }
 
@@ -428,7 +439,7 @@ public class DFSInputStream extends FSIn
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
 
@@ -510,10 +521,11 @@ public class DFSInputStream extends FSIn
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      blockReader.close(peerCache);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
     super.close();
+    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -809,10 +821,6 @@ public class DFSInputStream extends FSIn
                  e.getPos() + " from " + chosenNode);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed ", ex);
-        dfsClient.disableShortCircuit();
-        continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -837,7 +845,7 @@ public class DFSInputStream extends FSIn
         }
       } finally {
         if (reader != null) {
-          reader.close(peerCache);
+          reader.close(peerCache, fileInputStreamCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -849,19 +857,29 @@ public class DFSInputStream extends FSIn
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
+    DomainSocket domSock = null;
+
     try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          dfsClient.getDataEncryptionKey());
+      domSock = dfsClient.getDomainSocketFactory().create(addr, this);
+      if (domSock != null) {
+        // Create a UNIX Domain peer.
+        peer = new DomainPeer(domSock);
+      } else {
+        // Create a conventional TCP-based Peer.
+        sock = dfsClient.socketFactory.createSocket();
+        NetUtils.connect(sock, addr,
+          dfsClient.getRandomLocalInterfaceAddr(),
+          dfsClient.getConf().socketTimeout);
+        peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+            dfsClient.getDataEncryptionKey());
+      }
       success = true;
       return peer;
     } finally {
       if (!success) {
         IOUtils.closeQuietly(peer);
         IOUtils.closeQuietly(sock);
+        IOUtils.closeQuietly(domSock);
       }
     }
   }
@@ -895,49 +913,77 @@ public class DFSInputStream extends FSIn
                                        String clientName)
       throws IOException {
     
-    // Can't local read a block under construction, see HDFS-2757
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
-        !blockUnderConstruction()) {
-      return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
-          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
-          dfsClient.connectToDnViaHostname());
-    }
-    
     IOException err = null;
-    boolean fromCache = true;
 
-    // Allow retry since there is no way of knowing whether the cached socket
-    // is good until we actually use it.
-    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+    // Firstly, we check to see if we have cached any file descriptors for
+    // local blocks.  If so, we can just re-use those file descriptors.
+    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+    if (fis != null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+            "the FileInputStreamCache.");
+      }
+      return new BlockReaderLocal(dfsClient.conf, file,
+        block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
+    }
+
+    // We retry several times here.
+    // On the first nCachedConnRetry times, we try to fetch a socket from
+    // the socketCache and use it.  This may fail, since the old socket may
+    // have been closed by the peer.
+    // After that, we try to create a new socket using newPeer().
+    // This may create either a TCP socket or a UNIX domain socket, depending
+    // on the configuration and whether the peer is remote.
+    // If we try to create a UNIX domain socket and fail, we will not try that 
+    // again.  Instead, we'll try to create a TCP socket.  Only after we've 
+    // failed to create a TCP-based BlockReader will we throw an IOException
+    // from this function.  Throwing an IOException from here is basically
+    // equivalent to declaring the DataNode bad.
+    boolean triedNonDomainSocketReader = false;
+    for (int retries = 0;
+          retries < nCachedConnRetry && (!triedNonDomainSocketReader);
+          ++retries) {
       Peer peer = null;
-      // Don't use the cache on the last attempt - it's possible that there
-      // are arbitrarily many unusable sockets in the cache, but we don't
-      // want to fail the read.
       if (retries < nCachedConnRetry) {
         peer = peerCache.get(chosenNode);
       }
       if (peer == null) {
         peer = newPeer(dnAddr);
-        fromCache = false;
+        if (peer.getDomainSocket() == null) {
+          triedNonDomainSocketReader = true;
+        }
       }
-
+      boolean success = false;
       try {
-        // The OP_READ_BLOCK request is sent as we make the BlockReader
-        BlockReader reader =
-            BlockReaderFactory.newBlockReader(dfsClient.conf,
-                                       file, block,
-                                       blockToken,
-                                       startOffset, len,
-                                       verifyChecksum,
-                                       clientName,
-                                       peer,
-                                       chosenNode);
-        return reader;
-      } catch (IOException ex) {
-        // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
-        IOUtils.closeQuietly(peer);
+        boolean allowShortCircuitLocalReads =
+          (peer.getDomainSocket() != null) &&
+          dfsClient.getConf().shortCircuitLocalReads && 
+          (!shortCircuitForbidden());
+        // Here we will try to send either an OP_READ_BLOCK request or an 
+        // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader 
+        // we're trying to create.
+        BlockReader blockReader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
+        success = true;
+        return blockReader;
+      } catch (IOException ex) {
+        // Our socket is no good.
+        DFSClient.LOG.debug("Error making BlockReader. " +
+            "Closing stale " + peer, ex);
+        if (peer.getDomainSocket() != null) {
+          // If the Peer that we got the error from was a DomainPeer,
+          // mark the socket path as bad, so that newDataSocket will not try 
+          // to re-open this socket for a while.
+          dfsClient.getDomainSocketFactory().
+              disableDomainSocketPath(peer.getDomainSocket().getPath());
+        }
         err = ex;
+      } finally {
+        if (!success) {
+          IOUtils.closeQuietly(peer);
+        }
       }
     }
 
@@ -1075,7 +1121,7 @@ public class DFSInputStream extends FSIn
       // the TCP buffer, then just eat up the intervening data.
       //
       int diff = (int)(targetPos - pos);
-      if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+      if (diff <= blockReader.available()) {
         try {
           pos += blockReader.skip(diff);
           if (pos == targetPos) {

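As a reading aid for the new getBlockReader() retry logic above, here is a minimal, self-contained sketch of the fallback ordering the comment describes: cached peers first, then a fresh UNIX domain socket, then a plain TCP socket, with the IOException only escaping once the TCP-based attempt has also failed. The Peer/Reader interfaces and all helper names below are illustrative stand-ins, not the real HDFS classes.

    import java.io.IOException;

    public class BlockReaderFallbackSketch {
      interface Peer { boolean isDomainSocket(); }
      interface Reader { }

      // Models the DomainSocketFactory's cache of known-bad socket paths.
      private boolean domainPathDisabled = false;

      // Stand-in for peerCache.get(chosenNode); always empty in this sketch.
      Peer peerFromCache() { return null; }

      // Stand-in for newPeer(dnAddr): prefer a domain socket unless its path
      // has already been marked bad, in which case fall back to TCP.
      Peer newPeer() {
        final boolean domain = !domainPathDisabled;
        return new Peer() {
          public boolean isDomainSocket() { return domain; }
        };
      }

      // Stand-in for BlockReaderFactory.newBlockReader(); domain socket
      // attempts always fail here so that the fallback path is exercised.
      Reader newBlockReader(Peer peer) throws IOException {
        if (peer.isDomainSocket()) {
          throw new IOException("simulated domain socket failure");
        }
        return new Reader() { };
      }

      Reader getBlockReader(int nCachedConnRetry) throws IOException {
        IOException err = null;
        boolean triedNonDomainSocketReader = false;
        for (int retries = 0;
             retries < nCachedConnRetry && !triedNonDomainSocketReader;
             ++retries) {
          Peer peer = peerFromCache();
          if (peer == null) {
            peer = newPeer();
            if (!peer.isDomainSocket()) {
              triedNonDomainSocketReader = true;  // final, TCP-based attempt
            }
          }
          try {
            return newBlockReader(peer);
          } catch (IOException e) {
            err = e;
            if (peer.isDomainSocket()) {
              domainPathDisabled = true;  // don't retry this socket path
            }
          }
        }
        throw (err != null) ? err : new IOException("could not create a BlockReader");
      }

      public static void main(String[] args) throws IOException {
        // The first attempt uses a domain socket and fails; the second falls
        // back to TCP and succeeds.
        System.out.println(new BlockReaderFallbackSketch().getBlockReader(3) != null);
      }
    }
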
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+class DomainSocketFactory {
+  public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+  private final Conf conf;
+
+  enum PathStatus {
+    UNUSABLE,
+    SHORT_CIRCUIT_DISABLED,
+  }
+
+  /**
+   * Information about domain socket paths.
+   */
+  Cache<String, PathStatus> pathInfo =
+      CacheBuilder.newBuilder()
+      .expireAfterWrite(10, TimeUnit.MINUTES)
+      .build();
+
+  public DomainSocketFactory(Conf conf) {
+    this.conf = conf;
+
+    String feature = null;
+    if (conf.shortCircuitLocalReads) {
+      feature = "The short-circuit local reads feature";
+    } else if (conf.domainSocketDataTraffic) {
+      feature = "UNIX domain socket data traffic";
+    }
+    if (feature != null) {
+      if (conf.domainSocketPath == null) {
+        LOG.warn(feature + " is disabled because you have not set " +
+            DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      } else if (DomainSocket.getLoadingFailureReason() != null) {
+        LOG.error(feature + " is disabled because " +
+              DomainSocket.getLoadingFailureReason());
+      } else {
+        LOG.debug(feature + " is enabled.");
+      }
+    }
+  }
+
+  /**
+   * Create a DomainSocket.
+   * 
+   * @param addr        The address of the DataNode
+   * @param stream      The DFSInputStream the socket will be created for.
+   *
+   * @return            null if the socket could not be created; the
+   *                    socket otherwise.  If there was an error while
+   *                    creating the socket, we will add the socket path
+   *                    to our list of failed domain socket paths.
+   */
+  DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+    // If there is no domain socket path configured, we can't use domain
+    // sockets.
+    if (conf.domainSocketPath == null) return null;
+    // UNIX domain sockets can only be used to talk to local peers
+    if (!DFSClient.isLocalAddress(addr)) return null;
+    // If the DomainSocket code is not loaded, we can't create
+    // DomainSocket objects.
+    if (DomainSocket.getLoadingFailureReason() != null) return null;
+    String escapedPath = DomainSocket.
+        getEffectivePath(conf.domainSocketPath, addr.getPort());
+    PathStatus info = pathInfo.getIfPresent(escapedPath);
+    if (info == PathStatus.UNUSABLE) {
+      // We tried to connect to this domain socket before, and it was totally
+      // unusable.
+      return null;
+    }
+    if ((!conf.domainSocketDataTraffic) &&
+        ((info == PathStatus.SHORT_CIRCUIT_DISABLED) || 
+            stream.shortCircuitForbidden())) {
+      // If we don't want to pass data over domain sockets, and we don't want
+      // to pass file descriptors over them either, we have no use for domain
+      // sockets.
+      return null;
+    }
+    boolean success = false;
+    DomainSocket sock = null;
+    try {
+      sock = DomainSocket.connect(escapedPath);
+      sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout);
+      success = true;
+    } catch (IOException e) {
+      LOG.error("error creating DomainSocket", e);
+      // fall through
+    } finally {
+      if (!success) {
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+        sock = null;
+      }
+    }
+    return sock;
+  }
+
+  public void disableShortCircuitForPath(String path) {
+    pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+  }
+
+  public void disableDomainSocketPath(String path) {
+    pathInfo.put(path, PathStatus.UNUSABLE);
+  }
+}
\ No newline at end of file

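The core of DomainSocketFactory is its expiring per-path status cache: once a socket path is marked UNUSABLE (or SHORT_CIRCUIT_DISABLED), create() skips it until the entry ages out after ten minutes. Below is a small sketch of just that mechanism, using the same Guava Cache API the class uses; the wrapper class, the mayConnect/markUnusable method names, and the example socket path are made up for illustration.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class PathStatusCacheSketch {
      enum PathStatus { UNUSABLE, SHORT_CIRCUIT_DISABLED }

      // Entries silently expire ten minutes after being written, so a path
      // that was marked bad gets another chance later without an explicit reset.
      private final Cache<String, PathStatus> pathInfo = CacheBuilder.newBuilder()
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .build();

      boolean mayConnect(String path) {
        return pathInfo.getIfPresent(path) != PathStatus.UNUSABLE;
      }

      void markUnusable(String path) {
        pathInfo.put(path, PathStatus.UNUSABLE);
      }

      public static void main(String[] args) {
        PathStatusCacheSketch cache = new PathStatusCacheSketch();
        String path = "/var/run/hdfs/dn.sock";        // illustrative path only
        System.out.println(cache.mayConnect(path));   // true
        cache.markUnusable(path);
        System.out.println(cache.mayConnect(path));   // false for ~10 minutes
      }
    }
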
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A cache of FileInputStream objects that we have received from the
+ * DataNode, so that later local reads of the same block can reuse them.
+ */
+class FileInputStreamCache {
+  private final static Log LOG = LogFactory.getLog(FileInputStreamCache.class);
+
+  /**
+   * The executor service that runs the cacheCleaner.  There is only one of
+   * these per VM.
+   */
+  private final static ScheduledThreadPoolExecutor executor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("FileInputStreamCache Cleaner").
+          build());
+  
+  /**
+   * The CacheCleaner for this FileInputStreamCache.  We don't create this
+   * and schedule it until it becomes necessary.
+   */
+  private CacheCleaner cacheCleaner;
+  
+  /**
+   * Maximum number of entries to allow in the cache.
+   */
+  private final int maxCacheSize;
+  
+  /**
+   * The minimum time in milliseconds to preserve an element in the cache.
+   */
+  private final long expiryTimeMs;
+  
+  /**
+   * True if the FileInputStreamCache is closed.
+   */
+  private boolean closed = false;
+  
+  /**
+   * Cache entries.
+   */
+  private final LinkedListMultimap<Key, Value> map = LinkedListMultimap.create();
+
+  /**
+   * Expiry thread which makes sure that the file descriptors get closed
+   * after a while.
+   */
+  class CacheCleaner implements Runnable {
+    @Override
+    public void run() {
+      synchronized(FileInputStreamCache.this) {
+        if (closed) return;
+        long curTime = Time.monotonicNow();
+        for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+              iter.hasNext();
+              iter = map.entries().iterator()) {
+          Entry<Key, Value> entry = iter.next();
+          if (entry.getValue().getTime() + expiryTimeMs >= curTime) {
+            break;
+          }
+          entry.getValue().close();
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * The key identifying a FileInputStream array.
+   */
+  static class Key {
+    private final DatanodeID datanodeID;
+    private final ExtendedBlock block;
+    
+    public Key(DatanodeID datanodeID, ExtendedBlock block) {
+      this.datanodeID = datanodeID;
+      this.block = block;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof FileInputStreamCache.Key)) {
+        return false;
+      }
+      FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
+      return (block.equals(otherKey.block) &&
+          (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &&
+          datanodeID.equals(otherKey.datanodeID));
+    }
+
+    @Override
+    public int hashCode() {
+      return block.hashCode();
+    }
+  }
+
+  /**
+   * The value containing a FileInputStream array and the time it was added to
+   * the cache.
+   */
+  static class Value {
+    private final FileInputStream fis[];
+    private final long time;
+    
+    public Value (FileInputStream fis[]) {
+      this.fis = fis;
+      this.time = Time.monotonicNow();
+    }
+
+    public FileInputStream[] getFileInputStreams() {
+      return fis;
+    }
+
+    public long getTime() {
+      return time;
+    }
+    
+    public void close() {
+      IOUtils.cleanup(LOG, fis);
+    }
+  }
+  
+  /**
+   * Create a new FileInputStreamCache.
+   *
+   * @param maxCacheSize         The maximum number of elements to allow in 
+   *                             the cache.
+   * @param expiryTimeMs         The minimum time in milliseconds to preserve
+   *                             elements in the cache.
+   */
+  public FileInputStreamCache(int maxCacheSize, long expiryTimeMs) {
+    this.maxCacheSize = maxCacheSize;
+    this.expiryTimeMs = expiryTimeMs;
+  }
+  
+  /**
+   * Put an array of FileInputStream objects into the cache.
+   *
+   * @param datanodeID          The DatanodeID to store the streams under.
+   * @param block               The Block to store the streams under.
+   * @param fis                 The streams.
+   */
+  public void put(DatanodeID datanodeID, ExtendedBlock block,
+      FileInputStream fis[]) {
+    boolean inserted = false;
+    try {
+      synchronized(this) {
+        if (closed) return;
+        if (map.size() + 1 > maxCacheSize) {
+          Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+          if (!iter.hasNext()) return;
+          Entry<Key, Value> entry = iter.next();
+          entry.getValue().close();
+          iter.remove();
+        }
+        if (cacheCleaner == null) {
+          cacheCleaner = new CacheCleaner();
+          executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs, 
+              TimeUnit.MILLISECONDS);
+        }
+        map.put(new Key(datanodeID, block), new Value(fis));
+        inserted = true;
+      }
+    } finally {
+      if (!inserted) {
+        IOUtils.cleanup(LOG, fis);
+      }
+    }
+  }
+  
+  /**
+   * Find and remove an array of FileInputStream objects from the cache.
+   *
+   * @param datanodeID          The DatanodeID to search for.
+   * @param block               The Block to search for.
+   *
+   * @return                    null if no streams can be found; the
+   *                            array otherwise.  If this is non-null, the
+   *                            array will have been removed from the cache.
+   */
+  public synchronized FileInputStream[] get(DatanodeID datanodeID,
+      ExtendedBlock block) {
+    Key key = new Key(datanodeID, block);
+    List<Value> ret = map.get(key);
+    if (ret.isEmpty()) return null;
+    Value val = ret.get(0);
+    map.remove(key, val);
+    return val.getFileInputStreams();
+  }
+  
+  /**
+   * Close the cache and free all associated resources.
+   */
+  public synchronized void close() {
+    if (closed) return;
+    closed = true;
+    if (cacheCleaner != null) {
+      executor.remove(cacheCleaner);
+    }
+    for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+          iter.hasNext();
+          iter = map.entries().iterator()) {
+      Entry<Key, Value> entry = iter.next();
+      entry.getValue().close();
+      iter.remove();
+    }
+  }
+  
+  public synchronized String toString() {
+    StringBuilder bld = new StringBuilder();
+    bld.append("FileInputStreamCache(");
+    String prefix = "";
+    for (Entry<Key, Value> entry : map.entries()) {
+      bld.append(prefix);
+      bld.append(entry.getKey());
+      prefix = ", ";
+    }
+    bld.append(")");
+    return bld.toString();
+  }
+  
+  public long getExpiryTimeMs() {
+    return expiryTimeMs;
+  }
+  
+  public int getMaxCacheSize() {
+    return maxCacheSize;
+  }
+}

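The CacheCleaner above depends on cache entries being kept in insertion order, which lets it stop scanning at the first entry that has not yet expired. Here is a standalone sketch of that expiry rule, with a plain Deque standing in for the LinkedListMultimap and no real file descriptors involved; the names and millisecond values are illustrative only.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ExpiryScanSketch {
      static final class Entry {
        final long timeMs;              // insertion time from a monotonic clock
        Entry(long timeMs) { this.timeMs = timeMs; }
      }

      static int expire(Deque<Entry> entries, long expiryTimeMs, long nowMs) {
        int closed = 0;
        while (!entries.isEmpty()) {
          Entry oldest = entries.peekFirst();
          if (oldest.timeMs + expiryTimeMs >= nowMs) {
            break;                      // every later entry is even newer
          }
          entries.pollFirst();          // the real cleaner also closes the streams here
          closed++;
        }
        return closed;
      }

      public static void main(String[] args) {
        Deque<Entry> entries = new ArrayDeque<Entry>();
        entries.add(new Entry(0));
        entries.add(new Entry(5000));
        entries.add(new Entry(9000));
        System.out.println(expire(entries, 3000, 10000));  // prints 2
      }
    }
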
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Jan 11 23:52:22 2013
@@ -413,7 +413,8 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     startOffset = -1;
     checksum = null;
     if (peerCache != null & sentStatusCode) {
@@ -470,4 +471,11 @@ public class RemoteBlockReader extends F
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
+  
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Jan 11 23:52:22 2013
@@ -275,7 +275,8 @@ public class RemoteBlockReader2  impleme
 
 
   @Override
-  public synchronized void close(PeerCache peerCache) throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     packetReceiver.close();
     startOffset = -1;
     checksum = null;
@@ -422,4 +423,11 @@ public class RemoteBlockReader2  impleme
       }
     }
   }
+  
+  @Override
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
+  }
 }

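Both remote readers now answer available() with DFSClient.TCP_WINDOW_SIZE, which feeds the seek change in DFSInputStream above: a short forward seek is satisfied by skipping within the current reader instead of tearing it down and reconnecting. A hedged, self-contained sketch of that decision follows; SimpleReader and seekForward are illustrative names, not the real API.

    public class SeekSketch {
      interface SimpleReader {
        int available();                 // e.g. the TCP window size for remote readers
        long skip(long n);
      }

      // Returns the new position if the seek could be handled by skipping
      // within the current reader, or -1 if the caller must open a new reader.
      static long seekForward(SimpleReader reader, long pos, long targetPos) {
        long diff = targetPos - pos;
        if (diff > 0 && diff <= reader.available()) {
          return pos + reader.skip(diff);
        }
        return -1;
      }

      public static void main(String[] args) {
        SimpleReader reader = new SimpleReader() {
          public int available() { return 128 * 1024; }
          public long skip(long n) { return n; }
        };
        System.out.println(seekForward(reader, 0, 4096));      // 4096: skipped in place
        System.out.println(seekForward(reader, 0, 1 << 20));   // -1: too far, reconnect
      }
    }
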
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java Fri Jan 11 23:52:22 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdfs.net;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 
@@ -27,8 +26,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
 
-class DomainPeerServer implements PeerServer {
+@InterfaceAudience.Private
+public class DomainPeerServer implements PeerServer {
   static Log LOG = LogFactory.getLog(DomainPeerServer.class);
   private final DomainSocket sock;
 

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Jan 11 23:52:22 2013
@@ -105,6 +105,18 @@ public interface DataTransferProtocol {
       final DatanodeInfo[] targets) throws IOException;
 
   /**
+   * Request short circuit access file descriptors from a DataNode.
+   *
+   * @param blk             The block to get file descriptors for.
+   * @param blockToken      Security token for accessing the block.
+   * @param maxVersion      Maximum version of the block data the client 
+   *                        can understand.
+   */
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException;
+
+  /**
    * Receive a block from a source datanode
    * and then notifies the namenode
    * to remove the copy from the original datanode.

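The maxVersion parameter lets the DataNode refuse to hand out file descriptors to a client that cannot parse its on-disk block data format. The guard below is only a plausible sketch of such a check; BLOCK_DATA_VERSION and the method name are hypothetical and are not taken from the DataNode implementation.

    import java.io.IOException;

    public class VersionCheckSketch {
      static final int BLOCK_DATA_VERSION = 1;   // hypothetical server-side format version

      static void checkClientCanReadBlockData(int clientMaxVersion) throws IOException {
        if (clientMaxVersion < BLOCK_DATA_VERSION) {
          throw new IOException("client understands block data up to version "
              + clientMaxVersion + ", but this DataNode writes version "
              + BLOCK_DATA_VERSION);
        }
      }

      public static void main(String[] args) {
        try {
          checkClientCanReadBlockData(0);        // too old: FDs are refused
        } catch (IOException e) {
          System.out.println(e.getMessage());
        }
      }
    }
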
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Fri Jan 11 23:52:22 2013
@@ -34,7 +34,8 @@ public enum Op {
   REPLACE_BLOCK((byte)83),
   COPY_BLOCK((byte)84),
   BLOCK_CHECKSUM((byte)85),
-  TRANSFER_BLOCK((byte)86);
+  TRANSFER_BLOCK((byte)86),
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87);
 
   /** The code for this operation. */
   public final byte code;

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Jan 11 23:52:22 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 
 /** Receiver */
@@ -77,6 +78,9 @@ public abstract class Receiver implement
     case TRANSFER_BLOCK:
       opTransferBlock(in);
       break;
+    case REQUEST_SHORT_CIRCUIT_FDS:
+      opRequestShortCircuitFds(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -117,6 +121,15 @@ public abstract class Receiver implement
         fromProtos(proto.getTargetsList()));
   }
 
+  /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
+  private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
+    final OpRequestShortCircuitAccessProto proto =
+      OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+    requestShortCircuitFds(fromProto(proto.getHeader().getBlock()),
+        fromProto(proto.getHeader().getToken()),
+        proto.getMaxVersion());
+  }
+
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Jan 11 23:52:22 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -136,6 +137,17 @@ public class Sender implements DataTrans
   }
 
   @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException {
+    OpRequestShortCircuitAccessProto proto =
+        OpRequestShortCircuitAccessProto.newBuilder()
+          .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            blk, blockToken)).setMaxVersion(maxVersion).build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+  }
+  
+  @Override
   public void replaceBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,

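Sender and Receiver have to stay in lockstep: the sender writes the one-byte opcode followed by the request body, and the receiver dispatches on that byte, which is why REQUEST_SHORT_CIRCUIT_FDS has to show up in Op, Sender and Receiver together. A self-contained toy round trip over plain java.io streams, with writeUTF standing in for the protobuf request message:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class OpRoundTripSketch {
      static final byte REQUEST_SHORT_CIRCUIT_FDS = 87;

      static void send(DataOutputStream out, byte op, String body) throws IOException {
        out.writeByte(op);
        out.writeUTF(body);              // stands in for the protobuf request proto
      }

      static void receive(DataInputStream in) throws IOException {
        byte op = in.readByte();
        switch (op) {
        case REQUEST_SHORT_CIRCUIT_FDS:
          System.out.println("requestShortCircuitFds(" + in.readUTF() + ")");
          break;
        default:
          throw new IOException("Unknown op " + op + " in data stream");
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        send(new DataOutputStream(buf), REQUEST_SHORT_CIRCUIT_FDS, "blk_1073741825");
        receive(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
      }
    }
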
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Jan 11 23:52:22 2013
@@ -213,7 +213,7 @@ public class JspHelper {
         offsetIntoBlock, amtToRead,  true,
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().toString(),              
-            addr.getHostName(), poolId, addr.getPort(), 0, 0));
+            addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
@@ -232,8 +232,7 @@ public class JspHelper {
       amtToRead -= numRead;
       readOffset += numRead;
     }
-    blockReader = null;
-    s.close();
+    blockReader.close(null, null);
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
   }