Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2012/04/23 23:37:56 UTC

svn commit: r1329468 [1/2] - in /hadoop/common/branches/branch-0.22/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/...

Author: shv
Date: Mon Apr 23 21:37:55 2012
New Revision: 1329468

URL: http://svn.apache.org/viewvc?rev=1329468&view=rev
Log:
HDFS-2246. Enable reading a block directly from local file system for a client on the same node as the block file. Contributed by Andrew Purtell, Suresh, Jitendra and Benoy
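
For reference, a minimal client-side sketch of turning the new read path on, using the configuration keys this patch adds to DFSConfigKeys. The file path is a placeholder, and the datanode must separately whitelist the reading user via dfs.block.local-path-access.user:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ShortCircuitReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Client side: let DFSClient open replica files directly when the
        // block lives on this machine (key added by this patch).
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        // Optional: also skip client-side checksum verification on local reads.
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);

        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/tmp/example.dat")); // placeholder path
        byte[] buf = new byte[4096];
        int n = in.read(buf);
        System.out.println("read " + n + " bytes");
        in.close();
      }
    }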

Added:
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java   (with props)
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java   (with props)
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java   (with props)
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java   (with props)
Modified:
    hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
    hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

Modified: hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/hdfs/CHANGES.txt Mon Apr 23 21:37:55 2012
@@ -16,6 +16,10 @@ Release 0.22.1 - Unreleased
 
     HDFS-1601. Pipeline ACKs are sent as lots of tiny TCP packets (todd)
 
+    HDFS-2246. Enable reading a block directly from local file system
+    for a client on the same node as the block file.  (Andrew Purtell,
+    Suresh Srinivas, Jitendra Nath Pandey and Benoy Antony via shv)
+
   BUG FIXES
 
     HDFS-1910. NameNode should not save fsimage twice. (shv)

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Mon Apr 23 21:37:55 2012
@@ -17,507 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- */
-@InterfaceAudience.Private
-public class BlockReader extends FSInputChecker {
-
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
-  private DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
 
-  /** offset in block of of first chunk - may be less than startOffset
-      if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private int bytesPerChecksum;
-  private int checksumSize;
 
   /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-  
-  byte[] skipBuf = null;
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-  
-  /* FSInputChecker interface */
-  
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
+ * The API shared between local and remote block readers.
    */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) 
-                               throws IOException {
-    
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
+public interface BlockReader extends Closeable {
 
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-    
-    int nRead = super.read(buf, off, len);
+  public int read(byte buf[], int off, int len) throws IOException;
 
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(dnSock, CHECKSUM_OK);
-      } else {
-        sendReadResult(dnSock, SUCCESS);
-      }
-    }
-    return nRead;
-  }
+  public int readAll(byte[] buf, int offset, int len) throws IOException;
 
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */  
-    if ( skipBuf == null ) {
-      skipBuf = new byte[bytesPerChecksum]; 
-    }
-
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-                          "Use read(buf, off, len) instead.");
-  }
-  
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader. 
-     * DFSInputStream does not always call 'seekToNewSource'. In the 
-     * case of pread(), it just tries a different replica without seeking.
-     */ 
-    return false;
-  }
-  
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-                               "since seek is not required");
-  }
-  
-  /**
-   * Makes sure that checksumBytes has enough capacity 
-   * and limit is set to the number of checksum bytes needed 
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize = 
-      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-  
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
-                                       int len, byte[] checksumBuf) 
-                                       throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-    
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-    
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " + 
-                            firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got header " + header);
-      }
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-           throw new IOException("BlockReader: error in packet header " +
-                                 header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-                          checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                  checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-        checksumsToRead * bytesPerChecksum, // full chunks
-        dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      int dataLen = in.readInt();
-
-      if (!lastPacketInBlock ||
-          dataLen != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-                              "(packetLen : " + packetLen + 
-                              ", offsetInBlock : " + offsetInBlock +
-                              ", seqno : " + seqno + 
-                              ", lastInBlock : " + lastPacketInBlock +
-                              ", dataLen : " + dataLen);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-  
-  private BlockReader( String file, long blockId, DataInputStream in, 
-                       DataChecksum checksum, boolean verifyChecksum,
-                       long startOffset, long firstChunkOffset,
-                       long bytesToRead,
-                       Socket dnSock ) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
-          1, verifyChecksum,
-          checksum.getChecksumSize() > 0? checksum : null, 
-          checksum.getBytesPerChecksum(),
-          checksum.getChecksumSize());
-    
-    this.dnSock = dnSock;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-  }
-
-  public static BlockReader newBlockReader(Socket sock, String file,
-      Block block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
-        true);
-  }
-
-  /** Java Doc required */
-  public static BlockReader newBlockReader( Socket sock, String file, 
-                                     Block block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum)
-                                     throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-                          len, bufferSize, verifyChecksum, "");
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
-   */
-  public static BlockReader newBlockReader( Socket sock, String file,
-                                     Block block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
-                                     String clientName)
-                                     throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    DataTransferProtocol.Sender.opReadBlock(
-        new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-        block, startOffset, len, clientName, blockToken);
-    
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                bufferSize));
-    
-    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-    if (status != SUCCESS) {
-      if (status == ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for block " + block.getBlockId() 
-                + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for block " + block.getBlockId() + "_" 
-            + block.getGenerationStamp());
-      }
-    }
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
-    //Warning when we get CHECKSUM_NULL?
-    
-    // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
-    
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " + 
-                            startOffset + " for file " + file);
-    }
-
-    return new BlockReader(file, block.getBlockId(), in, checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, sock);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-  
-  /** kind of like readFully(). Only reads as much as possible.
-   * And allows use of protected readFully().
-   */
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
+  public long skip(long n) throws IOException;
 
   /**
    * Take the socket used to talk to the DN.
    */
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
+  Socket takeSocket();
 
   /**
    * Whether the BlockReader has reached the end of its input stream
    * and successfully sent a status code back to the datanode.
    */
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
-    try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      statusCode.writeOutputStream(out);
-      out.flush();
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
-    }
-  }
-  
-  // File name to print when accessing a block directory from servlets
-  public static String getFileName(final InetSocketAddress s,
-      final long blockId) {
-    return s.toString() + ":" + blockId;
-  }
+  boolean hasSentStatusCode();
 }
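
With BlockReader reduced to the interface above, local and remote readers become interchangeable to callers. A hypothetical helper (not part of this patch; the class name is made up) showing code written purely against the interface:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.BlockReader;

    // Drains up to buf.length bytes from any BlockReader, local or remote,
    // without knowing which implementation sits behind the interface.
    class BlockReaderDrain {
      static int drain(BlockReader reader, byte[] buf) throws IOException {
        try {
          int total = 0;
          while (total < buf.length) {
            int n = reader.read(buf, total, buf.length - total);
            if (n <= 0) {
              break; // end of the requested range
            }
            total += n;
          }
          return total;
        } finally {
          reader.close(); // BlockReader extends Closeable
        }
      }
    }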

Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+class BlockReaderLocal  extends FSInputChecker implements BlockReader  {
+  public static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
+
+
+  private DataChecksum checksum;
+  private int bytesPerChecksum;
+  private int checksumSize;
+  private long firstChunkOffset;
+  private long lastChunkLen = -1;
+  private long lastChunkOffset = -1;
+  private long startOffset;
+  private boolean eos = false;
+  private byte[] skipBuf = null;
+
+
+
+  //Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<Block, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+      cache = Collections
+      .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>(
+          hashTableCapacity, hashTableLoadFactor, true) {
+        private static final long serialVersionUID = 1;
+
+        @Override
+        protected boolean removeEldestEntry(
+            Map.Entry<Block, BlockLocalPathInfo> eldest) {
+          return size() > cacheSize;
+        }
+      });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        DatanodeInfo node, Configuration conf, int socketTimeout)
+    throws IOException {
+      if (proxy == null) {
+        proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
+            socketTimeout);
+      }
+      return proxy;
+    }
+
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(Block b) {
+      cache.remove(b);
+    }
+  }
+
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private FileInputStream dataIn; // reader for the data file
+  private FileInputStream checksumIn;   // reader for the checksum file
+
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocal newBlockReader(Configuration conf,
+      String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+      int socketTimeout, long startOffset, long length) throws IOException {
+
+    LocalDatanodeInfo localDatanodeInfo =  getLocalDatanodeInfo(node.getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+    }
+
+    // check to see if the file exists. It may so happen that the
+    // HDFS file has been deleted and this block-lookup is occurring
+    // on behalf of a new HDFS file. This time, the block file could
+    // be residing in a different portion of the fs.data.dir directory.
+    // In this case, we remove this entry from the cache. The next
+    // call to this method will re-populate the cache.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocal localBlockReader = null;
+    boolean skipChecksum = skipChecksumCheck(conf);
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + skipChecksum);
+      }
+
+      if (!skipChecksum) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader
+        .readHeader(new DataInputStream(checksumIn));
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + blk + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, checksum, true, dataIn, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
+            pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk +
+          " from cache because local file " + pathinfo.getBlockPath() +
+      " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }  
+    }
+    return localBlockReader;
+  }
+
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+
+  private static BlockLocalPathInfo getBlockPathInfo(Block blk,
+      DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
+        conf, timeout);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      if (pathinfo != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+
+  private static boolean skipChecksumCheck(Configuration conf) {
+    return conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+  }
+
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1);
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    long toSkip = startOffset;
+    while (toSkip > 0) {
+      long skipped = dataIn.skip(toSkip);
+      if (skipped == 0) {
+        throw new IOException("Couldn't initialize input stream");
+      }
+      toSkip -= skipped;
+    }
+  }
+
+  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
+      Token<BlockTokenIdentifier> token, long startOffset, long length,
+      BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum,
+      FileInputStream dataIn, FileInputStream checksumIn) throws IOException {
+    super(
+        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
+        1,
+        verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null,
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+    this.startOffset = startOffset;
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.checksum = checksum;
+
+    long blockLength = pathinfo.getNumBytes();
+
+    /* If bytesPerChecksum is very large, then the metadata file
+     * is mostly corrupted. For now just truncate bytesPerchecksum to
+     * blockLength.
+     */
+    bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+      checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+          Math.max((int) blockLength, 10 * 1024 * 1024));
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+    }
+
+    checksumSize = checksum.getChecksumSize();
+
+    if (startOffset < 0 || startOffset > blockLength
+        || (length + startOffset) > blockLength) {
+      String msg = " Offset " + startOffset + " and length " + length
+      + " don't match block " + block + " ( blockLen " + blockLength + " )";
+      LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
+      throw new IOException(msg);
+    }
+
+    firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));
+
+    if (firstChunkOffset > 0) {
+      dataIn.getChannel().position(firstChunkOffset);
+
+      long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      if (checksumSkip > 0) {
+        checksumIn.skip(checksumSkip);
+      }
+    }
+
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read off " + off + " len " + len);
+    }
+    if (checksum == null) {
+      return dataIn.read(buf, off, len);
+    }
+    // For the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if (super.read(skipBuf, 0, toSkip) != toSkip) {
+        // Should never happen
+        throw new IOException("Could not skip " + toSkip + " bytes");
+      }
+    }
+    return super.read(buf, off, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    if (checksum == null) {
+      return dataIn.skip(n);
+    }
+    // Skip by reading the data so we stay in sync with checksums.
+    // This could be implemented more efficiently in the future to
+    // skip to the beginning of the appropriate checksum chunk
+    // and then only read to the middle of that chunk.
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+    long nSkipped = 0;
+    while (nSkipped < n) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if (ret <= 0) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    // Checksum errors are handled outside BlockReaderLocal 
+    return false;
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+    "since seek is not implemented");
+  }
+
+  @Override
+  public synchronized void seek(long n) throws IOException {
+    throw new IOException("Seek() is not supported in BlockReaderLocal");
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset,
+      int len, byte[] checksumBuf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading chunk from position " + pos + " at offset " +
+          offset + " with length " + len);
+    }
+
+    if (eos) {
+      startOffset = -1;
+      return -1;
+    }
+
+    if ((pos + firstChunkOffset) != lastChunkOffset) {
+      throw new IOException("Mismatch in pos : " + pos + " + "
+          + firstChunkOffset + " != " + lastChunkOffset);
+    }
+
+    int checksumsToRead, bytesToRead;
+    int nRead = 0;
+
+    if (checksumIn != null) {
+
+      // How many chunks we can fit in databuffer  and checksum Buffer
+      int chunksCanFit = Math.min(len / bytesPerChecksum,checksumBuf.length / checksumSize);
+
+      //compute the bytes to read
+      bytesToRead =  chunksCanFit * bytesPerChecksum;
+
+      nRead = dataIn.read(buf, offset, bytesToRead);
+
+      //now compute the number of checksums to read
+      checksumsToRead = Math.min(((nRead-1)/bytesPerChecksum) + 1 , chunksCanFit);
+
+      int nChecksumRead = checksumIn.read(checksumBuf, 0, checksumSize * checksumsToRead);
+
+      if (nChecksumRead !=  checksumSize * checksumsToRead) {
+        throw new IOException("Could not read checksum at offset " +
+            checksumIn.getChannel().position() + " from the meta file.");
+      }	
+    }
+    else {
+      nRead = dataIn.read(buf, offset, len);
+    }
+
+
+    if (nRead < bytesPerChecksum) {
+      eos = true;
+    }
+
+    lastChunkOffset += nRead;
+    lastChunkLen = nRead;
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    IOUtils.closeStream(dataIn);
+    IOUtils.closeStream(checksumIn);
+  }
+
+  @Override
+  public Socket takeSocket() {
+    return null;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return false;
+  }
+}
\ No newline at end of file

Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
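
LocalDatanodeInfo above bounds its Block-to-BlockLocalPathInfo cache with an access-ordered LinkedHashMap. A standalone sketch of that eviction idiom, with placeholder key and value types:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Access-ordered LinkedHashMap that evicts its least-recently-used entry
    // once the configured capacity is exceeded, wrapped for thread safety.
    class LruCacheSketch {
      static <K, V> Map<K, V> newLruCache(final int maxEntries) {
        final float loadFactor = 0.75f;
        int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
        return Collections.synchronizedMap(
            new LinkedHashMap<K, V>(capacity, loadFactor, true /* access order */) {
              private static final long serialVersionUID = 1L;
              @Override
              protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
              }
            });
      }

      public static void main(String[] args) {
        Map<Long, String> cache = newLruCache(2);
        cache.put(1L, "blk_1 path");
        cache.put(2L, "blk_2 path");
        cache.get(1L);                       // touch blk_1 so blk_2 becomes the eldest
        cache.put(3L, "blk_3 path");
        System.out.println(cache.keySet());  // prints [1, 3]
      }
    }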

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Apr 23 21:37:55 2012
@@ -29,14 +29,20 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -78,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -96,7 +103,6 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -138,6 +144,7 @@ public class DFSClient implements FSCons
   int socketTimeout;
   final int writePacketSize;
   final FileSystem.Statistics stats;
+  boolean shortCircuitLocalReads;
   final int hdfsTimeout;    // timeout value for a DFS operation.
 
   final SocketCache socketCache;
@@ -218,6 +225,21 @@ public class DFSClient implements FSCons
         NetUtils.getDefaultSocketFactory(conf), socketTimeout);
   }
         
+  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout)
+  throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
+      ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
+    }
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
+        ClientDatanodeProtocol.versionID, addr, ugi, conf, NetUtils
+        .getDefaultSocketFactory(conf), socketTimeout);
+  }
+
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -290,6 +312,13 @@ public class DFSClient implements FSCons
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
     }
+    // read directly from the block file if configured.
+    this.shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
+    }
   }
 
   /**
@@ -399,7 +428,6 @@ public class DFSClient implements FSCons
     }
   }
 
-  
   /**
    * @see ClientProtocol#getDelegationToken(Text)
    */
@@ -425,6 +453,38 @@ public class DFSClient implements FSCons
     }
   }
 
+  private static Set<String> localIpAddresses = Collections
+  .synchronizedSet(new HashSet<String>());
+
+  static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (localIpAddresses.contains(addr.getHostAddress())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr + " is local");
+      }
+      return true;
+    }
+
+    // Check if the address is any local or loop back
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+        local = false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr + " is local");
+    }
+    if (local == true) {
+      localIpAddresses.add(addr.getHostAddress());
+    }
+    return local;
+  }
+
   /**
    * @see ClientProtocol#cancelDelegationToken(Token)
    */
@@ -1555,4 +1615,29 @@ public class DFSClient implements FSCons
     return getClass().getSimpleName() + "[clientName=" + clientName
         + ", ugi=" + ugi + "]"; 
   }
+
+  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
+      throws IOException {
+    if (shortCircuitLocalReads && DFSClient.isLocalAddress(targetAddr)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get {@link BlockReader} for short circuited local reads.
+   */
+   BlockReader getLocalBlockReader(
+      String src, Block blk, Token<BlockTokenIdentifier> accessToken,
+      DatanodeInfo chosenNode,  long offsetIntoBlock)
+  throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
+          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
+          - offsetIntoBlock);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
 }
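
The locality test added to DFSClient boils down to the check below; this standalone sketch omits the localIpAddresses result cache kept by the real method:

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;

    // An address is considered local if it is a wildcard/loopback address or
    // is bound to one of this host's network interfaces.
    class LocalAddressCheck {
      static boolean isLocal(InetSocketAddress target) {
        InetAddress addr = target.getAddress();
        if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
          return true;
        }
        try {
          return NetworkInterface.getByInetAddress(addr) != null;
        } catch (SocketException e) {
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(isLocal(new InetSocketAddress("127.0.0.1", 50010))); // true
      }
    }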

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Apr 23 21:37:55 2012
@@ -207,6 +207,10 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 21600000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
+  public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
+  public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -237,4 +241,5 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  public static final String  DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Apr 23 21:37:55 2012
@@ -35,12 +35,14 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
@@ -383,9 +385,33 @@ public class DFSInputStream extends FSIn
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
 
-      try {
+      // try getting a local blockReader. if this fails, then go via
+      // the datanode
         Block blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+      if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+        try {
+          blockReader = dfsClient.getLocalBlockReader( src, blk, accessToken,
+              chosenNode,  offsetIntoBlock);
+          return chosenNode;
+        } catch (AccessControlException ex) {
+          DFSClient.LOG.warn("Short circuit access failed ", ex);
+          //Disable short circuit reads
+          dfsClient.shortCircuitLocalReads = false;
+        } catch (IOException ex) {
+          if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+            /* Get a new access token and retry. */
+            refetchToken--;
+            fetchBlockAt(target);
+            continue;
+          } else {
+            DFSClient.LOG.info("Failed to read block " + targetBlock.getBlock()
+                + " on local machine" + StringUtils.stringifyException(ex));
+            DFSClient.LOG.info("Try reading via the datanode on " + targetAddr);
+          }
+        }
+      }
+      try {
         
         blockReader = getBlockReader(
             targetAddr, src, blk,
@@ -394,20 +420,7 @@ public class DFSInputStream extends FSIn
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will fetch a new access token and retry, " 
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          /*
-           * Get a new access token and retry. Retry is needed in 2 cases. 1)
-           * When both NN and DN re-started while DFSClient holding a cached
-           * access token. 2) In the case that NN fails to update its
-           * access key at pre-set interval (by a wide margin) and
-           * subsequently restarts. In this case, DN re-registers itself with
-           * NN and receives a new access key, but DN will delete the old
-           * access key from its memory since it's considered expired based on
-           * the estimated expiration date.
-           */
+        if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
           refetchToken--;
           fetchBlockAt(target);
         } else {
@@ -610,14 +623,27 @@ public class DFSInputStream extends FSIn
           
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
         int len = (int) (end - start + 1);
 
+        if (dfsClient.shouldTryShortCircuitRead(targetAddr)) {
+          try {
+            reader = dfsClient.getLocalBlockReader( src, block.getBlock(),
+                blockToken, chosenNode, start);
+          } catch (AccessControlException ex) {
+            DFSClient.LOG.warn("Short circuit access failed ", ex);
+            //Disable short circuit reads
+            dfsClient.shortCircuitLocalReads = false;
+            continue;
+          }
+        } else {
+          // go to the datanode
         reader = getBlockReader(targetAddr, src,
                                 block.getBlock(),
                                 blockToken,
                                 start, len, buffersize,
                                 verifyChecksum, dfsClient.clientName);
+        }
+
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -630,10 +656,7 @@ public class DFSInputStream extends FSIn
                  e.getPos() + " from " + chosenNode.getName());
         dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
       } catch (IOException e) {
-        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will get a new access token and retry, "
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + e);
+        if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
           fetchBlockAt(block.getStartOffset());
           continue;
@@ -723,7 +746,7 @@ public class DFSInputStream extends FSIn
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReader.newBlockReader(sock, file, block,
+          RemoteBlockReader.newBlockReader(sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,
@@ -740,7 +763,6 @@ public class DFSInputStream extends FSIn
     throw err;
   }
 
-
   /**
    * Read bytes starting from the specified position.
    * 
@@ -914,6 +936,33 @@ public class DFSInputStream extends FSIn
   }
 
   /**
+   * Should the block access token be refetched on an exception
+   * 
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if block access token has expired or invalid and it should be
+   *         refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    /*
+     * Get a new access token and retry. Retry is needed in 2 cases. 1) When
+     * both NN and DN re-started while DFSClient holding a cached access token.
+     * 2) In the case that NN fails to update its access key at pre-set interval
+     * (by a wide margin) and subsequently restarts. In this case, DN
+     * re-registers itself with NN and receives a new access key, but DN will
+     * delete the old access key from its memory since it's considered expired
+     * based on the estimated expiration date.
+     */
+    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+      DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr
+          + " : " + ex);
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Pick the best node from which to stream the data.
    * Entries in <i>nodes</i> are already in the priority order
    */
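
A rough illustration of what the DFSInputStream hunks above give a client: when
short-circuit reads are enabled and the chosen datanode is local, the read is served
from the block file on disk, and an AccessControlException silently falls back to the
normal remote path. The sketch below is illustrative only; the configuration key name
and the file path are assumptions, not taken from this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ShortCircuitReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Assumed key name for illustration; see DFSConfigKeys in this branch
        // for the authoritative client-side short-circuit setting.
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/data/sample.txt"));  // hypothetical path
        byte[] buf = new byte[4096];
        int n = in.read(buf);  // DFSInputStream tries the local block file first
        System.out.println("read " + n + " bytes");
        in.close();
        fs.close();
      }
    }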

Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/** This is a wrapper around a connection to a datanode
+ * and understands checksums, offsets, etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ */
+@InterfaceAudience.Private
+public class RemoteBlockReader extends FSInputChecker implements BlockReader  {
+  public static final Log LOG = LogFactory.getLog(RemoteBlockReader.class);
+
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  /** offset in block of first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private  long firstChunkOffset;
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private  long bytesNeededToFinish;
+
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
+
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+
+  /* FSInputChecker interface */
+
+  /* Same interface as java.io.InputStream#read(),
+   * used by DFSInputStream#read().
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) 
+  throws IOException {
+
+    // This has to be set here, *before* the skip, since we can
+    // hit EOS during the skip, in the case that our entire read
+    // is smaller than the checksum chunk.
+    boolean eosBefore = eos;
+
+    //for the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if ( skipBuf == null ) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+
+    int nRead = super.read(buf, off, len);
+
+    // if eos was set in the previous read, send a status code to the DN
+    if ( dnSock != null && eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(dnSock, CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, SUCCESS);
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum]; 
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throw new IOException("read() is not expected to be invoked. " +
+    "Use read(buf, off, len) instead.");
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    /* Checksum errors are handled outside the BlockReader. 
+     * DFSInputStream does not always call 'seekToNewSource'. In the 
+     * case of pread(), it just tries a different replica without seeking.
+     */ 
+    return false;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek() is not supported in BlockInputChecker");
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+    "since seek is not required");
+  }
+
+  /**
+   * Makes sure that checksumBytes has enough capacity 
+   * and limit is set to the number of checksum bytes needed 
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize = 
+      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+      int len, byte[] checksumBuf) 
+  throws IOException {
+    // Read one chunk.
+    if (eos) {
+      // Already hit EOF
+      return -1;
+    }
+
+    // Read one DATA_CHUNK.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed from FSInputChecker is the
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " + 
+          firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DFSClient readChunk got header " + header);
+      }
+
+      // Sanity check the lengths
+      if (!header.sanityCheck(lastSeqNo)) {
+        throw new IOException("BlockReader: error in packet header " +
+            header);
+      }
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+            checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our packet - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+          checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+          checksumsToRead * bytesPerChecksum, // full chunks
+          dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      int packetLen = in.readInt();
+      long offsetInBlock = in.readLong();
+      long seqno = in.readLong();
+      boolean lastPacketInBlock = in.readBoolean();
+      int dataLen = in.readInt();
+
+      if (!lastPacketInBlock ||
+          dataLen != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
+            "(packetLen : " + packetLen + 
+            ", offsetInBlock : " + offsetInBlock +
+            ", seqno : " + seqno + 
+            ", lastInBlock : " + lastPacketInBlock +
+            ", dataLen : " + dataLen);
+      }
+
+      eos = true;
+    }
+
+    if ( bytesToRead == 0 ) {
+      return -1;
+    }
+
+    return bytesToRead;
+  }
+
+  private RemoteBlockReader( String file, long blockId, DataInputStream in, 
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset,
+      long bytesToRead,
+      Socket dnSock ) {
+    // Path is used only for printing block and file information in debug
+    super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
+        1, verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null, 
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+
+    this.dnSock = dnSock;
+    this.in = in;
+    this.checksum = checksum;
+    this.startOffset = Math.max( startOffset, 0 );
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+    this.firstChunkOffset = firstChunkOffset;
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+  }
+  /**
+   * Public constructor 
+   */  
+  RemoteBlockReader(Path file, int numRetries) {
+    super(file, numRetries);
+  }
+
+  protected RemoteBlockReader(Path file, int numRetries, DataChecksum checksum,
+      boolean verifyChecksum) {
+    super(file,
+        numRetries,
+        verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null,
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+  }
+
+  public static RemoteBlockReader newBlockReader(Socket sock, String file,
+      Block block, Token<BlockTokenIdentifier> blockToken, 
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset, len, bufferSize,
+        true);
+  }
+
+  /** Creates a new {@link RemoteBlockReader} without specifying a client name. */
+  public static RemoteBlockReader newBlockReader( Socket sock, String file, 
+      Block block, 
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum)
+  throws IOException {
+    return newBlockReader(sock, file, block, blockToken, startOffset,
+        len, bufferSize, verifyChecksum, "");
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
+  public static RemoteBlockReader newBlockReader( Socket sock, String file,
+      Block block, 
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum,
+      String clientName)
+  throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    DataTransferProtocol.Sender.opReadBlock(
+        new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+            block, startOffset, len, clientName, blockToken);
+
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(NetUtils.getInputStream(sock), 
+            bufferSize));
+
+    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+    if (status != SUCCESS) {
+      if (status == ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+            "Got access token error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for block " + block.getBlockId() 
+            + "_" + block.getGenerationStamp());
+      } else {
+        throw new IOException("Got error for OP_READ_BLOCK, self="
+            + sock.getLocalSocketAddress() + ", remote="
+            + sock.getRemoteSocketAddress() + ", for file " + file
+            + ", for block " + block.getBlockId() + "_" 
+            + block.getGenerationStamp());
+      }
+    }
+    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = in.readLong();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " + 
+          startOffset + " for file " + file);
+    }
+
+    return new RemoteBlockReader(file, block.getBlockId(), in, checksum,
+        verifyChecksum, startOffset, firstChunkOffset, len, sock);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  /** Similar to readFully(), but only reads as much as possible
+   * and allows use of the protected readFully().
+   */
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
+    try {
+      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+      statusCode.writeOutputStream(out);
+      out.flush();
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          sock.getInetAddress() + ": " + e.getMessage());
+    }
+  }
+
+  // File name to print when accessing a block directly from servlets
+  public static String getFileName(final InetSocketAddress s,
+      final long blockId) {
+    return s.toString() + ":" + blockId;
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
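
For callers outside DFSInputStream, RemoteBlockReader replaces the old static factory
on BlockReader; the JspHelper hunk further down shows the real usage. The fragment
below is a hedged sketch of the same pattern: the address, block identity, offset and
length are placeholder values, and DUMMY_TOKEN is only usable when block tokens are
disabled.

    import java.net.InetSocketAddress;
    import java.net.Socket;

    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.RemoteBlockReader;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.server.common.HdfsConstants;
    import org.apache.hadoop.net.NetUtils;

    public class RemoteReadSketch {
      public static void main(String[] args) throws Exception {
        InetSocketAddress addr = NetUtils.createSocketAddr("datanode1:50010");  // placeholder
        Socket s = new Socket();
        s.connect(addr, HdfsConstants.READ_TIMEOUT);
        s.setSoTimeout(HdfsConstants.READ_TIMEOUT);

        long blockId = 4567L, genStamp = 100L;       // placeholder block identity
        String file = RemoteBlockReader.getFileName(addr, blockId);
        BlockReader reader = RemoteBlockReader.newBlockReader(s, file,
            new Block(blockId, 0, genStamp),
            BlockTokenSecretManager.DUMMY_TOKEN,     // only valid with block tokens disabled
            0, 4096, 4096);
        byte[] buf = new byte[4096];
        int nread = reader.readAll(buf, 0, buf.length);
        System.out.println("read " + nread + " bytes");
        reader.close();
      }
    }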

Added: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java?rev=1329468&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java (added)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java Mon Apr 23 21:37:55 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo implements Writable {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockLocalPathInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
+  }
+
+  private Block block;
+  private String localBlockPath = "";  // local file storing the data
+  private String localMetaPath = "";   // local file storing the checksum
+
+  public BlockLocalPathInfo() {}
+
+  /**
+   * Constructs BlockLocalPathInfo.
+   * @param b The block corresponding to this path info.
+   * @param file Block data file.
+   * @param metafile Metadata file for the block.
+   */
+  public BlockLocalPathInfo(Block b, String file, String metafile) {
+    block = b;
+    localBlockPath = file;
+    localMetaPath = metafile;
+  }
+
+  /**
+   * Get the Block data file.
+   * @return Block data file.
+   */
+  public String getBlockPath() {return localBlockPath;}
+
+  /**
+   * Get the Block metadata file.
+   * @return Block metadata file.
+   */
+  public String getMetaPath() {return localMetaPath;}
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    Text.writeString(out, localBlockPath);
+    Text.writeString(out, localMetaPath);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block = new Block();
+    block.readFields(in);
+    localBlockPath = Text.readString(in);
+    localMetaPath = Text.readString(in);
+  }
+
+  /**
+   * Get number of bytes in the block.
+   * @return Number of bytes in the block.
+   */
+  public long getNumBytes() {
+    return block.getNumBytes();
+  }
+}

Propchange: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
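
BlockLocalPathInfo is handed from the datanode to the client over RPC as a Writable.
A small round-trip sketch is below; the block identity and file paths are made-up
illustration values, not paths this patch prescribes.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

    public class BlockLocalPathInfoSketch {
      public static void main(String[] args) throws Exception {
        Block blk = new Block(4567L, 1024L, 100L);               // id, length, genstamp
        BlockLocalPathInfo info = new BlockLocalPathInfo(blk,
            "/data/dfs/dn/current/blk_4567",                     // made-up data file
            "/data/dfs/dn/current/blk_4567_100.meta");           // made-up meta file

        // Serialize the way the RPC layer would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        info.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance.
        BlockLocalPathInfo copy = new BlockLocalPathInfo();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getBlockPath() + " / " + copy.getMetaPath()
            + ", " + copy.getNumBytes() + " bytes");
      }
    }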

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Mon Apr 23 21:37:55 2012
@@ -26,20 +26,51 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
 
 /** A client-datanode protocol for block recovery
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 6: recoverBlock() removed.
+   * 7: added getBlockLocalPathInfo.
    */
-  public static final long versionID = 6L;
+  public static final long versionID = 7L;
 
   /** Return the visible length of a replica. */
   long getReplicaVisibleLength(Block b) throws IOException;
+
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, one of the following should be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token 
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(Block block,
+      Token<BlockTokenIdentifier> token) throws IOException;   
 }
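
In this commit the short-circuit client (DFSClient.getLocalBlockReader, used by the
DFSInputStream hunk earlier) is the caller of this new method. Roughly, it does what
the hedged sketch below shows: ask the local datanode for the on-disk paths, then open
the data file directly. The helper name and the plain FileInputStream are illustrative
simplifications; the real code builds a BlockReaderLocal and also reads the metadata
file to verify checksums.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class LocalBlockOpenSketch {
      // "datanode" is assumed to be an RPC proxy to the datanode holding the
      // block locally; "located" is a LocatedBlock returned by the NameNode.
      static FileInputStream openLocalBlock(ClientDatanodeProtocol datanode,
          LocatedBlock located) throws IOException {
        BlockLocalPathInfo info = datanode.getBlockLocalPathInfo(
            located.getBlock(), located.getBlockToken());
        return new FileInputStream(new File(info.getBlockPath()));
      }
    }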

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Apr 23 21:37:55 2012
@@ -26,8 +26,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLEncoder;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.RemoteBlockReader;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -192,8 +193,8 @@ public class JspHelper {
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      String file = BlockReader.getFileName(addr, blockId);
-      BlockReader blockReader = BlockReader.newBlockReader(s, file,
+      String file = RemoteBlockReader.getFileName(addr, blockId);
+      BlockReader blockReader = RemoteBlockReader.newBlockReader(s, file,
         new Block(blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead, conf.getInt("io.file.buffer.size", 4096));
         

Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1329468&r1=1329467&r2=1329468&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Mon Apr 23 21:37:55 2012
@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
@@ -33,7 +35,9 @@ import org.apache.hadoop.util.DataChecks
  * This is not related to the Block related functionality in Namenode.
  * The biggest part of data block metadata is CRC for the block.
  */
-class BlockMetadataHeader {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
 
   static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
   
@@ -50,11 +54,13 @@ class BlockMetadataHeader {
     this.version = version;
   }
     
-  short getVersion() {
+  /** Get the version */
+  public short getVersion() {
     return version;
   }
 
-  DataChecksum getChecksum() {
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
     return checksum;
   }
 
@@ -65,7 +71,7 @@ class BlockMetadataHeader {
    * @return Metadata Header
    * @throws IOException
    */
-  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
     return readHeader(in.readShort(), in);
   }