Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:40 UTC

[28/50] [abbrv] hadoop git commit: HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.

HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2c9b288
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2c9b288
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2c9b288

Branch: refs/heads/HDFS-7285
Commit: e2c9b288b223b9fd82dc12018936e13128413492
Parents: b94b568
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Aug 28 14:20:55 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Aug 28 14:38:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/BlockReader.java     | 102 +++
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 743 +++++++++++++++++++
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 738 ++++++++++++++++++
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 ++
 .../org/apache/hadoop/hdfs/ClientContext.java   | 196 +++++
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |  68 ++
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 120 +++
 .../apache/hadoop/hdfs/KeyProviderCache.java    | 112 +++
 .../java/org/apache/hadoop/hdfs/PeerCache.java  | 291 ++++++++
 .../hadoop/hdfs/client/BlockReportOptions.java  |  59 ++
 .../hdfs/client/HdfsClientConfigKeys.java       |   5 +
 .../hdfs/protocol/BlockLocalPathInfo.java       |  70 ++
 .../hdfs/protocol/ClientDatanodeProtocol.java   | 152 ++++
 .../InvalidEncryptionKeyException.java          |  40 +
 .../protocolPB/ClientDatanodeProtocolPB.java    |  37 +
 .../ClientDatanodeProtocolTranslatorPB.java     | 326 ++++++++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  13 +
 .../token/block/BlockTokenSelector.java         |  48 ++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/BlockReader.java     | 102 ---
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 741 ------------------
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 735 ------------------
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 --
 .../org/apache/hadoop/hdfs/ClientContext.java   | 195 -----
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  14 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   2 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  67 +-
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 120 ---
 .../apache/hadoop/hdfs/KeyProviderCache.java    | 111 ---
 .../java/org/apache/hadoop/hdfs/PeerCache.java  | 290 --------
 .../hadoop/hdfs/client/BlockReportOptions.java  |  59 --
 .../hdfs/protocol/BlockLocalPathInfo.java       |  70 --
 .../hdfs/protocol/ClientDatanodeProtocol.java   | 152 ----
 .../InvalidEncryptionKeyException.java          |  40 -
 .../hdfs/protocol/datatransfer/Receiver.java    |  15 +-
 .../protocolPB/ClientDatanodeProtocolPB.java    |  37 -
 ...tDatanodeProtocolServerSideTranslatorPB.java |   6 +-
 .../ClientDatanodeProtocolTranslatorPB.java     | 326 --------
 ...tNamenodeProtocolServerSideTranslatorPB.java |  14 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   2 +-
 ...rDatanodeProtocolServerSideTranslatorPB.java |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  26 +-
 .../token/block/BlockTokenSelector.java         |  48 --
 .../hadoop/hdfs/server/datanode/DNConf.java     |   4 +-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |   3 +-
 .../hadoop/hdfs/TestBlockReaderLocal.java       |  30 +-
 .../hadoop/hdfs/TestBlockReaderLocalLegacy.java |   2 +-
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |   4 +-
 .../security/token/block/TestBlockToken.java    |  10 +-
 .../shortcircuit/TestShortCircuitLocalRead.java |   4 +-
 51 files changed, 3243 insertions(+), 3227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
new file mode 100644
index 0000000..aa3e8ba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+
+/**
+ * A BlockReader is responsible for reading a single block
+ * from a single datanode.
+ */
+@InterfaceAudience.Private
+public interface BlockReader extends ByteBufferReadable {
+  
+
+  /* Same contract as java.io.InputStream#read(),
+   * used by DFSInputStream#read().
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   * Note: this must return -1 on EOF, even in the case of a 0-byte read.
+   * See HDFS-5762 for details.
+   */
+  int read(byte[] buf, int off, int len) throws IOException;
+
+  /**
+   * Skip the given number of bytes
+   */
+  long skip(long n) throws IOException;
+
+  /**
+   * Returns an estimate of the number of bytes that can be read
+   * (or skipped over) from this input stream without performing
+   * network I/O.
+   * This may return more than what is actually present in the block.
+   */
+  int available() throws IOException;
+
+  /**
+   * Close the block reader.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * Read exactly the given amount of data, throwing an exception
+   * if EOF is reached before that amount has been read.
+   */
+  void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
+
+  /**
+   * Similar to {@link #readFully(byte[], int, int)} except that it will
+   * not throw an exception on EOF. However, it differs from the simple
+   * {@link #read(byte[], int, int)} call in that it is guaranteed to
+   * read the data if it is available. In other words, if this call
+   * does not throw an exception, then either the buffer has been
+   * filled or the next call will return EOF.
+   */
+  int readAll(byte[] buf, int offset, int len) throws IOException;
+
+  /**
+   * @return              true only if this is a local read.
+   */
+  boolean isLocal();
+  
+  /**
+   * @return              true only if this is a short-circuit read.
+   *                      All short-circuit reads are also local.
+   */
+  boolean isShortCircuit();
+
+  /**
+   * Get a ClientMmap object for this BlockReader.
+   *
+   * @param opts          The read options to use.
+   * @return              The ClientMmap object, or null if mmap is not
+   *                      supported.
+   */
+  ClientMmap getClientMmap(EnumSet<ReadOption> opts);
+}
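
The readFully()/readAll() contract documented above can be met purely in
terms of read(); BlockReaderLocal further down in this diff delegates both
calls to the moved BlockReaderUtil class. The loop below is only an
illustrative sketch of that contract written against the interface as
printed here; it is not the BlockReaderUtil source from this commit, and
the class name is invented.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.BlockReader;

    // Hypothetical helper: keep calling read() until the requested range is
    // filled or EOF (-1) is returned, which is what readAll()/readFully()
    // promise above.
    final class BlockReaderReadLoop {
      static int readAll(BlockReader reader, byte[] buf, int off, int len)
          throws IOException {
        int total = 0;
        while (total < len) {
          int nRead = reader.read(buf, off + total, len - total);
          if (nRead == -1) {
            break;              // EOF: report how much was actually read
          }
          total += nRead;
        }
        return total;
      }

      static void readFully(BlockReader reader, byte[] buf, int off, int len)
          throws IOException {
        int nRead = readAll(reader, buf, off, len);
        if (nRead < len) {
          throw new IOException("Premature EOF: wanted " + len
              + " bytes but only read " + nRead);
        }
      }
    }

Note that a short return from readAll() is not an error by itself; only
readFully() treats EOF before the requested length as a failure.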

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
new file mode 100644
index 0000000..2a0e21b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data 
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BlockReaderLocal implements BlockReader {
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
+
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+  public static class Builder {
+    private final int bufferSize;
+    private boolean verifyChecksum;
+    private int maxReadahead;
+    private String filename;
+    private ShortCircuitReplica replica;
+    private long dataPos;
+    private ExtendedBlock block;
+    private StorageType storageType;
+
+    public Builder(ShortCircuitConf conf) {
+      this.maxReadahead = Integer.MAX_VALUE;
+      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+      this.bufferSize = conf.getShortCircuitBufferSize();
+    }
+
+    public Builder setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+
+    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+      long readahead = cachingStrategy.getReadahead() != null ?
+          cachingStrategy.getReadahead() :
+              HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+      return this;
+    }
+
+    public Builder setFilename(String filename) {
+      this.filename = filename;
+      return this;
+    }
+
+    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+      this.replica = replica;
+      return this;
+    }
+
+    public Builder setStartOffset(long startOffset) {
+      this.dataPos = Math.max(0, startOffset);
+      return this;
+    }
+
+    public Builder setBlock(ExtendedBlock block) {
+      this.block = block;
+      return this;
+    }
+
+    public Builder setStorageType(StorageType storageType) {
+      this.storageType = storageType;
+      return this;
+    }
+
+    public BlockReaderLocal build() {
+      Preconditions.checkNotNull(replica);
+      return new BlockReaderLocal(this);
+    }
+  }
+
+  private boolean closed = false;
+
+  /**
+   * Pair of streams for this block.
+   */
+  private final ShortCircuitReplica replica;
+
+  /**
+   * The data FileChannel.
+   */
+  private final FileChannel dataIn;
+
+  /**
+   * The next place we'll read from in the block data FileChannel.
+   *
+   * If data is buffered in dataBuf, this offset will be larger than the
+   * offset of the next byte which a read() operation will give us.
+   */
+  private long dataPos;
+
+  /**
+   * The Checksum FileChannel.
+   */
+  private final FileChannel checksumIn;
+  
+  /**
+   * Checksum type and size.
+   */
+  private final DataChecksum checksum;
+
+  /**
+   * If false, we will always skip the checksum.
+   */
+  private final boolean verifyChecksum;
+
+  /**
+   * Name of the block, for logging purposes.
+   */
+  private final String filename;
+  
+  /**
+   * Block ID and Block Pool ID.
+   */
+  private final ExtendedBlock block;
+  
+  /**
+   * Cache of Checksum#bytesPerChecksum.
+   */
+  private final int bytesPerChecksum;
+
+  /**
+   * Cache of Checksum#checksumSize.
+   */
+  private final int checksumSize;
+
+  /**
+   * Maximum number of chunks to allocate.
+   *
+   * This is used to allocate dataBuf and checksumBuf, in the event that
+   * we need them.
+   */
+  private final int maxAllocatedChunks;
+
+  /**
+   * True if zero readahead was requested.
+   */
+  private final boolean zeroReadaheadRequested;
+
+  /**
+   * Maximum amount of readahead we'll do.  This will always be at least the
+   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
+   * The reason is that we need to do a certain amount of buffering in order
+   * to do checksumming.
+   * 
+   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+   * Why do we allocate buffers, and then (potentially) only use part of them?
+   * The rationale is that allocating a lot of buffers of different sizes would
+   * make it very difficult for the DirectBufferPool to re-use buffers. 
+   */
+  private final int maxReadaheadLength;
+
+  /**
+   * Buffers data starting at the current dataPos and extending on
+   * for dataBuf.limit().
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer dataBuf;
+
+  /**
+   * Buffers checksums starting at the current checksumPos and extending on
+   * for checksumBuf.limit().
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer checksumBuf;
+
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  private BlockReaderLocal(Builder builder) {
+    this.replica = builder.replica;
+    this.dataIn = replica.getDataStream().getChannel();
+    this.dataPos = builder.dataPos;
+    this.checksumIn = replica.getMetaStream().getChannel();
+    BlockMetadataHeader header = builder.replica.getMetaHeader();
+    this.checksum = header.getChecksum();
+    this.verifyChecksum = builder.verifyChecksum &&
+        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+    this.filename = builder.filename;
+    this.block = builder.block;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    this.checksumSize = checksum.getChecksumSize();
+
+    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+    // Calculate the effective maximum readahead.
+    // We can't do more readahead than there is space in the buffer.
+    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+        ((Math.min(builder.bufferSize, builder.maxReadahead) +
+            bytesPerChecksum - 1) / bytesPerChecksum);
+    if (maxReadaheadChunks == 0) {
+      this.zeroReadaheadRequested = true;
+      maxReadaheadChunks = 1;
+    } else {
+      this.zeroReadaheadRequested = false;
+    }
+    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+    this.storageType = builder.storageType;
+  }
+
+  private synchronized void createDataBufIfNeeded() {
+    if (dataBuf == null) {
+      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+      dataBuf.position(0);
+      dataBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeDataBufIfExists() {
+    if (dataBuf != null) {
+      // When disposing of a dataBuf, we have to move our stored file index
+      // backwards.
+      dataPos -= dataBuf.remaining();
+      dataBuf.clear();
+      bufferPool.returnBuffer(dataBuf);
+      dataBuf = null;
+    }
+  }
+
+  private synchronized void createChecksumBufIfNeeded() {
+    if (checksumBuf == null) {
+      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+      checksumBuf.position(0);
+      checksumBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeChecksumBufIfExists() {
+    if (checksumBuf != null) {
+      checksumBuf.clear();
+      bufferPool.returnBuffer(checksumBuf);
+      checksumBuf = null;
+    }
+  }
+
+  private synchronized int drainDataBuf(ByteBuffer buf) {
+    if (dataBuf == null) return -1;
+    int oldLimit = dataBuf.limit();
+    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+    if (nRead == 0) {
+      return (dataBuf.remaining() == 0) ? -1 : 0;
+    }
+    try {
+      dataBuf.limit(dataBuf.position() + nRead);
+      buf.put(dataBuf);
+    } finally {
+      dataBuf.limit(oldLimit);
+    }
+    return nRead;
+  }
+
+  /**
+   * Read from the block file into a buffer.
+   *
+   * This function overwrites checksumBuf.  It will increment dataPos.
+   *
+   * @param buf   The buffer to read into.  May be dataBuf.
+   *              The position and limit of this buffer should be set to
+   *              multiples of the checksum size.
+   * @param canSkipChecksum  True if we can skip checksumming.
+   *
+   * @return      Total bytes read.  0 on EOF.
+   */
+  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
+      throws IOException {
+    TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
+        block.getBlockId() + ")", Sampler.NEVER);
+    try {
+      int total = 0;
+      long startDataPos = dataPos;
+      int startBufPos = buf.position();
+      while (buf.hasRemaining()) {
+        int nRead = dataIn.read(buf, dataPos);
+        if (nRead < 0) {
+          break;
+        }
+        dataPos += nRead;
+        total += nRead;
+      }
+      if (canSkipChecksum) {
+        freeChecksumBufIfExists();
+        return total;
+      }
+      if (total > 0) {
+        try {
+          buf.limit(buf.position());
+          buf.position(startBufPos);
+          createChecksumBufIfNeeded();
+          int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+          checksumBuf.clear();
+          checksumBuf.limit(checksumsNeeded * checksumSize);
+          long checksumPos = BlockMetadataHeader.getHeaderSize()
+              + ((startDataPos / bytesPerChecksum) * checksumSize);
+          while (checksumBuf.hasRemaining()) {
+            int nRead = checksumIn.read(checksumBuf, checksumPos);
+            if (nRead < 0) {
+              throw new IOException("Got unexpected checksum file EOF at " +
+                  checksumPos + ", block file position " + startDataPos + " for " +
+                  "block " + block + " of file " + filename);
+            }
+            checksumPos += nRead;
+          }
+          checksumBuf.flip();
+
+          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+        } finally {
+          buf.position(buf.limit());
+        }
+      }
+      return total;
+    } finally {
+      scope.close();
+    }
+  }
+
+  private boolean createNoChecksumContext() {
+    if (verifyChecksum) {
+      if (storageType != null && storageType.isTransient()) {
+        // Checksums are not stored for replicas on transient storage.  We do not
+        // anchor, because we do not intend for client activity to block eviction
+        // from transient storage on the DataNode side.
+        return true;
+      } else {
+        return replica.addNoChecksumAnchor();
+      }
+    } else {
+      return true;
+    }
+  }
+
+  private void releaseNoChecksumContext() {
+    if (verifyChecksum) {
+      if (storageType == null || !storageType.isTransient()) {
+        replica.removeNoChecksumAnchor();
+      }
+    }
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    boolean canSkipChecksum = createNoChecksumContext();
+    try {
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(").
+            append("buf.remaining=").append(buf.remaining()).
+            append(", block=").append(block).
+            append(", filename=").append(filename).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.info(traceString + ": starting");
+      }
+      int nRead;
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(buf);
+        } else {
+          nRead = readWithBounceBuffer(buf, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.info(traceString + ": I/O error", e);
+        }
+        throw e;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.info(traceString + ": returning " + nRead);
+      }
+      return nRead;
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
+    }
+  }
+
+  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+      throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead <= 0) break;
+      dataPos += nRead;
+      total += nRead;
+    }
+    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
+  }
+
+  /**
+   * Fill the data buffer.  If necessary, validate the data against the
+   * checksums.
+   * 
+   * We always want the offsets of the data contained in dataBuf to be
+   * aligned to the chunk boundary.  If we are validating checksums, we
+   * accomplish this by seeking backwards in the file until we're on a
+   * chunk boundary.  (This is necessary because we can't checksum a
+   * partial chunk.)  If we are not validating checksums, we simply only
+   * fill the latter part of dataBuf.
+   * 
+   * @param canSkipChecksum  true if we can skip checksumming.
+   * @return                 true if we hit EOF.
+   * @throws IOException
+   */
+  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+      throws IOException {
+    createDataBufIfNeeded();
+    final int slop = (int)(dataPos % bytesPerChecksum);
+    final long oldDataPos = dataPos;
+    dataBuf.limit(maxReadaheadLength);
+    if (canSkipChecksum) {
+      dataBuf.position(slop);
+      fillBuffer(dataBuf, canSkipChecksum);
+    } else {
+      dataPos -= slop;
+      dataBuf.position(0);
+      fillBuffer(dataBuf, canSkipChecksum);
+    }
+    dataBuf.limit(dataBuf.position());
+    dataBuf.position(Math.min(dataBuf.position(), slop));
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+          "buffer from offset " + oldDataPos + " of " + block);
+    }
+    return dataBuf.limit() != maxReadaheadLength;
+  }
+
+  /**
+   * Read using the bounce buffer.
+   *
+   * A 'direct' read actually has three phases. The first drains any
+   * remaining bytes from the slow read buffer. After this the read is
+   * guaranteed to be on a checksum chunk boundary. If there are still bytes
+   * to read, the fast direct path is used for as many remaining bytes as
+   * possible, up to a multiple of the checksum chunk size. Finally, any
+   * 'odd' bytes remaining at the end of the read cause another slow read to
+   * be issued, which involves an extra copy.
+   *
+   * Every 'slow' read tries to fill the slow read buffer in one go for
+   * efficiency's sake. As described above, all non-checksum-chunk-aligned
+   * reads will be served from the slower read path.
+   *
+   * @param buf              The buffer to read into. 
+   * @param canSkipChecksum  True if we can skip checksums.
+   */
+  private synchronized int readWithBounceBuffer(ByteBuffer buf,
+        boolean canSkipChecksum) throws IOException {
+    int total = 0;
+    int bb = drainDataBuf(buf); // drain bounce buffer if possible
+    if (bb >= 0) {
+      total += bb;
+      if (buf.remaining() == 0) return total;
+    }
+    boolean eof = true, done = false;
+    do {
+      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
+            && ((dataPos % bytesPerChecksum) == 0)) {
+        // Fast lane: try to read directly into user-supplied buffer, bypassing
+        // bounce buffer.
+        int oldLimit = buf.limit();
+        int nRead;
+        try {
+          buf.limit(buf.position() + maxReadaheadLength);
+          nRead = fillBuffer(buf, canSkipChecksum);
+        } finally {
+          buf.limit(oldLimit);
+        }
+        if (nRead < maxReadaheadLength) {
+          done = true;
+        }
+        if (nRead > 0) {
+          eof = false;
+        }
+        total += nRead;
+      } else {
+        // Slow lane: refill bounce buffer.
+        if (fillDataBuf(canSkipChecksum)) {
+          done = true;
+        }
+        bb = drainDataBuf(buf); // drain bounce buffer if possible
+        if (bb >= 0) {
+          eof = false;
+          total += bb;
+        }
+      }
+    } while ((!done) && (buf.remaining() > 0));
+    return (eof && total == 0) ? -1 : total;
+  }
+
+  @Override
+  public synchronized int read(byte[] arr, int off, int len)
+        throws IOException {
+    boolean canSkipChecksum = createNoChecksumContext();
+    int nRead;
+    try {
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(arr.length=").append(arr.length).
+            append(", off=").append(off).
+            append(", len=").append(len).
+            append(", filename=").append(filename).
+            append(", block=").append(block).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.trace(traceString + ": starting");
+      }
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(arr, off, len);
+        } else {
+          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(traceString + ": I/O error", e);
+        }
+        throw e;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(traceString + ": returning " + nRead);
+      }
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
+    }
+    return nRead;
+  }
+
+  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+        int len) throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+    if (nRead > 0) {
+      dataPos += nRead;
+    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
+      return -1;
+    }
+    return nRead;
+  }
+
+  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+        boolean canSkipChecksum) throws IOException {
+    createDataBufIfNeeded();
+    if (!dataBuf.hasRemaining()) {
+      dataBuf.position(0);
+      dataBuf.limit(maxReadaheadLength);
+      fillDataBuf(canSkipChecksum);
+    }
+    if (dataBuf.remaining() == 0) return -1;
+    int toRead = Math.min(dataBuf.remaining(), len);
+    dataBuf.get(arr, off, toRead);
+    return toRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    int discardedFromBuf = 0;
+    long remaining = n;
+    if ((dataBuf != null) && dataBuf.hasRemaining()) {
+      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+      dataBuf.position(dataBuf.position() + discardedFromBuf);
+      remaining -= discardedFromBuf;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 
+        filename + "): discarded " + discardedFromBuf + " bytes from " +
+        "dataBuf and advanced dataPos by " + remaining);
+    }
+    dataPos += remaining;
+    return n;
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) return;
+    closed = true;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+    }
+    replica.unref();
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+  }
+
+  @Override
+  public synchronized void readFully(byte[] arr, int off, int len)
+      throws IOException {
+    BlockReaderUtil.readFully(this, arr, off, len);
+  }
+
+  @Override
+  public synchronized int readAll(byte[] buf, int off, int len)
+      throws IOException {
+    return BlockReaderUtil.readAll(this, buf, off, len);
+  }
+
+  @Override
+  public boolean isLocal() {
+    return true;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
+
+  /**
+   * Get or create a memory map for this replica.
+   * 
+   * There are two kinds of ClientMmap objects we could fetch here: one that 
+   * will always read pre-checksummed data, and one that may read data that
+   * hasn't been checksummed.
+   *
+   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+   * the anchor count on the shared memory slot.  This will tell the DataNode
+   * not to munlock the block until this ClientMmap is closed.
+   * If we fetch the latter, we don't bother with anchoring.
+   *
+   * @param opts     The options to use, such as SKIP_CHECKSUMS.
+   * 
+   * @return         null on failure; the ClientMmap otherwise.
+   */
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    boolean anchor = verifyChecksum &&
+        !opts.contains(ReadOption.SKIP_CHECKSUMS);
+    if (anchor) {
+      if (!createNoChecksumContext()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("can't get an mmap for " + block + " of " + filename + 
+              " since SKIP_CHECKSUMS was not given, " +
+              "we aren't skipping checksums, and the block is not mlocked.");
+        }
+        return null;
+      }
+    }
+    ClientMmap clientMmap = null;
+    try {
+      clientMmap = replica.getOrCreateClientMmap(anchor);
+    } finally {
+      if ((clientMmap == null) && anchor) {
+        releaseNoChecksumContext();
+      }
+    }
+    return clientMmap;
+  }
+  
+  @VisibleForTesting
+  boolean getVerifyChecksum() {
+    return this.verifyChecksum;
+  }
+
+  @VisibleForTesting
+  int getMaxReadaheadLength() {
+    return this.maxReadaheadLength;
+  }
+  
+  /**
+   * Make the replica anchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceAnchorable() {
+    replica.getSlot().makeAnchorable();
+  }
+
+  /**
+   * Make the replica unanchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceUnanchorable() {
+    replica.getSlot().makeUnanchorable();
+  }
+}
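
BlockReaderLocal above can only be created through its Builder; the
constructor is private and build() insists on a non-null ShortCircuitReplica.
The snippet below is a rough, hypothetical sketch of that call pattern only.
It is not code from this commit: the factory method and class name are
invented, and every argument is assumed to be supplied by the caller, since
obtaining the ShortCircuitReplica (and with it the open data/meta file
descriptors) is the job of the client's short-circuit machinery, which is
not shown here.

    package org.apache.hadoop.hdfs;   // BlockReaderLocal is package-private

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
    import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;

    // Hypothetical factory method, not part of the commit.
    class LocalReaderSketch {
      static BlockReader newLocalReader(ShortCircuitConf conf,
          ShortCircuitReplica replica, ExtendedBlock block, String src,
          long startOffset, boolean verifyChecksum,
          CachingStrategy cachingStrategy, StorageType storageType) {
        return new BlockReaderLocal.Builder(conf)
            .setShortCircuitReplica(replica)  // build() checks this is non-null
            .setBlock(block)
            .setFilename(src)                 // used for log messages only
            .setStartOffset(startOffset)      // negative offsets are clamped to 0
            .setVerifyChecksum(verifyChecksum)
            .setCachingStrategy(cachingStrategy)
            .setStorageType(storageType)
            .build();
      }
    }

If zero readahead is requested and checksums can be skipped, the resulting
reader takes the readWithoutBounceBuffer() path; otherwise reads go through
the bounce buffer described in readWithBounceBuffer() above.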

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
new file mode 100644
index 0000000..eea3f06
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ *
+ * This is the legacy implementation based on HDFS-2246, which requires
+ * permissions on the datanode to be set so that clients can directly access the
+ * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
+ * systems where the required native code has been implemented.<br>
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where the block is stored using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call</li>
+ * <li>The client uses Kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BlockReaderLocalLegacy implements BlockReader {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockReaderLocalLegacy.class);
+
+  //Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+      cache = Collections
+          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+              hashTableCapacity, hashTableLoadFactor, true) {
+            private static final long serialVersionUID = 1;
+
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+              return size() > cacheSize;
+            }
+          });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        UserGroupInformation ugi, final DatanodeInfo node,
+        final Configuration conf, final int socketTimeout,
+        final boolean connectToDnViaHostname) throws IOException {
+      if (proxy == null) {
+        try {
+          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
+                  socketTimeout, connectToDnViaHostname);
+            }
+          });
+        } catch (InterruptedException e) {
+          LOG.warn("encountered exception ", e);
+        }
+      }
+      return proxy;
+    }
+    
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(ExtendedBlock b) {
+      cache.remove(b);
+    }
+  }
+  
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
+  private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+  private final FileInputStream dataIn; // reader for the data file
+  private final FileInputStream checksumIn;   // reader for the checksum file
+
+  /**
+   * Offset from the most recent chunk boundary at which the next read should
+   * take place. Is only set to non-zero at construction time, and is
+   * decremented (usually to 0) by subsequent reads. This avoids having to do a
+   * checksum read at construction to position the read cursor correctly.
+   */
+  private int offsetFromChunkBoundary;
+  
+  private byte[] skipBuf = null;
+
+  /**
+   * Used for checksummed reads that need to be staged before copying to their
+   * output buffer because they are either a) smaller than the checksum chunk
+   * size or b) issued by the slower read(byte[]...) path
+   */
+  private ByteBuffer slowReadBuff = null;
+  private ByteBuffer checksumBuff = null;
+  private DataChecksum checksum;
+  private final boolean verifyChecksum;
+
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+  private long blockId;
+  
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
+      UserGroupInformation userGroupInformation,
+      Configuration configuration, String file, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+      long startOffset, long length, StorageType storageType)
+      throws IOException {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+        .getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      if (userGroupInformation == null) {
+        userGroupInformation = UserGroupInformation.getCurrentUser();
+      }
+      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+          configuration, conf.getSocketTimeout(), token,
+          conf.isConnectToDnViaHostname(), storageType);
+    }
+
+    // check to see if the file exists. It may so happen that the
+    // HDFS file has been deleted and this block-lookup is occurring
+    // on behalf of a new HDFS file. This time, the block file could
+    // be residing in a different portion of the fs.data.dir directory.
+    // In this case, we remove this entry from the cache. The next
+    // call to this method will re-populate the cache.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocalLegacy localBlockReader = null;
+    final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+        || storageType.isTransient();
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
+            + blkfile.length() + " startOffset " + startOffset + " length "
+            + length + " short circuit checksum " + !skipChecksumCheck);
+      }
+
+      if (!skipChecksumCheck) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            new DataInputStream(checksumIn), blk);
+        long firstChunkOffset = startOffset
+            - (startOffset % checksum.getBytesPerChecksum());
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+            startOffset, length, pathinfo, checksum, true, dataIn,
+            firstChunkOffset, checksumIn);
+      } else {
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+            startOffset, length, pathinfo, dataIn);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+          + " from cache because local file " + pathinfo.getBlockPath()
+          + " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }
+    }
+    return localBlockReader;
+  }
+  
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+  
+  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
+      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+      StorageType storageType) throws IOException {
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+    BlockLocalPathInfo pathinfo = null;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
+        conf, timeout, connectToDnViaHostname);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      // We cannot cache the path information for a replica on transient storage.
+      // If the replica gets evicted, then it moves to a different path.  Then,
+      // our next attempt to read from the cached path would fail to find the
+      // file.  Additionally, the failure would cause us to disable legacy
+      // short-circuit read for all subsequent use in the ClientContext.  Unlike
+      // the newer short-circuit read implementation, we have no communication
+      // channel for the DataNode to notify the client that the path has been
+      // invalidated.  Therefore, our only option is to skip caching.
+      if (pathinfo != null && !storageType.isTransient()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+        }
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+  
+  private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+      int bytesPerChecksum) {
+    if (bufferSizeBytes < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
+          "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
+          "a single chunk (" + bytesPerChecksum +  "). Please configure " +
+          HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
+          " appropriately");
+    }
+
+    // Round down to nearest chunk size
+    return bufferSizeBytes / bytesPerChecksum;
+  }
+
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
+      throws IOException {
+    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
+        dataIn, startOffset, null);
+  }
+
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
+      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+      FileInputStream checksumIn) throws IOException {
+    this.filename = hdfsfile;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max(startOffset, 0);
+    this.blockId = block.getBlockId();
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+
+    final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+        conf.getShortCircuitBufferSize(), bytesPerChecksum);
+    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+    // Initially the buffers have nothing to read.
+    slowReadBuff.flip();
+    checksumBuff.flip();
+    boolean success = false;
+    try {
+      // Skip both input streams to beginning of the chunk containing startOffset
+      IOUtils.skipFully(dataIn, firstChunkOffset);
+      if (checksumIn != null) {
+        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+        IOUtils.skipFully(checksumIn, checkSumOffset);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        bufferPool.returnBuffer(slowReadBuff);
+        bufferPool.returnBuffer(checksumBuff);
+      }
+    }
+  }
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+      throws IOException {
+    TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
+        blockId + ")", Sampler.NEVER);
+    try {
+      int bytesRead = stream.getChannel().read(buf);
+      if (bytesRead < 0) {
+        //EOF
+        return bytesRead;
+      }
+      while (buf.remaining() > 0) {
+        int n = stream.getChannel().read(buf);
+        if (n < 0) {
+          //EOF
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return bytesRead;
+    } finally {
+      scope.close();
+    }
+  }
+  
+  /**
+   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+   * another.
+   */
+  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+    int oldLimit = from.limit();
+    from.limit(from.position() + length);
+    try {
+      to.put(from);
+    } finally {
+      from.limit(oldLimit);
+    }
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    int nRead = 0;
+    if (verifyChecksum) {
+      // A 'direct' read actually has three phases. The first drains any
+      // remaining bytes from the slow read buffer. After this the read is
+      // guaranteed to be on a checksum chunk boundary. If there are still bytes
+      // to read, the fast direct path is used for as many remaining bytes as
+      // possible, up to a multiple of the checksum chunk size. Finally, any
+      // 'odd' bytes remaining at the end of the read cause another slow read to
+      // be issued, which involves an extra copy.
+
+      // Every 'slow' read tries to fill the slow read buffer in one go for
+      // efficiency's sake. As described above, all non-checksum-chunk-aligned
+      // reads will be served from the slower read path.
+
+      if (slowReadBuff.hasRemaining()) {
+        // There are remaining bytes from a small read available. This usually
+        // means this read is unaligned, which falls back to the slow path.
+        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+        nRead += fromSlowReadBuff;
+      }
+
+      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+        // Since we have drained the 'small read' buffer, we are guaranteed to
+        // be chunk-aligned
+        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+        // There's only enough checksum buffer space available to checksum one
+        // entire slow read buffer. This saves keeping the number of checksum
+        // chunks around.
+        len = Math.min(len, slowReadBuff.capacity());
+        int oldlimit = buf.limit();
+        buf.limit(buf.position() + len);
+        int readResult = 0;
+        try {
+          readResult = doByteBufferRead(buf);
+        } finally {
+          buf.limit(oldlimit);
+        }
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          nRead += readResult;
+          buf.position(buf.position() + readResult);
+        }
+      }
+
+      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+      // until chunk boundary
+      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+        int readResult = fillSlowReadBuffer(toRead);
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+          nRead += fromSlowReadBuff;
+        }
+      }
+    } else {
+      // Non-checksummed reads are much easier; we can just fill the buffer directly.
+      nRead = doByteBufferRead(buf);
+      if (nRead > 0) {
+        buf.position(buf.position() + nRead);
+      }
+    }
+    return nRead;
+  }
+
+  /**
+   * Tries to read as many bytes as possible into supplied buffer, checksumming
+   * each chunk if needed.
+   *
+   * <b>Preconditions:</b>
+   * <ul>
+   * <li>
+   * If checksumming is enabled, buf.remaining must be a multiple of
+   * bytesPerChecksum. Note that this is not a requirement for clients of
+   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+   * method.
+   * </li>
+   * </ul>
+   * <b>Postconditions:</b>
+   * <ul>
+   * <li>buf.limit and buf.mark are unchanged.</li>
+   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+   * requested bytes can be read straight from the buffer</li>
+   * </ul>
+   *
+   * @param buf
+   *          byte buffer to write bytes to. If checksums are not required, buf
+   *          can have any number of bytes remaining, otherwise there must be a
+   *          multiple of the checksum chunk size remaining.
+   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+   *         that is, the number of useful bytes (up to the amount
+   *         requested) readable from the buffer by the client.
+   */
+  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+    if (verifyChecksum) {
+      assert buf.remaining() % bytesPerChecksum == 0;
+    }
+    int dataRead = -1;
+
+    int oldpos = buf.position();
+    // Read as much as we can into the buffer.
+    dataRead = fillBuffer(dataIn, buf);
+
+    if (dataRead == -1) {
+      return -1;
+    }
+
+    if (verifyChecksum) {
+      ByteBuffer toChecksum = buf.duplicate();
+      toChecksum.position(oldpos);
+      toChecksum.limit(oldpos + dataRead);
+
+      checksumBuff.clear();
+      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+      int numChunks =
+        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+      checksumBuff.limit(checksumSize * numChunks);
+
+      fillBuffer(checksumIn, checksumBuff);
+      checksumBuff.flip();
+
+      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+          this.startOffset);
+    }
+
+    if (dataRead >= 0) {
+      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+    }
+
+    if (dataRead < offsetFromChunkBoundary) {
+      // yikes, didn't even get enough bytes to honour offset. This can happen
+      // even if we are verifying checksums if we are at EOF.
+      offsetFromChunkBoundary -= dataRead;
+      dataRead = 0;
+    } else {
+      dataRead -= offsetFromChunkBoundary;
+      offsetFromChunkBoundary = 0;
+    }
+
+    return dataRead;
+  }
+
+  /**
+   * Ensures that up to len bytes are available and checksummed in the slow read
+   * buffer. The number of bytes available to read is returned. If the buffer is
+   * not already empty, the number of remaining bytes is returned and no actual
+   * read happens.
+   *
+   * @param len
+   *          the maximum number of bytes to make available. After len bytes
+   *          are read, the underlying bytestream <b>must</b> be at a checksum
+   *          boundary, or EOF. That is, (len + currentPosition) %
+   *          bytesPerChecksum == 0.
+   * @return the number of bytes available to read, or -1 if EOF.
+   */
+  private synchronized int fillSlowReadBuffer(int len) throws IOException {
+    int nRead = -1;
+    if (slowReadBuff.hasRemaining()) {
+      // Already got data, good to go.
+      nRead = Math.min(len, slowReadBuff.remaining());
+    } else {
+      // Round a complete read of len bytes (plus any implicit offset) up to the
+      // next chunk boundary, since we try to read in multiples of a chunk.
+      int nextChunk = len + offsetFromChunkBoundary +
+          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+      int limit = Math.min(nextChunk, slowReadBuff.capacity());
+      assert limit % bytesPerChecksum == 0;
+
+      slowReadBuff.clear();
+      slowReadBuff.limit(limit);
+
+      nRead = doByteBufferRead(slowReadBuff);
+
+      if (nRead > 0) {
+        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+        // false positive.
+        slowReadBuff.limit(nRead + slowReadBuff.position());
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("read off " + off + " len " + len);
+    }
+    if (!verifyChecksum) {
+      return dataIn.read(buf, off, len);
+    }
+
+    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+    if (nRead > 0) {
+      // Possible that buffer is filled with a larger read than we need, since
+      // we tried to read as much as possible at once
+      nRead = Math.min(len, nRead);
+      slowReadBuff.get(buf, off, nRead);
+    }
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("skip " + n);
+    }
+    if (n <= 0) {
+      return 0;
+    }
+    if (!verifyChecksum) {
+      return dataIn.skip(n);
+    }
+  
+    // caller made sure newPosition is not beyond EOF.
+    int remaining = slowReadBuff.remaining();
+    int position = slowReadBuff.position();
+    int newPosition = position + (int)n;
+  
+    // if the new offset has already been read into slowReadBuff, just reposition
+    if (n <= remaining) {
+      assert offsetFromChunkBoundary == 0;
+      slowReadBuff.position(newPosition);
+      return n;
+    }
+  
+    // for small gap, read through to keep the data/checksum in sync
+    if (n - remaining <= bytesPerChecksum) {
+      slowReadBuff.position(position + remaining);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      int ret = read(skipBuf, 0, (int)(n - remaining));
+      return (remaining + ret);
+    }
+  
+    // optimize for big gap: discard the current buffer, skip to
+    // the beginning of the appropriate checksum chunk and then
+    // read to the middle of that chunk to be in sync with checksums.
+  
+    // We can't use this.offsetFromChunkBoundary because we need to know how
+    // many bytes of the offset were really read. Calling read(..) with a
+    // positive this.offsetFromChunkBoundary causes that many bytes to get
+    // silently skipped.
+    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - myOffsetFromChunkBoundary;
+
+    slowReadBuff.position(slowReadBuff.limit());
+    checksumBuff.position(checksumBuff.limit());
+  
+    IOUtils.skipFully(dataIn, toskip);
+    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+    IOUtils.skipFully(checksumIn, checkSumOffset);
+
+    // read into the middle of the chunk
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+    assert skipBuf.length == bytesPerChecksum;
+    assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
+    if (ret == -1) {  // EOS
+      return (toskip + remaining);
+    } else {
+      return (toskip + remaining + ret);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
+    if (slowReadBuff != null) {
+      bufferPool.returnBuffer(slowReadBuff);
+      slowReadBuff = null;
+    }
+    if (checksumBuff != null) {
+      bufferPool.returnBuffer(checksumBuff);
+      checksumBuff = null;
+    }
+    startOffset = -1;
+    checksum = null;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocalLegacy.
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return true;
+  }
+  
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+}
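
The slow-read path above hinges on one piece of arithmetic: every request is widened up to the next checksum-chunk boundary so whole chunks can be verified. As a reading aid, here is a tiny standalone sketch of the same rounding formula used in fillSlowReadBuffer(); the class, method name and sample values are invented and not part of the patch.

    // Hypothetical, self-contained illustration of the chunk-boundary rounding.
    public final class ChunkRoundingDemo {
      // Mirrors: len + offset + (bytesPerChecksum - ((len + offset) % bytesPerChecksum))
      static int nextChunkBoundary(int len, int offsetFromChunkBoundary, int bytesPerChecksum) {
        int total = len + offsetFromChunkBoundary;
        return total + (bytesPerChecksum - (total % bytesPerChecksum));
      }

      public static void main(String[] args) {
        // A 300-byte read that starts 100 bytes into a 512-byte chunk is widened
        // to 512 bytes, so the checksum of the whole chunk can be verified.
        System.out.println(nextChunkBoundary(300, 100, 512)); // 512
        // A request that already ends on a boundary rounds up to the next chunk;
        // the result is then capped by slowReadBuff.capacity() in the real code.
        System.out.println(nextChunkBoundary(412, 100, 512)); // 1024
      }
    }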

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
new file mode 100644
index 0000000..dbc528e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * For sharing between the local and remote block reader implementations.
+ */
+@InterfaceAudience.Private
+class BlockReaderUtil {
+
+  /* See {@link BlockReader#readAll(byte[], int, int)} */
+  public static int readAll(BlockReader reader,
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = reader.read(buf, offset + n, len - n);
+      if (nread <= 0)
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  /* See {@link BlockReader#readFully(byte[], int, int)} */
+  public static void readFully(BlockReader reader,
+      byte[] buf, int off, int len) throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = reader.read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+}
\ No newline at end of file
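
As a reading aid for the two helpers above, a minimal usage sketch under assumptions: the demo class is invented and "reader" is some already-opened BlockReader. readAll() is best-effort, while readFully() insists on the full length.

    package org.apache.hadoop.hdfs;  // BlockReaderUtil is package-private

    import java.io.IOException;

    class ReadHelpersDemo {
      static void demo(BlockReader reader) throws IOException {
        byte[] header = new byte[128];
        // Best effort: may return fewer than 128 bytes (or -1 at EOF).
        int n = BlockReaderUtil.readAll(reader, header, 0, header.length);
        // Strict alternative: throws "Premature EOF from inputStream" if the
        // stream ends before 128 bytes have been copied.
        BlockReaderUtil.readFully(reader, header, 0, header.length);
      }
    }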

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
new file mode 100644
index 0000000..3836979
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ClientContext contains context information for a client.
+ * 
+ * This allows us to share caches such as the socket cache across
+ * DFSClient instances.
+ */
+@InterfaceAudience.Private
+public class ClientContext {
+  private static final Logger LOG = LoggerFactory.getLogger(ClientContext.class);
+
+  /**
+   * Global map of context names to cached contexts.
+   */
+  private final static HashMap<String, ClientContext> CACHES =
+      new HashMap<String, ClientContext>();
+
+  /**
+   * Name of context.
+   */
+  private final String name;
+
+  /**
+   * String representation of the configuration.
+   */
+  private final String confString;
+
+  /**
+   * Caches short-circuit file descriptors and mmap regions.
+   */
+  private final ShortCircuitCache shortCircuitCache;
+
+  /**
+   * Caches TCP and UNIX domain sockets for reuse.
+   */
+  private final PeerCache peerCache;
+
+  /**
+   * Stores information about socket paths.
+   */
+  private final DomainSocketFactory domainSocketFactory;
+
+  /**
+   * Caches key Providers for the DFSClient
+   */
+  private final KeyProviderCache keyProviderCache;
+  /**
+   * True if we should use the legacy BlockReaderLocal.
+   */
+  private final boolean useLegacyBlockReaderLocal;
+
+  /**
+   * True if the legacy BlockReaderLocal is disabled.
+   *
+   * The legacy block reader local gets disabled completely whenever there is an
+   * error or miscommunication.  The new block reader local code handles this
+   * case more gracefully inside DomainSocketFactory.
+   */
+  private volatile boolean disableLegacyBlockReaderLocal = false;
+
+  /** Creates byte[] buffers for {@link DFSOutputStream}. */
+  private final ByteArrayManager byteArrayManager;  
+
+  /**
+   * Whether or not we have already complained about a DFSClient fetching a
+   * ClientContext that didn't match its config values.
+   */
+  private boolean printedConfWarning = false;
+
+  private ClientContext(String name, DfsClientConf conf) {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+
+    this.name = name;
+    this.confString = scConf.confAsString();
+    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+    this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
+        scConf.getSocketCacheExpiry());
+    this.keyProviderCache = new KeyProviderCache(
+        scConf.getKeyProviderCacheExpiryMs());
+    this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
+    this.domainSocketFactory = new DomainSocketFactory(scConf);
+
+    this.byteArrayManager = ByteArrayManager.newInstance(
+        conf.getWriteByteArrayManagerConf());
+  }
+
+  public static ClientContext get(String name, DfsClientConf conf) {
+    ClientContext context;
+    synchronized(ClientContext.class) {
+      context = CACHES.get(name);
+      if (context == null) {
+        context = new ClientContext(name, conf);
+        CACHES.put(name, context);
+      } else {
+        context.printConfWarningIfNeeded(conf);
+      }
+    }
+    return context;
+  }
+
+  /**
+   * Get a client context, from a Configuration object.
+   *
+   * This method is less efficient than the version that takes a DfsClientConf
+   * object, and is intended mainly for use by tests.
+   */
+  @VisibleForTesting
+  public static ClientContext getFromConf(Configuration conf) {
+    return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
+        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
+            new DfsClientConf(conf));
+  }
+
+  private void printConfWarningIfNeeded(DfsClientConf conf) {
+    String existing = this.getConfString();
+    String requested = conf.getShortCircuitConf().confAsString();
+    if (!existing.equals(requested)) {
+      if (!printedConfWarning) {
+        printedConfWarning = true;
+        LOG.warn("Existing client context '" + name + "' does not match " +
+            "requested configuration.  Existing: " + existing + 
+            ", Requested: " + requested);
+      }
+    }
+  }
+
+  public String getConfString() {
+    return confString;
+  }
+
+  public ShortCircuitCache getShortCircuitCache() {
+    return shortCircuitCache;
+  }
+
+  public PeerCache getPeerCache() {
+    return peerCache;
+  }
+
+  public KeyProviderCache getKeyProviderCache() {
+    return keyProviderCache;
+  }
+
+  public boolean getUseLegacyBlockReaderLocal() {
+    return useLegacyBlockReaderLocal;
+  }
+
+  public boolean getDisableLegacyBlockReaderLocal() {
+    return disableLegacyBlockReaderLocal;
+  }
+
+  public void setDisableLegacyBlockReaderLocal() {
+    disableLegacyBlockReaderLocal = true;
+  }
+
+  public DomainSocketFactory getDomainSocketFactory() {
+    return domainSocketFactory;
+  }
+
+  public ByteArrayManager getByteArrayManager() {
+    return byteArrayManager;
+  }
+}
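
The CACHES map above is what lets every client that uses the same context name share one ClientContext, and with it the peer, short-circuit and key-provider caches. A minimal sketch, assuming an arbitrary example context name:

    package org.apache.hadoop.hdfs;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    class ClientContextSharingDemo {
      static void demo() {
        Configuration conf = new Configuration();
        conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, "shared-context"); // example name
        ClientContext a = ClientContext.getFromConf(conf);
        ClientContext b = ClientContext.getFromConf(conf);
        // Same name, so both handles see the same PeerCache, ShortCircuitCache
        // and KeyProviderCache; a mismatched ShortCircuitConf only triggers a
        // one-time warning via printConfWarningIfNeeded().
        assert a == b;
      }
    }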

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 3d0acb0..a89f556 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -22,22 +22,32 @@ import com.google.common.collect.Maps;
 import com.google.common.primitives.SignedBytes;
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.SocketFactory;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
@@ -455,4 +465,62 @@ public class DFSUtilClient {
     localAddrMap.put(addr.getHostAddress(), local);
     return local;
   }
+
+  /** Create a {@link ClientDatanodeProtocol} proxy */
+  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout,
+      boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
+    return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
+        connectToDnViaHostname, locatedBlock);
+  }
+
+  /** Create a {@link ClientDatanodeProtocol} proxy using a Kerberos ticket */
+  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
+    return new ClientDatanodeProtocolTranslatorPB(
+        datanodeid, conf, socketTimeout, connectToDnViaHostname);
+  }
+
+  /** Create a {@link ClientDatanodeProtocol} proxy */
+  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
+  }
+
+  /**
+   * Creates a new KeyProvider from the given Configuration.
+   *
+   * @param conf Configuration
+   * @return new KeyProvider, or null if no provider was found.
+   * @throws IOException if the KeyProvider is improperly specified in
+   *                             the Configuration
+   */
+  public static KeyProvider createKeyProvider(
+      final Configuration conf) throws IOException {
+    final String providerUriStr =
+        conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
+    // No provider set in conf
+    if (providerUriStr.isEmpty()) {
+      return null;
+    }
+    final URI providerUri;
+    try {
+      providerUri = new URI(providerUriStr);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
+    if (keyProvider == null) {
+      throw new IOException("Could not instantiate KeyProvider from " +
+          HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '"
+          + providerUriStr + "'");
+    }
+    if (keyProvider.isTransient()) {
+      throw new IOException("KeyProvider " + keyProvider.toString()
+          + " was found but it is a transient provider.");
+    }
+    return keyProvider;
+  }
 }
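
A hedged sketch of the new DFSUtilClient.createKeyProvider() entry point: everything is driven by HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, the jceks path below is a made-up example, and the method returns null when the key is unset.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.key.KeyProvider;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    class KeyProviderFromConfDemo {
      static KeyProvider demo() throws IOException {
        Configuration conf = new Configuration();
        conf.set(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
            "jceks://file/tmp/example.jceks");   // hypothetical provider URI
        // A malformed URI, an unresolvable scheme, or a transient provider all
        // surface as IOException rather than a null return.
        return DFSUtilClient.createKeyProvider(conf);
      }
    }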

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
new file mode 100644
index 0000000..e135d8e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+
+/**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+@InterfaceAudience.Private
+public final class ExternalBlockReader implements BlockReader {
+  private final ReplicaAccessor accessor;
+  private final long visibleLength;
+  private long pos;
+
+  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+                      long startOffset) {
+    this.accessor = accessor;
+    this.visibleLength = visibleLength;
+    this.pos = startOffset;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int nread = accessor.read(pos, buf, off, len);
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    int nread = accessor.read(pos, buf);
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    // You cannot skip backwards
+    if (n <= 0) {
+      return 0;
+    }
+    // You can't skip past the end of the replica.
+    long oldPos = pos;
+    pos += n;
+    if (pos > visibleLength) {
+      pos = visibleLength;
+    }
+    return pos - oldPos;
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We return the number of bytes that we haven't read yet from the
+    // replica, based on our current position.  Some of the other block
+    // readers return a shorter length than that.  The only advantage to
+    // returning a shorter length is that the DFSInputStream will
+    // trash your block reader and create a new one if someone tries to
+    // seek() beyond the available() region.
+    long diff = visibleLength - pos;
+    if (diff > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int)diff;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    accessor.close();
+  }
+
+  @Override
+  public void readFully(byte[] buf, int offset, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public boolean isLocal() {
+    return accessor.isLocal();
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return accessor.isShortCircuit();
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    // For now, pluggable ReplicaAccessors do not support zero-copy.
+    return null;
+  }
+}
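
skip() and available() above are plain position arithmetic against visibleLength; the standalone sketch below (made-up numbers, no ReplicaAccessor involved) shows the clamping behaviour.

    class SkipClampDemo {
      public static void main(String[] args) {
        long visibleLength = 1000L;   // bytes visible in the replica
        long pos = 900L;              // current read position
        long requested = 250L;
        long oldPos = pos;
        pos = Math.min(pos + requested, visibleLength);
        System.out.println(pos - oldPos);        // 100: cannot skip past the end
        System.out.println(visibleLength - pos); // 0:   nothing left to read
      }
    }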

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
new file mode 100644
index 0000000..05492e0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class KeyProviderCache {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KeyProviderCache.class);
+
+  private final Cache<URI, KeyProvider> cache;
+
+  public KeyProviderCache(long expiryMs) {
+    cache = CacheBuilder.newBuilder()
+        .expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS)
+        .removalListener(new RemovalListener<URI, KeyProvider>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<URI, KeyProvider> notification) {
+            try {
+              notification.getValue().close();
+            } catch (Throwable e) {
+              LOG.error(
+                  "Error closing KeyProvider with uri ["
+                      + notification.getKey() + "]", e);
+              ;
+            }
+          }
+        })
+        .build();
+  }
+
+  public KeyProvider get(final Configuration conf) {
+    URI kpURI = createKeyProviderURI(conf);
+    if (kpURI == null) {
+      return null;
+    }
+    try {
+      return cache.get(kpURI, new Callable<KeyProvider>() {
+        @Override
+        public KeyProvider call() throws Exception {
+          return DFSUtilClient.createKeyProvider(conf);
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
+      return null;
+    }
+  }
+
+  private URI createKeyProviderURI(Configuration conf) {
+    final String providerUriStr =
+        conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
+    // No provider set in conf
+    if (providerUriStr.isEmpty()) {
+      LOG.error("Could not find uri with key ["
+          + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
+          + "] to create a keyProvider !!");
+      return null;
+    }
+    final URI providerUri;
+    try {
+      providerUri = new URI(providerUriStr);
+    } catch (URISyntaxException e) {
+      LOG.error("KeyProvider URI string is invalid [" + providerUriStr
+          + "]!!", e.getCause());
+      return null;
+    }
+    return providerUri;
+  }
+
+  @VisibleForTesting
+  public void setKeyProvider(Configuration conf, KeyProvider keyProvider)
+      throws IOException {
+    URI uri = createKeyProviderURI(conf);
+    cache.put(uri, keyProvider);
+  }
+}
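
Finally, a small sketch of how KeyProviderCache is expected to be used, assuming HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI is set in the passed Configuration (otherwise get() logs an error and returns null): repeated lookups return the same cached provider until the access-expiry elapses, and the removal listener closes providers on eviction.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.key.KeyProvider;
    import org.apache.hadoop.hdfs.KeyProviderCache;

    class KeyProviderCacheDemo {
      static void demo(Configuration conf) {
        KeyProviderCache providerCache = new KeyProviderCache(10 * 60 * 1000L); // 10-minute expiry
        KeyProvider first = providerCache.get(conf);
        KeyProvider second = providerCache.get(conf);
        assert first == second; // same URI => same cached instance while the entry is live
      }
    }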