Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/11/22 03:57:06 UTC

svn commit: r1204792 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ src/main/java/org/apache...

Author: szetszwo
Date: Tue Nov 22 02:57:04 2011
New Revision: 1204792

URL: http://svn.apache.org/viewvc?rev=1204792&view=rev
Log:
HDFS-2246. Enable reading a block directly from local file system for a client on the same node as the block file.  Contributed by Andrew Purtell, Suresh Srinivas and Jitendra Nath Pandey

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 22 02:57:04 2011
@@ -161,6 +161,10 @@ Release 0.23.1 - UNRELEASED
     HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker.
     (todd)
 
+    HDFS-2246. Enable reading a block directly from local file system
+    for a client on the same node as the block file.  (Andrew Purtell,
+    Suresh Srinivas and Jitendra Nath Pandey via szetszwo)
+
   BUG FIXES
 
     HDFS-2541. For a sufficiently large value of blocks, the DN Scanner 

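Background on the configuration surface this commit adds: short-circuit reads are off by default and are enabled per client, with an optional switch to skip client-side checksumming and a datanode-side whitelist of users allowed to request local block paths. A minimal sketch of a client configuration, using the key names introduced in DFSConfigKeys below ("hbase" is a hypothetical user name):

    import org.apache.hadoop.conf.Configuration;

    public class ShortCircuitReadConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Let the DFSClient read block files directly when it runs on the
        // same node as the datanode (default: false).
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        // Optionally skip client-side checksum verification on local reads
        // (default: false).
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
        // Datanode-side: the user permitted to call getBlockLocalPathInfo().
        // "hbase" is illustrative only.
        conf.set("dfs.block.local-path-access.user", "hbase");
      }
    }
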
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1204792&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Nov 22 02:57:04 2011
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The user running the client must be configured at the datanode as
+ * allowed to use short circuit reads.</li>
+ * <li>The client gets the path to the file where the block is stored using the
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call.</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal extends RemoteBlockReader2 {
+  public static final Log LOG = LogFactory.getLog(DFSClient.class);
+
+  //Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+      cache = Collections
+          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+              hashTableCapacity, hashTableLoadFactor, true) {
+            private static final long serialVersionUID = 1;
+
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+              return size() > cacheSize;
+            }
+          });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        DatanodeInfo node, Configuration conf, int socketTimeout)
+        throws IOException {
+      if (proxy == null) {
+        proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
+            socketTimeout);
+      }
+      return proxy;
+    }
+    
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(ExtendedBlock b) {
+      cache.remove(b);
+    }
+  }
+  
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private final FileInputStream dataIn; // reader for the data file
+
+  private FileInputStream checksumIn;   // reader for the checksum file
+
+  private int offsetFromChunkBoundary;
+  
+  ByteBuffer dataBuff = null;
+  ByteBuffer checksumBuff = null;
+  
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocal newBlockReader(Configuration conf, String file,
+      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
+      int socketTimeout, long startOffset, long length) throws IOException {
+
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+        .getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+    }
+
+    // Check whether the file exists. The HDFS file may have been deleted
+    // and this block-lookup may be occurring on behalf of a new HDFS file,
+    // in which case the block file could reside in a different portion of
+    // the dfs.data.dir directories. If so, we remove this entry from the
+    // cache; the next call to this method will re-populate it.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocal localBlockReader = null;
+    boolean skipChecksumCheck = skipChecksumCheck(conf);
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + skipChecksumCheck);
+      }
+
+      if (!skipChecksumCheck) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader
+            .readHeader(new DataInputStream(checksumIn));
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + blk + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        long firstChunkOffset = startOffset
+            - (startOffset % checksum.getBytesPerChecksum());
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
+            startOffset, length, pathinfo, checksum, true, dataIn,
+            firstChunkOffset, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
+            startOffset, length, pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
+          + " from cache because local file " + pathinfo.getBlockPath()
+          + " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }
+    }
+    return localBlockReader;
+  }
+  
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+  
+  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
+      DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+        conf, timeout);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      if (pathinfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+  
+  private static boolean skipChecksumCheck(Configuration conf) {
+    return conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+  }
+  
+  private BlockReaderLocal(Configuration conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
+      throws IOException {
+    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
+        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
+        dataIn, startOffset, null);
+  }
+
+  private BlockReaderLocal(Configuration conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
+      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+      FileInputStream checksumIn) throws IOException {
+    super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
+        .getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
+        length, null);
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+    dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
+    checksumBuff = bufferPool.getBuffer(checksumSize*64);
+    //Initially the buffers have nothing to read.
+    dataBuff.flip();
+    checksumBuff.flip();
+    long toSkip = firstChunkOffset;
+    while (toSkip > 0) {
+      long skipped = dataIn.skip(toSkip);
+      if (skipped == 0) {
+        throw new IOException("Couldn't initialize input stream");
+      }
+      toSkip -= skipped;
+    }
+    if (checksumIn != null) {
+      long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
+          * checksumSize;
+      while (checkSumOffset > 0) {
+        long skipped = checksumIn.skip(checkSumOffset);
+        if (skipped == 0) {
+          throw new IOException("Couldn't initialize checksum input stream");
+        }
+        checkSumOffset -= skipped;
+      }
+    }
+  }
+
+  private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = stream.getChannel().read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = stream.getChannel().read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
+  }
+  
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read off " + off + " len " + len);
+    }
+    if (!verifyChecksum) {
+      return dataIn.read(buf, off, len);
+    } else {
+      int dataRead = -1;
+      if (dataBuff.remaining() == 0) {
+        dataBuff.clear();
+        checksumBuff.clear();
+        dataRead = readIntoBuffer(dataIn, dataBuff);
+        readIntoBuffer(checksumIn, checksumBuff);
+        checksumBuff.flip();
+        dataBuff.flip();
+        if (verifyChecksum) {
+          checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
+              this.startOffset);
+        }
+      } else {
+        dataRead = dataBuff.remaining();
+      }
+      if (dataRead > 0) {
+        int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
+        if (offsetFromChunkBoundary > 0) {
+          dataBuff.position(offsetFromChunkBoundary);
+          // It's either end of file, or dataRead is greater than
+          // offsetFromChunkBoundary
+          offsetFromChunkBoundary = 0;
+        }
+        if (nRead > 0) {
+          dataBuff.get(buf, off, nRead);
+          return nRead;
+        } else {
+          return 0;
+        }
+      } else {
+        return -1;
+      }
+    }
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    if (!verifyChecksum) {
+      return dataIn.skip(n);
+    } else {
+     return super.skip(n);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    dataIn.close();
+    if (checksumIn != null) {
+      checksumIn.close();
+    }
+    if (dataBuff != null) {
+      bufferPool.returnBuffer(dataBuff);
+      dataBuff = null;
+    }
+    if (checksumBuff != null) {
+      bufferPool.returnBuffer(checksumBuff);
+      checksumBuff = null;
+    }
+    super.close();
+  }
+}
\ No newline at end of file

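LocalDatanodeInfo above caches block-to-path lookups in an access-ordered LinkedHashMap that evicts its eldest entry once the map outgrows a fixed size. A standalone sketch of that LRU idiom, with toy key/value types and the cache shrunk to 3 entries for illustration (BlockReaderLocal uses 10000):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruCacheSketch {
      public static void main(String[] args) {
        final int cacheSize = 3;
        final Map<String, String> cache = Collections.synchronizedMap(
            new LinkedHashMap<String, String>(cacheSize * 2, 0.75f, true) {
              private static final long serialVersionUID = 1;

              @Override
              protected boolean removeEldestEntry(
                  Map.Entry<String, String> eldest) {
                return size() > cacheSize;
              }
            });
        cache.put("blk_1", "/dir0/blk_1");
        cache.put("blk_2", "/dir0/blk_2");
        cache.put("blk_3", "/dir1/blk_3");
        cache.get("blk_1");                 // touch blk_1: most recently used
        cache.put("blk_4", "/dir1/blk_4");  // evicts blk_2, the LRU entry
        System.out.println(cache.keySet()); // [blk_3, blk_1, blk_4]
      }
    }
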
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Nov 22 02:57:04 2011
@@ -25,12 +25,18 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
 import java.net.Socket;
+import java.net.SocketException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.net.SocketFactory;
 
@@ -78,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -100,6 +107,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -230,7 +238,8 @@ public class DFSClient implements java.i
    */
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
-        
+  private boolean shortCircuitLocalReads;
+  
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -293,6 +302,13 @@ public class DFSClient implements java.i
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   /**
@@ -500,6 +516,82 @@ public class DFSClient implements java.i
   }
 
   /**
+   * Get {@link BlockReader} for short circuited local reads.
+   */
+  static BlockReader getLocalBlockReader(Configuration conf,
+      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
+      throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+              - offsetIntoBlock);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
+  
+  private static Set<String> localIpAddresses = Collections
+      .synchronizedSet(new HashSet<String>());
+  
+  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr + (local ? " is local" : " is not local"));
+    }
+    if (local) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+  
+  /**
+   * Should the block access token be refetched on an exception
+   * 
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if the block access token has expired or is invalid and
+   *         should be refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    /*
+     * Get a new access token and retry. A retry is needed in two cases. 1)
+     * Both the NN and DN restarted while the DFSClient was holding a cached
+     * access token. 2) The NN failed to update its access key at the pre-set
+     * interval (by a wide margin) and subsequently restarted. In this case,
+     * the DN re-registers itself with the NN and receives a new access key,
+     * but the DN will have deleted the old access key from its memory since
+     * it was considered expired based on the estimated expiration date.
+     */
+    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+      LOG.info("Access token was invalid when connecting to " + targetAddr
+          + " : " + ex);
+      return true;
+    }
+    return false;
+  }
+  
+  /**
    * Cancel a delegation token
    * @param token the token to cancel
    * @throws InvalidToken
@@ -1590,7 +1682,7 @@ public class DFSClient implements java.i
     synchronized List<LocatedBlock> getAllBlocks() throws IOException {
       return ((DFSInputStream)in).getAllBlocks();
     }
-
+    
     /**
      * @return The visible length of the file.
      */
@@ -1598,6 +1690,14 @@ public class DFSClient implements java.i
       return ((DFSInputStream)in).getFileLength();
     }
   }
+  
+  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+      throws IOException {
+    return shortCircuitLocalReads && isLocalAddress(targetAddr);
+  }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
     DatanodeInfo [] dnArr = { dn };
@@ -1620,4 +1720,8 @@ public class DFSClient implements java.i
     return getClass().getSimpleName() + "[clientName=" + clientName
         + ", ugi=" + ugi + "]"; 
   }
+
+  void disableShortCircuit() {
+    shortCircuitLocalReads = false;
+  }
 }

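The isLocalAddress() helper added to DFSClient above decides whether the chosen datanode is this host: wildcard and loopback addresses are local, and so is any address bound to one of the machine's network interfaces. A minimal standalone sketch of the same test using only java.net (it omits the positive-result cache the real code keeps):

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;

    public class LocalAddressSketch {
      static boolean isLocal(InetSocketAddress targetAddr) {
        InetAddress addr = targetAddr.getAddress();
        // Wildcard (0.0.0.0) and loopback addresses are always local.
        if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
          return true;
        }
        // Otherwise, local only if some interface carries this address.
        try {
          return NetworkInterface.getByInetAddress(addr) != null;
        } catch (SocketException e) {
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(isLocal(new InetSocketAddress("127.0.0.1", 50020)));
        System.out.println(isLocal(new InetSocketAddress("192.0.2.1", 50020)));
      }
    }
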
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Nov 22 02:57:04 2011
@@ -261,6 +261,10 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
   public static final int     DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
 
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -301,4 +305,5 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
+  public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Nov 22 02:57:04 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 
 /****************************************************************
@@ -405,11 +406,8 @@ public class DFSInputStream extends FSIn
       try {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        
-        blockReader = getBlockReader(
-            targetAddr, src, blk,
-            accessToken,
-            offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
+        blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
+            accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
@@ -666,12 +664,9 @@ public class DFSInputStream extends FSIn
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
             
         int len = (int) (end - start + 1);
-
-        reader = getBlockReader(targetAddr, src,
-                                block.getBlock(),
-                                blockToken,
-                                start, len, buffersize,
-                                verifyChecksum, dfsClient.clientName);
+        reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
+            blockToken, start, len, buffersize, verifyChecksum,
+            dfsClient.clientName);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -684,6 +679,10 @@ public class DFSInputStream extends FSIn
                  e.getPos() + " from " + chosenNode.getName());
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+      } catch (AccessControlException ex) {
+        DFSClient.LOG.warn("Short circuit access failed", ex);
+        dfsClient.disableShortCircuit();
+        continue;
       } catch (IOException e) {
         if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
@@ -726,6 +725,7 @@ public class DFSInputStream extends FSIn
    * Otherwise, it will create a new connection.
    *
    * @param dnAddr  Address of the datanode
+   * @param chosenNode Chosen datanode information
    * @param file  File location
    * @param block  The Block object
    * @param blockToken  The access token for security
@@ -737,6 +737,7 @@ public class DFSInputStream extends FSIn
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+                                       DatanodeInfo chosenNode,
                                        String file,
                                        ExtendedBlock block,
                                        Token<BlockTokenIdentifier> blockToken,
@@ -746,6 +747,12 @@ public class DFSInputStream extends FSIn
                                        boolean verifyChecksum,
                                        String clientName)
       throws IOException {
+    
+    if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
+      return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
+          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset);
+    }
+    
     IOException err = null;
     boolean fromCache = true;
 

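The DFSInputStream changes above amount to a try-local-then-fall-back policy: getBlockReader() dispatches to a local reader when the chosen datanode is this host, and an AccessControlException from the datanode permanently disables short-circuit for the client so subsequent reads go over the wire. A condensed sketch of that control flow; readLocal() and readRemote() are hypothetical stand-ins for BlockReaderLocal and RemoteBlockReader2:

    import java.io.IOException;

    import org.apache.hadoop.security.AccessControlException;

    public class ShortCircuitFallbackSketch {
      private boolean shortCircuitEnabled = true;

      String readBlock(boolean datanodeIsLocal) throws IOException {
        if (shortCircuitEnabled && datanodeIsLocal) {
          try {
            return readLocal();
          } catch (AccessControlException ace) {
            // The datanode refused local path access: disable short
            // circuit for this client and retry over the wire.
            shortCircuitEnabled = false;
          }
        }
        return readRemote();
      }

      // Hypothetical stand-ins for the real block readers.
      private String readLocal() throws AccessControlException {
        throw new AccessControlException("user not permitted");
      }

      private String readRemote() {
        return "read via datanode";
      }

      public static void main(String[] args) throws IOException {
        ShortCircuitFallbackSketch c = new ShortCircuitFallbackSketch();
        System.out.println(c.readBlock(true)); // falls back to remote
        System.out.println(c.readBlock(true)); // short circuit now disabled
      }
    }
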
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Nov 22 02:57:04 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -638,6 +639,14 @@ public class DFSUtil {
              locatedBlock);
   }
   
+  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+      throws IOException {
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolTranslatorR23(
+        datanodeid, conf, socketTimeout);
+  }
+  
   /** Create a {@link ClientDatanodeProtocol} proxy */
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Nov 22 02:57:04 2011
@@ -85,7 +85,7 @@ public class RemoteBlockReader2  impleme
   
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private ReadableByteChannel in;
-  private DataChecksum checksum;
+  protected DataChecksum checksum;
   
   private PacketHeader curHeader;
   private ByteBuffer curPacketBuf = null;
@@ -96,25 +96,25 @@ public class RemoteBlockReader2  impleme
   private long lastSeqNo = -1;
 
   /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
+  protected long startOffset;
+  protected final String filename;
 
-  private static DirectBufferPool bufferPool =
+  protected static DirectBufferPool bufferPool =
     new DirectBufferPool();
   private ByteBuffer headerBuf = ByteBuffer.allocate(
       PacketHeader.PKT_HEADER_LEN);
 
-  private int bytesPerChecksum;
-  private int checksumSize;
+  protected int bytesPerChecksum;
+  protected int checksumSize;
 
   /**
    * The total number of bytes we need to transfer from the DN.
    * This is the amount that the user has requested plus some padding
    * at the beginning so that the read can begin on a chunk boundary.
    */
-  private long bytesNeededToFinish;
+  protected long bytesNeededToFinish;
 
-  private final boolean verifyChecksum;
+  protected final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
   
@@ -271,7 +271,7 @@ public class RemoteBlockReader2  impleme
     }
   }
 
-  private RemoteBlockReader2(String file, String bpid, long blockId,
+  protected RemoteBlockReader2(String file, String bpid, long blockId,
       ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
     // Path is used only for printing block and file information in debug

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java?rev=1204792&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java Tue Nov 22 02:57:04 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockLocalPathInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+  }
+
+  private ExtendedBlock block;
+  private String localBlockPath = "";  // local file storing the data
+  private String localMetaPath = "";   // local file storing the checksum
+
+  public BlockLocalPathInfo() {}
+
+  /**
+   * Constructs BlockLocalPathInfo.
+   * @param b The block corresponding to this path info. 
+   * @param file Block data file.
+   * @param metafile Metadata file for the block.
+   */
+  public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) {
+    block = b;
+    localBlockPath = file;
+    localMetaPath = metafile;
+  }
+
+  /**
+   * Get the Block data file.
+   * @return Block data file.
+   */
+  public String getBlockPath() {return localBlockPath;}
+  
+  /**
+   * Get the Block metadata file.
+   * @return Block metadata file.
+   */
+  public String getMetaPath() {return localMetaPath;}
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    Text.writeString(out, localBlockPath);
+    Text.writeString(out, localMetaPath);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block = new ExtendedBlock();
+    block.readFields(in);
+    localBlockPath = Text.readString(in);
+    localMetaPath = Text.readString(in);
+  }
+  
+  /**
+   * Get number of bytes in the block.
+   * @return Number of bytes in the block.
+   */
+  public long getNumBytes() {
+    return block.getNumBytes();
+  }
+}

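Since BlockLocalPathInfo above is a Writable, it crosses the R23 wire protocol as a plain serialized record. A round-trip sketch under made-up values; the four-argument ExtendedBlock constructor (pool id, block id, length, generation stamp) is assumed from the 0.23 codebase:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    public class BlockLocalPathInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        ExtendedBlock blk = new ExtendedBlock("BP-1", 1234L, 4096L, 1L);
        BlockLocalPathInfo info = new BlockLocalPathInfo(blk,
            "/data/current/blk_1234", "/data/current/blk_1234_1.meta");

        // Serialize as the RPC layer would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        info.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance.
        BlockLocalPathInfo copy = new BlockLocalPathInfo();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getBlockPath() + " " + copy.getMetaPath());
      }
    }
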
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Tue Nov 22 02:57:04 2011
@@ -22,9 +22,11 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** A client-datanode protocol for block recovery
@@ -82,5 +84,30 @@ public interface ClientDatanodeProtocol 
    *          deleted along with its contents.
    * @throws IOException
    */
-  void deleteBlockPool(String bpid, boolean force) throws IOException; 
+  void deleteBlockPool(String bpid, boolean force) throws IOException;
+  
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, both of the following must be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException;
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolServerSideTranslatorR23.java Tue Nov 22 02:57:04 2011
@@ -21,9 +21,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * This class is used on the server side.
@@ -116,4 +120,10 @@ public class ClientDatanodeProtocolServe
   public void deleteBlockPool(String bpid, boolean force) throws IOException {
     server.deleteBlockPool(bpid, force);
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    return server.getBlockLocalPathInfo(block, token);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeProtocolTranslatorR23.java Tue Nov 22 02:57:04 2011
@@ -26,14 +26,17 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 
 /**
@@ -63,6 +66,23 @@ public class ClientDatanodeProtocolTrans
     rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory);
   }
   
+  /**
+   * Constructor.
+   * @param datanodeid Datanode to connect to.
+   * @param conf Configuration.
+   * @param socketTimeout Socket timeout to use.
+   * @throws IOException
+   */
+  public ClientDatanodeProtocolTranslatorR23(DatanodeID datanodeid,
+      Configuration conf, int socketTimeout) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost()
+        + ":" + datanodeid.getIpcPort());
+    rpcProxy = RPC.getProxy(ClientDatanodeWireProtocol.class,
+        ClientDatanodeWireProtocol.versionID, addr,
+        UserGroupInformation.getCurrentUser(), conf,
+        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+  }
+
   static ClientDatanodeWireProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
       LocatedBlock locatedBlock)
@@ -134,4 +154,9 @@ public class ClientDatanodeProtocolTrans
 
   }
 
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    return rpcProxy.getBlockLocalPathInfo(block, token);
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ClientDatanodeWireProtocol.java Tue Nov 22 02:57:04 2011
@@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.ProtocolInfo;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** 
@@ -78,6 +82,13 @@ public interface ClientDatanodeWireProto
   void deleteBlockPool(String bpid, boolean force) throws IOException; 
   
   /**
+   * The specification of this method matches that of
+   * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException;
+  
+  /**
    * This method is defined to get the protocol signature using 
    * the R23 protocol - hence we have added the suffix of 2 to the method name
    * to avoid conflict.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Nov 22 02:57:04 2011
@@ -28,6 +28,9 @@ import java.io.RandomAccessFile;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 
 
 /**
@@ -35,7 +38,9 @@ import org.apache.hadoop.util.DataChecks
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
   
@@ -51,12 +56,14 @@ class BlockMetadataHeader {
     this.checksum = checksum;
     this.version = version;
   }
-    
-  short getVersion() {
+  
+  /** Get the version */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
     return checksum;
   }
 
@@ -67,7 +74,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
     return readHeader(in.readShort(), in);
   }
   

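Making readHeader(), getVersion() and getChecksum() public is what allows client-side code to parse a datanode's .meta file directly. A sketch mirroring the steps BlockReaderLocal.newBlockReader() performs above; the meta-file path is a made-up example:

    import java.io.DataInputStream;
    import java.io.FileInputStream;

    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class MetaHeaderSketch {
      public static void main(String[] args) throws Exception {
        FileInputStream metaIn =
            new FileInputStream("/data/current/blk_1234_1.meta");
        try {
          // Read the common header: a version followed by the checksum type.
          BlockMetadataHeader header =
              BlockMetadataHeader.readHeader(new DataInputStream(metaIn));
          DataChecksum checksum = header.getChecksum();
          System.out.println("version=" + header.getVersion()
              + " bytesPerChecksum=" + checksum.getBytesPerChecksum());
        } finally {
          metaIn.close();
        }
      }
    }
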
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Nov 22 02:57:04 2011
@@ -54,6 +54,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -89,6 +90,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -110,6 +112,7 @@ import org.apache.hadoop.hdfs.protocolR2
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -130,6 +133,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
 import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
 import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
@@ -145,8 +149,10 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -398,6 +404,8 @@ public class DataNode extends Configured
   private AbstractList<File> dataDirs;
   private Configuration conf;
 
+  private final String userWithLocalPathAccess;
+
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
@@ -416,6 +424,8 @@ public class DataNode extends Configured
            final SecureResources resources) throws IOException {
     super(conf);
 
+    this.userWithLocalPathAccess = conf
+        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
     try {
       hostName = getHostName(conf);
       startDataNode(conf, dataDirs, resources);
@@ -1074,6 +1084,68 @@ public class DataNode extends Configured
     return "DS-" + rand + "-" + ip + "-" + port + "-"
         + System.currentTimeMillis();
   }
+  
+  /** Ensure the authentication method is kerberos */
+  private void checkKerberosAuthMethod(String msg) throws IOException {
+    // When security is enabled, the RPC must be authenticated via kerberos
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != 
+        AuthenticationMethod.KERBEROS) {
+      throw new AccessControlException("Error in " + msg
+          + ". Only kerberos based authentication is allowed.");
+    }
+    }
+  }
+  
+  private void checkBlockLocalPathAccess() throws IOException {
+    checkKerberosAuthMethod("getBlockLocalPathInfo()");
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+      throw new AccessControlException(
+          "Can't continue with getBlockLocalPathInfo() "
+              + "authorization. The user " + currentUser
+              + " is not allowed to call getBlockLocalPathInfo");
+    }
+  }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    checkBlockLocalPathAccess();
+    checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    if (LOG.isTraceEnabled()) {
+      if (info != null) {
+        LOG.trace("getBlockLocalPathInfo successful block=" + block
+            + " blockfile " + info.getBlockPath() + " metafile "
+            + info.getMetaPath());
+      } else {
+        LOG.trace("getBlockLocalPathInfo for block=" + block
+            + " returning null");
+      }
+    }
+    metrics.incrBlocksGetLocalPathInfo();
+    return info;
+  }
+  
+  private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
+      AccessMode accessMode) throws IOException {
+    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+      BlockTokenIdentifier id = new BlockTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got: " + id.toString());
+      }
+      blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
+    }
+  }
 
   /**
    * Shut down this instance of the datanode.

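For orientation: the endpoint added above is what a co-located client calls to
learn a replica's on-disk paths before opening them itself. A minimal,
hypothetical sketch of that call path, reusing the names the new test below
uses (dnInfo, blk and token come from a LocatedBlock; the real client logic
lives in BlockReaderLocal and is not shown here):

    // Hedged sketch, not part of this patch.
    ClientDatanodeProtocol proxy =
        DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000);
    BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
    // With the paths in hand, the replica can be read in place,
    // bypassing DataTransferProtocol entirely.
    FileInputStream dataIn = new FileInputStream(info.getBlockPath());
    FileInputStream checksumIn = new FileInputStream(info.getMetaPath());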
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Nov 22 02:57:04 2011
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -2658,4 +2659,14 @@ public class FSDataset implements FSData
       volume.deleteBPDirectories(bpid, force);
     }
   }
+  
+  @Override // FSDatasetInterface
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = getMetaFile(datafile, block.getGenerationStamp());
+    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
+        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+    return info;
+  }
 }

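The two paths returned above point at the replica's data file and its checksum
metadata file. Illustratively (the concrete paths below are made up; by HDFS
convention the metadata file sits beside the data file and embeds the
generation stamp in its name):

    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
    String blockPath = info.getBlockPath(); // e.g. .../current/blk_1001
    String metaPath = info.getMetaPath();   // e.g. .../current/blk_1001_2.meta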
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Nov 22 02:57:04 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
@@ -402,4 +404,9 @@ public interface FSDatasetInterface exte
    * @throws IOException
    */
   public void deleteBlockPool(String bpid, boolean force) throws IOException;
+  
+  /**
+   * Get {@link BlockLocalPathInfo} for the given block.
+   **/
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException;
 }

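Not every implementation can honor this method -- the in-memory
SimulatedFSDataset at the end of this change simply throws -- so a caller-side
sketch would treat the IOException as a cue to fall back to the ordinary
remote read (this fallback shape is an assumption, not code from the patch):

    BlockLocalPathInfo info = null;
    try {
      info = proxy.getBlockLocalPathInfo(blk, token);
    } catch (IOException e) {
      // Local paths unavailable; fall back to a DataTransferProtocol read.
    }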
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Nov 22 02:57:04 2011
@@ -60,6 +60,7 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong readsFromRemoteClient;
   @Metric MutableCounterLong writesFromLocalClient;
   @Metric MutableCounterLong writesFromRemoteClient;
+  @Metric MutableCounterLong blocksGetLocalPathInfo;
   
   @Metric MutableCounterLong volumeFailures;
 
@@ -165,4 +166,9 @@ public class DataNodeMetrics {
   public void incrVolumeFailures() {
     volumeFailures.incr();
   }
+
+  /** Increment for getBlockLocalPathInfo calls */
+  public void incrBlocksGetLocalPathInfo() {
+    blocksGetLocalPathInfo.incr();
+  }
 }

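The new counter flows through the standard metrics2 machinery, so a test could
verify it with the MetricsAsserts helpers. A hedged sketch -- the record name
lookup via dn.getMetrics().name() and the capitalized counter name
"BlocksGetLocalPathInfo" follow the usual metrics2 conventions and are
assumptions here, not part of the patch:

    // Assumes: import static org.apache.hadoop.test.MetricsAsserts.*;
    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
    assertCounter("BlocksGetLocalPathInfo", 1L, rb);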
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Tue Nov 22 02:57:04 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
 import org.apache.hadoop.security.token.Token;
 import org.junit.Test;
@@ -209,6 +210,7 @@ public class TestConnCache {
     MockGetBlockReader answer = new MockGetBlockReader();
     Mockito.doAnswer(answer).when(in).getBlockReader(
                            (InetSocketAddress) Matchers.anyObject(),
+                           (DatanodeInfo) Matchers.anyObject(),
                            Matchers.anyString(),
                            (ExtendedBlock) Matchers.anyObject(),
                            (Token<BlockTokenIdentifier>) Matchers.anyObject(),

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1204792&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Tue Nov 22 02:57:04 2011
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for short circuit read functionality using {@link BlockReaderLocal}.
+ * When the block being read by a client resides on the local datanode, short
+ * circuit read lets the client read the block file directly from the local
+ * file system instead of connecting to the datanode via
+ * {@link DataTransferProtocol}.
+ */
+public class TestShortCircuitLocalRead {
+
+  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 5120;
+  boolean simulatedStorage = false;
+  
+  // creates a file but does not close it
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  static private void checkData(byte[] actual, int from, byte[] expected,
+      String message) {
+    checkData(actual, from, expected, actual.length, message);
+  }
+  
+  static private void checkData(byte[] actual, int from, byte[] expected, int len,
+      String message) {
+    for (int idx = 0; idx < len; idx++) {
+      if (expected[from + idx] != actual[idx]) {
+        Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+            + expected[from + idx] + " actual " + actual[idx]);
+      }
+    }
+  }
+
+  static void checkFileContent(FileSystem fs, Path name, byte[] expected,
+      int readOffset) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[expected.length-readOffset];
+    stm.readFully(readOffset, actual);
+    checkData(actual, readOffset, expected, "Read 2");
+    stm.close();
+    // Now read using a different API.
+    actual = new byte[expected.length-readOffset];
+    stm = fs.open(name);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
+    //Read a small number of bytes first.
+    int nread = stm.read(actual, 0, 3);
+    nread += stm.read(actual, nread, 2);
+    //Read across chunk boundary
+    nread += stm.read(actual, nread, 517);
+    checkData(actual, readOffset, expected, nread, "A few bytes");
+    //Now read rest of it
+    while (nread < actual.length) {
+      int nbytes = stm.read(actual, nread, actual.length - nread);
+      if (nbytes < 0) {
+        throw new EOFException("End of file reached before reading fully.");
+      }
+      nread += nbytes;
+    }
+    checkData(actual, readOffset, expected, "Read 3");
+    stm.close();
+  }
+
+  /**
+   * Test that file data can be read by reading the block file
+   * directly from the local store.
+   */
+  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+      int readOffset) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        ignoreChecksum);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .format(true).build();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      assertTrue("/ should be a directory", fs.getFileStatus(path)
+          .isDirectory() == true);
+      
+      byte[] fileData = AppendTestUtil.randomBytes(seed, size);
+      // create a new file in the home directory (createFile does not close it)
+      Path file1 = new Path("filelocal.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+
+      // write to file
+      stm.write(fileData);
+      stm.close();
+      checkFileContent(fs, file1, fileData, readOffset);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileLocalReadNoChecksum() throws IOException {
+    doTestShortCircuitRead(true, 3*blockSize+100, 0);
+  }
+
+  @Test
+  public void testFileLocalReadChecksum() throws IOException {
+    doTestShortCircuitRead(false, 3*blockSize+100, 0);
+  }
+  
+  @Test
+  public void testSmallFileLocalRead() throws IOException {
+    doTestShortCircuitRead(false, 13, 0);
+    doTestShortCircuitRead(false, 13, 5);
+    doTestShortCircuitRead(true, 13, 0);
+    doTestShortCircuitRead(true, 13, 5);
+  }
+  
+  @Test
+  public void testReadFromAnOffset() throws IOException {
+    doTestShortCircuitRead(false, 3*blockSize+100, 777);
+    doTestShortCircuitRead(true, 3*blockSize+100, 777);
+  }
+  
+  @Test
+  public void testLongFile() throws IOException {
+    doTestShortCircuitRead(false, 10*blockSize+100, 777);
+    doTestShortCircuitRead(true, 10*blockSize+100, 777);
+  }
+   
+  @Test
+  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .format(true).build();
+    cluster.waitActive();
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
+      UserGroupInformation aUgi = UserGroupInformation
+          .createRemoteUser("alloweduser");
+      LocatedBlocks lb = cluster.getNameNode().getRpcServer()
+          .getBlockLocations("/tmp/x", 0, 16);
+      // Create a new block object, because the block inside LocatedBlock at
+      // namenode is of type BlockInfo.
+      ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
+      Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
+      final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
+      ClientDatanodeProtocol proxy = aUgi
+          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
+                  60000);
+            }
+          });
+      
+      //This should succeed
+      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
+          blpi.getBlockPath());
+
+      // Now try with a not allowed user.
+      UserGroupInformation bUgi = UserGroupInformation
+          .createRemoteUser("notalloweduser");
+      proxy = bUgi
+          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
+                  60000);
+            }
+          });
+      try {
+        proxy.getBlockLocalPathInfo(blk, token);
+        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+            + " is not allowed to call getBlockLocalPathInfo");
+      } catch (IOException ex) {
+        Assert.assertTrue(ex.getMessage().contains(
+            "not allowed to call getBlockLocalPathInfo"));
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Test to run benchmarks comparing short circuit read and regular read,
+   * with the specified number of threads reading simultaneously.
+   * <br>
+   * Run this using the following command:
+   * bin/hadoop --config confdir \
+   * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
+   * <shortcircuit on?> <checksum on?> <number of threads>
+   */
+  public static void main(String[] args) throws Exception {    
+    if (args.length != 3) {
+      System.out.println("Usage: test shortcircuit checksum threadCount");
+      System.exit(1);
+    }
+    boolean shortcircuit = Boolean.parseBoolean(args[0]);
+    boolean checksum = Boolean.parseBoolean(args[1]);
+    int threadCount = Integer.parseInt(args[2]);
+
+    // Setup create a file
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        checksum);
+    
+    // Override fileSize and dataToWrite with much larger values for the benchmark
+    int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
+    final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
+    
+    // create a new file in the home directory (createFile does not close it)
+    final Path file1 = new Path("filelocal.dat");
+    final FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stm = createFile(fs, file1, 1);
+    
+    stm.write(dataToWrite);
+    stm.close();
+
+    long start = System.currentTimeMillis();
+    final int iteration = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threadCount; i++) {
+      threads[i] = new Thread() {
+        public void run() {
+          for (int i = 0; i < iteration; i++) {
+            try {
+              checkFileContent(fs, file1, dataToWrite, 0);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threadCount; i++) {
+      threads[i].join();
+    }
+    long end = System.currentTimeMillis();
+    System.out.println("Iteration " + iteration + " took " + (end - start));
+    fs.delete(file1, false);
+  }
+}

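Taken together, the tests above exercise the full configuration surface of the
feature: a client-side switch, an optional checksum skip, and a datanode-side
user whitelist. A minimal sketch of a client enabling it (the whitelisted user
must match what the datanode was started with; using the current user here
simply mirrors doTestShortCircuitRead above):

    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        UserGroupInformation.getCurrentUser().getShortUserName());
    FileSystem fs = FileSystem.get(conf); // local reads now bypass the datanode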
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1204792&r1=1204791&r2=1204792&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Nov 22 02:57:04 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
@@ -991,4 +992,10 @@ public class SimulatedFSDataset  impleme
     }
     return r;
   }
+
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b)
+      throws IOException {
+    throw new IOException("getBlockLocalPathInfo not supported.");
+  }
 }