Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:59:02 UTC

[50/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

Merge remote-tracking branch 'apache/trunk' into HDFS-7285


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53358fe6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53358fe6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53358fe6

Branch: refs/heads/HDFS-7285
Commit: 53358fe680a11c1b66a7f60733d11c1f4efe0232
Parents: ab56fcd 2e251a7
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Tue Sep 1 00:29:55 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue Sep 1 14:48:37 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   9 +
 hadoop-common-project/hadoop-common/pom.xml     |  19 +-
 .../fs/CommonConfigurationKeysPublic.java       |   7 +
 .../src/main/resources/core-default.xml         |  14 +-
 .../src/site/markdown/FileSystemShell.md        |  13 +-
 .../java/org/apache/hadoop/fs/test-untar.tar    | Bin 20480 -> 0 bytes
 .../java/org/apache/hadoop/fs/test-untar.tgz    | Bin 2024 -> 0 bytes
 .../fs/viewfs/ViewFileSystemBaseTest.java       |   2 +-
 .../apache/hadoop/fs/viewfs/ViewFsBaseTest.java |   2 +-
 .../src/test/resources/test-untar.tar           | Bin 0 -> 20480 bytes
 .../src/test/resources/test-untar.tgz           | Bin 0 -> 2024 bytes
 hadoop-hdfs-project/hadoop-hdfs-client/pom.xml  |   5 +
 .../org/apache/hadoop/hdfs/BlockReader.java     | 110 +++
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 748 +++++++++++++++++++
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 743 ++++++++++++++++++
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 ++
 .../org/apache/hadoop/hdfs/ClientContext.java   | 196 +++++
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |  68 ++
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ++++
 .../apache/hadoop/hdfs/KeyProviderCache.java    | 112 +++
 .../java/org/apache/hadoop/hdfs/PeerCache.java  | 291 ++++++++
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 517 +++++++++++++
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 485 ++++++++++++
 .../hadoop/hdfs/client/BlockReportOptions.java  |  59 ++
 .../hdfs/client/HdfsClientConfigKeys.java       |  13 +
 .../hdfs/protocol/BlockLocalPathInfo.java       |  70 ++
 .../hdfs/protocol/ClientDatanodeProtocol.java   | 152 ++++
 .../InvalidEncryptionKeyException.java          |  40 +
 .../protocol/datatransfer/PacketHeader.java     | 214 ++++++
 .../protocol/datatransfer/PacketReceiver.java   | 310 ++++++++
 .../protocolPB/ClientDatanodeProtocolPB.java    |  37 +
 .../ClientDatanodeProtocolTranslatorPB.java     | 326 ++++++++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  13 +
 .../token/block/BlockTokenSelector.java         |  48 ++
 .../hdfs/util/ByteBufferOutputStream.java       |  49 ++
 .../hadoop/hdfs/web/URLConnectionFactory.java   |  30 +-
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      |  15 +-
 .../hdfs/web/oauth2/AccessTokenProvider.java    |  66 ++
 .../hdfs/web/oauth2/AccessTokenTimer.java       | 103 +++
 .../ConfCredentialBasedAccessTokenProvider.java |  62 ++
 ...onfRefreshTokenBasedAccessTokenProvider.java | 146 ++++
 .../CredentialBasedAccessTokenProvider.java     | 135 ++++
 .../oauth2/OAuth2ConnectionConfigurator.java    |  79 ++
 .../hadoop/hdfs/web/oauth2/OAuth2Constants.java |  46 ++
 .../apache/hadoop/hdfs/web/oauth2/Utils.java    |  63 ++
 .../hadoop/hdfs/web/oauth2/package-info.java    |  26 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  22 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   6 +
 .../bkjournal/BookKeeperEditLogInputStream.java |   2 +-
 .../org/apache/hadoop/hdfs/BlockReader.java     | 110 ---
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 746 ------------------
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 740 ------------------
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 --
 .../org/apache/hadoop/hdfs/ClientContext.java   | 195 -----
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   1 -
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  14 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   2 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  68 +-
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ----
 .../apache/hadoop/hdfs/KeyProviderCache.java    | 111 ---
 .../java/org/apache/hadoop/hdfs/PeerCache.java  | 290 -------
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 513 -------------
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 482 ------------
 .../hadoop/hdfs/client/BlockReportOptions.java  |  59 --
 .../hdfs/protocol/BlockLocalPathInfo.java       |  70 --
 .../hdfs/protocol/ClientDatanodeProtocol.java   | 152 ----
 .../hadoop/hdfs/protocol/LayoutVersion.java     |   2 +-
 .../InvalidEncryptionKeyException.java          |  40 -
 .../protocol/datatransfer/PacketHeader.java     | 214 ------
 .../protocol/datatransfer/PacketReceiver.java   | 310 --------
 .../hdfs/protocol/datatransfer/Receiver.java    |  15 +-
 .../protocolPB/ClientDatanodeProtocolPB.java    |  37 -
 ...tDatanodeProtocolServerSideTranslatorPB.java |   6 +-
 .../ClientDatanodeProtocolTranslatorPB.java     | 326 --------
 ...tNamenodeProtocolServerSideTranslatorPB.java |  14 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   2 +-
 ...rDatanodeProtocolServerSideTranslatorPB.java |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  33 +-
 .../token/block/BlockTokenSelector.java         |  48 --
 .../hdfs/server/blockmanagement/BlockInfo.java  |  19 +-
 .../blockmanagement/BlockInfoContiguous.java    |  15 -
 .../blockmanagement/BlockInfoStriped.java       |  19 -
 .../server/blockmanagement/BlockManager.java    | 552 +++++---------
 .../BlockPlacementPolicyDefault.java            | 147 +---
 .../blockmanagement/BlockRecoveryWork.java      | 111 +++
 .../blockmanagement/BlockToMarkCorrupt.java     |  82 ++
 .../hdfs/server/blockmanagement/BlocksMap.java  |  16 -
 .../blockmanagement/DatanodeDescriptor.java     |  35 +-
 .../server/blockmanagement/DatanodeManager.java |   9 +-
 .../blockmanagement/ErasureCodingWork.java      |  60 ++
 .../server/blockmanagement/HostFileManager.java |  19 +
 .../server/blockmanagement/ReplicationWork.java |  53 ++
 .../hadoop/hdfs/server/datanode/DNConf.java     |   4 +-
 .../namenode/EditLogBackupInputStream.java      |   2 +-
 .../server/namenode/EditLogFileInputStream.java |   2 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |  40 +
 .../hdfs/server/namenode/FSDirectory.java       |  63 ++
 .../hdfs/server/namenode/FSEditLogLoader.java   |   4 +-
 .../hdfs/server/namenode/FSEditLogOp.java       | 354 ++++++---
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |   3 +-
 .../hdfs/util/ByteBufferOutputStream.java       |  49 --
 .../hadoop-hdfs/src/site/markdown/WebHDFS.md    |  25 +
 .../hadoop/hdfs/TestBlockReaderLocal.java       |  30 +-
 .../hadoop/hdfs/TestBlockReaderLocalLegacy.java |   2 +-
 .../hdfs/TestClientBlockVerification.java       |   4 +-
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 .../apache/hadoop/hdfs/TestDecommission.java    |  15 +-
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |   4 +-
 .../security/token/block/TestBlockToken.java    |  10 +-
 .../blockmanagement/TestBlockInfoStriped.java   |  30 -
 .../blockmanagement/TestDatanodeManager.java    | 103 ++-
 .../blockmanagement/TestHostFileManager.java    |   7 +-
 .../blockmanagement/TestReplicationPolicy.java  |  26 +-
 .../hdfs/server/namenode/TestEditLog.java       |   2 +-
 .../namenode/TestEditLogFileInputStream.java    |  80 ++
 .../namenode/TestProtectedDirectories.java      | 373 +++++++++
 .../shortcircuit/TestShortCircuitLocalRead.java |   4 +-
 .../hadoop/hdfs/web/TestWebHDFSOAuth2.java      | 216 ++++++
 .../hdfs/web/oauth2/TestAccessTokenTimer.java   |  63 ++
 ...ClientCredentialTimeBasedTokenRefresher.java | 138 ++++
 ...TestRefreshTokenTimeBasedTokenRefresher.java | 138 ++++
 hadoop-project/src/site/site.xml                |   1 +
 .../org/apache/hadoop/fs/s3a/Constants.java     |   4 +-
 .../src/site/markdown/tools/hadoop-aws/index.md |   4 +-
 hadoop-yarn-project/CHANGES.txt                 |  10 +-
 .../hadoop/yarn/client/TestRMFailover.java      |  27 +
 .../hadoop/yarn/webapp/YarnWebParams.java       |   1 +
 .../scheduler/capacity/AbstractCSQueue.java     |  27 +
 .../scheduler/capacity/CSQueue.java             |  26 +
 .../scheduler/capacity/CapacityScheduler.java   |  40 +-
 .../scheduler/capacity/LeafQueue.java           |  16 +
 .../scheduler/common/fica/FiCaSchedulerApp.java |   9 +
 .../resourcemanager/webapp/RMWebAppFilter.java  |  90 ++-
 .../TestCapacitySchedulerNodeLabelUpdate.java   | 249 +++++-
 .../src/site/markdown/NodeLabel.md              | 140 ++++
 135 files changed, 8407 insertions(+), 5608 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 0000000,aa3e8ba..8f988af
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@@ -1,0 -1,102 +1,110 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
++import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.EnumSet;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ByteBufferReadable;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
++import org.apache.hadoop.util.DataChecksum;
+ 
+ /**
+  * A BlockReader is responsible for reading a single block
+  * from a single datanode.
+  */
+ @InterfaceAudience.Private
 -public interface BlockReader extends ByteBufferReadable {
++public interface BlockReader extends ByteBufferReadable, Closeable {
+   
+ 
+   /* same interface as inputStream java.io.InputStream#read()
+    * used by DFSInputStream#read()
+    * This violates one rule when there is a checksum error:
+    * "Read should not modify user buffer before successful read"
+    * because it first reads the data to user buffer and then checks
+    * the checksum.
+    * Note: this must return -1 on EOF, even in the case of a 0-byte read.
+    * See HDFS-5762 for details.
+    */
+   int read(byte[] buf, int off, int len) throws IOException;
+ 
+   /**
+    * Skip the given number of bytes
+    */
+   long skip(long n) throws IOException;
+ 
+   /**
+    * Returns an estimate of the number of bytes that can be read
+    * (or skipped over) from this input stream without performing
+    * network I/O.
+    * This may return more than what is actually present in the block.
+    */
+   int available() throws IOException;
+ 
+   /**
+    * Close the block reader.
+    *
+    * @throws IOException
+    */
++  @Override // java.io.Closeable
+   void close() throws IOException;
+ 
+   /**
+    * Read exactly the given amount of data, throwing an exception
+    * if EOF is reached before that amount
+    */
+   void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
+ 
+   /**
+    * Similar to {@link #readFully(byte[], int, int)} except that it will
+    * not throw an exception on EOF. However, it differs from the simple
+    * {@link #read(byte[], int, int)} call in that it is guaranteed to
+    * read the data if it is available. In other words, if this call
+    * does not throw an exception, then either the buffer has been
+    * filled or the next call will return EOF.
+    */
+   int readAll(byte[] buf, int offset, int len) throws IOException;
+ 
+   /**
+    * @return              true only if this is a local read.
+    */
+   boolean isLocal();
+   
+   /**
+    * @return              true only if this is a short-circuit read.
+    *                      All short-circuit reads are also local.
+    */
+   boolean isShortCircuit();
+ 
+   /**
+    * Get a ClientMmap object for this BlockReader.
+    *
+    * @param opts          The read options to use.
+    * @return              The ClientMmap object, or null if mmap is not
+    *                      supported.
+    */
+   ClientMmap getClientMmap(EnumSet<ReadOption> opts);
++
++  /**
++   * @return              The DataChecksum used by the read block
++   */
++  DataChecksum getDataChecksum();
+ }
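
For orientation, here is a minimal usage sketch of the BlockReader contract introduced above. It is not part of the commit; the class ReadBlockSketch, the method readBlockPrefix and the reader argument are hypothetical, and only the interface methods (readAll, readFully, close) come from BlockReader itself. It illustrates the documented difference between readAll, which stops quietly at EOF, and readFully, which throws if EOF arrives before the requested length.

import java.io.IOException;
import org.apache.hadoop.hdfs.BlockReader;

class ReadBlockSketch {
  /** Copy up to len bytes from the current position of the reader into buf. */
  static int readBlockPrefix(BlockReader reader, byte[] buf, int len)
      throws IOException {
    try {
      // readAll fills buf[0..len) if that much data is available; otherwise it
      // reads up to EOF and returns the count without throwing. readFully(buf,
      // 0, len) would instead throw if EOF were hit before len bytes.
      return reader.readAll(buf, 0, len);
    } finally {
      reader.close(); // BlockReader now extends java.io.Closeable (see above)
    }
  }
}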

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 0000000,2a0e21b..8d7f294
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@@ -1,0 -1,743 +1,748 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.FileChannel;
+ import java.util.EnumSet;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DirectBufferPool;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+  * the same machine as the datanode, then the client can read files directly
+  * from the local file system rather than going through the datanode for better
+  * performance. <br>
+  * {@link BlockReaderLocal} works as follows:
+  * <ul>
+  * <li>The client performing short circuit reads must be configured at the
+  * datanode.</li>
+  * <li>The client gets the file descriptors for the metadata file and the data 
+  * file for the block using
+  * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+  * </li>
+  * <li>The client reads the file descriptors.</li>
+  * </ul>
+  */
+ @InterfaceAudience.Private
+ class BlockReaderLocal implements BlockReader {
+   static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
+ 
+   private static final DirectBufferPool bufferPool = new DirectBufferPool();
+ 
+   public static class Builder {
+     private final int bufferSize;
+     private boolean verifyChecksum;
+     private int maxReadahead;
+     private String filename;
+     private ShortCircuitReplica replica;
+     private long dataPos;
+     private ExtendedBlock block;
+     private StorageType storageType;
+ 
+     public Builder(ShortCircuitConf conf) {
+       this.maxReadahead = Integer.MAX_VALUE;
+       this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+       this.bufferSize = conf.getShortCircuitBufferSize();
+     }
+ 
+     public Builder setVerifyChecksum(boolean verifyChecksum) {
+       this.verifyChecksum = verifyChecksum;
+       return this;
+     }
+ 
+     public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+       long readahead = cachingStrategy.getReadahead() != null ?
+           cachingStrategy.getReadahead() :
+               HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+       this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+       return this;
+     }
+ 
+     public Builder setFilename(String filename) {
+       this.filename = filename;
+       return this;
+     }
+ 
+     public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+       this.replica = replica;
+       return this;
+     }
+ 
+     public Builder setStartOffset(long startOffset) {
+       this.dataPos = Math.max(0, startOffset);
+       return this;
+     }
+ 
+     public Builder setBlock(ExtendedBlock block) {
+       this.block = block;
+       return this;
+     }
+ 
+     public Builder setStorageType(StorageType storageType) {
+       this.storageType = storageType;
+       return this;
+     }
+ 
+     public BlockReaderLocal build() {
+       Preconditions.checkNotNull(replica);
+       return new BlockReaderLocal(this);
+     }
+   }
+ 
+   private boolean closed = false;
+ 
+   /**
+    * Pair of streams for this block.
+    */
+   private final ShortCircuitReplica replica;
+ 
+   /**
+    * The data FileChannel.
+    */
+   private final FileChannel dataIn;
+ 
+   /**
+    * The next place we'll read from in the block data FileChannel.
+    *
+    * If data is buffered in dataBuf, this offset will be larger than the
+    * offset of the next byte which a read() operation will give us.
+    */
+   private long dataPos;
+ 
+   /**
+    * The Checksum FileChannel.
+    */
+   private final FileChannel checksumIn;
+   
+   /**
+    * Checksum type and size.
+    */
+   private final DataChecksum checksum;
+ 
+   /**
+    * If false, we will always skip the checksum.
+    */
+   private final boolean verifyChecksum;
+ 
+   /**
+    * Name of the block, for logging purposes.
+    */
+   private final String filename;
+   
+   /**
+    * Block ID and Block Pool ID.
+    */
+   private final ExtendedBlock block;
+   
+   /**
+    * Cache of Checksum#bytesPerChecksum.
+    */
+   private final int bytesPerChecksum;
+ 
+   /**
+    * Cache of Checksum#checksumSize.
+    */
+   private final int checksumSize;
+ 
+   /**
+    * Maximum number of chunks to allocate.
+    *
+    * This is used to allocate dataBuf and checksumBuf, in the event that
+    * we need them.
+    */
+   private final int maxAllocatedChunks;
+ 
+   /**
+    * True if zero readahead was requested.
+    */
+   private final boolean zeroReadaheadRequested;
+ 
+   /**
+    * Maximum amount of readahead we'll do.  This will always be at least the,
+    * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
+    * The reason is because we need to do a certain amount of buffering in order
+    * to do checksumming.
+    * 
+    * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+    * Why do we allocate buffers, and then (potentially) only use part of them?
+    * The rationale is that allocating a lot of buffers of different sizes would
+    * make it very difficult for the DirectBufferPool to re-use buffers. 
+    */
+   private final int maxReadaheadLength;
+ 
+   /**
+    * Buffers data starting at the current dataPos and extending on
+    * for dataBuf.limit().
+    *
+    * This may be null if we don't need it.
+    */
+   private ByteBuffer dataBuf;
+ 
+   /**
+    * Buffers checksums starting at the current checksumPos and extending on
+    * for checksumBuf.limit().
+    *
+    * This may be null if we don't need it.
+    */
+   private ByteBuffer checksumBuf;
+ 
+   /**
+    * StorageType of replica on DataNode.
+    */
+   private StorageType storageType;
+ 
+   private BlockReaderLocal(Builder builder) {
+     this.replica = builder.replica;
+     this.dataIn = replica.getDataStream().getChannel();
+     this.dataPos = builder.dataPos;
+     this.checksumIn = replica.getMetaStream().getChannel();
+     BlockMetadataHeader header = builder.replica.getMetaHeader();
+     this.checksum = header.getChecksum();
+     this.verifyChecksum = builder.verifyChecksum &&
+         (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+     this.filename = builder.filename;
+     this.block = builder.block;
+     this.bytesPerChecksum = checksum.getBytesPerChecksum();
+     this.checksumSize = checksum.getChecksumSize();
+ 
+     this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+         ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+     // Calculate the effective maximum readahead.
+     // We can't do more readahead than there is space in the buffer.
+     int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+         ((Math.min(builder.bufferSize, builder.maxReadahead) +
+             bytesPerChecksum - 1) / bytesPerChecksum);
+     if (maxReadaheadChunks == 0) {
+       this.zeroReadaheadRequested = true;
+       maxReadaheadChunks = 1;
+     } else {
+       this.zeroReadaheadRequested = false;
+     }
+     this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+     this.storageType = builder.storageType;
+   }
+ 
+   private synchronized void createDataBufIfNeeded() {
+     if (dataBuf == null) {
+       dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+       dataBuf.position(0);
+       dataBuf.limit(0);
+     }
+   }
+ 
+   private synchronized void freeDataBufIfExists() {
+     if (dataBuf != null) {
+       // When disposing of a dataBuf, we have to move our stored file index
+       // backwards.
+       dataPos -= dataBuf.remaining();
+       dataBuf.clear();
+       bufferPool.returnBuffer(dataBuf);
+       dataBuf = null;
+     }
+   }
+ 
+   private synchronized void createChecksumBufIfNeeded() {
+     if (checksumBuf == null) {
+       checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+       checksumBuf.position(0);
+       checksumBuf.limit(0);
+     }
+   }
+ 
+   private synchronized void freeChecksumBufIfExists() {
+     if (checksumBuf != null) {
+       checksumBuf.clear();
+       bufferPool.returnBuffer(checksumBuf);
+       checksumBuf = null;
+     }
+   }
+ 
+   private synchronized int drainDataBuf(ByteBuffer buf) {
+     if (dataBuf == null) return -1;
+     int oldLimit = dataBuf.limit();
+     int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+     if (nRead == 0) {
+       return (dataBuf.remaining() == 0) ? -1 : 0;
+     }
+     try {
+       dataBuf.limit(dataBuf.position() + nRead);
+       buf.put(dataBuf);
+     } finally {
+       dataBuf.limit(oldLimit);
+     }
+     return nRead;
+   }
+ 
+   /**
+    * Read from the block file into a buffer.
+    *
+    * This function overwrites checksumBuf.  It will increment dataPos.
+    *
+    * @param buf   The buffer to read into.  May be dataBuf.
+    *              The position and limit of this buffer should be set to
+    *              multiples of the checksum size.
+    * @param canSkipChecksum  True if we can skip checksumming.
+    *
+    * @return      Total bytes read.  0 on EOF.
+    */
+   private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
+       throws IOException {
+     TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
+         block.getBlockId() + ")", Sampler.NEVER);
+     try {
+       int total = 0;
+       long startDataPos = dataPos;
+       int startBufPos = buf.position();
+       while (buf.hasRemaining()) {
+         int nRead = dataIn.read(buf, dataPos);
+         if (nRead < 0) {
+           break;
+         }
+         dataPos += nRead;
+         total += nRead;
+       }
+       if (canSkipChecksum) {
+         freeChecksumBufIfExists();
+         return total;
+       }
+       if (total > 0) {
+         try {
+           buf.limit(buf.position());
+           buf.position(startBufPos);
+           createChecksumBufIfNeeded();
+           int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+           checksumBuf.clear();
+           checksumBuf.limit(checksumsNeeded * checksumSize);
+           long checksumPos = BlockMetadataHeader.getHeaderSize()
+               + ((startDataPos / bytesPerChecksum) * checksumSize);
+           while (checksumBuf.hasRemaining()) {
+             int nRead = checksumIn.read(checksumBuf, checksumPos);
+             if (nRead < 0) {
+               throw new IOException("Got unexpected checksum file EOF at " +
+                   checksumPos + ", block file position " + startDataPos + " for " +
+                   "block " + block + " of file " + filename);
+             }
+             checksumPos += nRead;
+           }
+           checksumBuf.flip();
+ 
+           checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+         } finally {
+           buf.position(buf.limit());
+         }
+       }
+       return total;
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   private boolean createNoChecksumContext() {
+     if (verifyChecksum) {
+       if (storageType != null && storageType.isTransient()) {
+         // Checksums are not stored for replicas on transient storage.  We do not
+         // anchor, because we do not intend for client activity to block eviction
+         // from transient storage on the DataNode side.
+         return true;
+       } else {
+         return replica.addNoChecksumAnchor();
+       }
+     } else {
+       return true;
+     }
+   }
+ 
+   private void releaseNoChecksumContext() {
+     if (verifyChecksum) {
+       if (storageType == null || !storageType.isTransient()) {
+         replica.removeNoChecksumAnchor();
+       }
+     }
+   }
+ 
+   @Override
+   public synchronized int read(ByteBuffer buf) throws IOException {
+     boolean canSkipChecksum = createNoChecksumContext();
+     try {
+       String traceString = null;
+       if (LOG.isTraceEnabled()) {
+         traceString = new StringBuilder().
+             append("read(").
+             append("buf.remaining=").append(buf.remaining()).
+             append(", block=").append(block).
+             append(", filename=").append(filename).
+             append(", canSkipChecksum=").append(canSkipChecksum).
+             append(")").toString();
+         LOG.info(traceString + ": starting");
+       }
+       int nRead;
+       try {
+         if (canSkipChecksum && zeroReadaheadRequested) {
+           nRead = readWithoutBounceBuffer(buf);
+         } else {
+           nRead = readWithBounceBuffer(buf, canSkipChecksum);
+         }
+       } catch (IOException e) {
+         if (LOG.isTraceEnabled()) {
+           LOG.info(traceString + ": I/O error", e);
+         }
+         throw e;
+       }
+       if (LOG.isTraceEnabled()) {
+         LOG.info(traceString + ": returning " + nRead);
+       }
+       return nRead;
+     } finally {
+       if (canSkipChecksum) releaseNoChecksumContext();
+     }
+   }
+ 
+   private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+       throws IOException {
+     freeDataBufIfExists();
+     freeChecksumBufIfExists();
+     int total = 0;
+     while (buf.hasRemaining()) {
+       int nRead = dataIn.read(buf, dataPos);
+       if (nRead <= 0) break;
+       dataPos += nRead;
+       total += nRead;
+     }
+     return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
+   }
+ 
+   /**
+    * Fill the data buffer.  If necessary, validate the data against the
+    * checksums.
+    * 
+    * We always want the offsets of the data contained in dataBuf to be
+    * aligned to the chunk boundary.  If we are validating checksums, we
+    * accomplish this by seeking backwards in the file until we're on a
+    * chunk boundary.  (This is necessary because we can't checksum a
+    * partial chunk.)  If we are not validating checksums, we simply only
+    * fill the latter part of dataBuf.
+    * 
+    * @param canSkipChecksum  true if we can skip checksumming.
+    * @return                 true if we hit EOF.
+    * @throws IOException
+    */
+   private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+       throws IOException {
+     createDataBufIfNeeded();
+     final int slop = (int)(dataPos % bytesPerChecksum);
+     final long oldDataPos = dataPos;
+     dataBuf.limit(maxReadaheadLength);
+     if (canSkipChecksum) {
+       dataBuf.position(slop);
+       fillBuffer(dataBuf, canSkipChecksum);
+     } else {
+       dataPos -= slop;
+       dataBuf.position(0);
+       fillBuffer(dataBuf, canSkipChecksum);
+     }
+     dataBuf.limit(dataBuf.position());
+     dataBuf.position(Math.min(dataBuf.position(), slop));
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+           "buffer from offset " + oldDataPos + " of " + block);
+     }
+     return dataBuf.limit() != maxReadaheadLength;
+   }
+ 
+   /**
+    * Read using the bounce buffer.
+    *
+    * A 'direct' read actually has three phases. The first drains any
+    * remaining bytes from the slow read buffer. After this the read is
+    * guaranteed to be on a checksum chunk boundary. If there are still bytes
+    * to read, the fast direct path is used for as many remaining bytes as
+    * possible, up to a multiple of the checksum chunk size. Finally, any
+    * 'odd' bytes remaining at the end of the read cause another slow read to
+    * be issued, which involves an extra copy.
+    *
+    * Every 'slow' read tries to fill the slow read buffer in one go for
+    * efficiency's sake. As described above, all non-checksum-chunk-aligned
+    * reads will be served from the slower read path.
+    *
+    * @param buf              The buffer to read into. 
+    * @param canSkipChecksum  True if we can skip checksums.
+    */
+   private synchronized int readWithBounceBuffer(ByteBuffer buf,
+         boolean canSkipChecksum) throws IOException {
+     int total = 0;
+     int bb = drainDataBuf(buf); // drain bounce buffer if possible
+     if (bb >= 0) {
+       total += bb;
+       if (buf.remaining() == 0) return total;
+     }
+     boolean eof = true, done = false;
+     do {
+       if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
+             && ((dataPos % bytesPerChecksum) == 0)) {
+         // Fast lane: try to read directly into user-supplied buffer, bypassing
+         // bounce buffer.
+         int oldLimit = buf.limit();
+         int nRead;
+         try {
+           buf.limit(buf.position() + maxReadaheadLength);
+           nRead = fillBuffer(buf, canSkipChecksum);
+         } finally {
+           buf.limit(oldLimit);
+         }
+         if (nRead < maxReadaheadLength) {
+           done = true;
+         }
+         if (nRead > 0) {
+           eof = false;
+         }
+         total += nRead;
+       } else {
+         // Slow lane: refill bounce buffer.
+         if (fillDataBuf(canSkipChecksum)) {
+           done = true;
+         }
+         bb = drainDataBuf(buf); // drain bounce buffer if possible
+         if (bb >= 0) {
+           eof = false;
+           total += bb;
+         }
+       }
+     } while ((!done) && (buf.remaining() > 0));
+     return (eof && total == 0) ? -1 : total;
+   }
+ 
+   @Override
+   public synchronized int read(byte[] arr, int off, int len)
+         throws IOException {
+     boolean canSkipChecksum = createNoChecksumContext();
+     int nRead;
+     try {
+       String traceString = null;
+       if (LOG.isTraceEnabled()) {
+         traceString = new StringBuilder().
+             append("read(arr.length=").append(arr.length).
+             append(", off=").append(off).
+             append(", len=").append(len).
+             append(", filename=").append(filename).
+             append(", block=").append(block).
+             append(", canSkipChecksum=").append(canSkipChecksum).
+             append(")").toString();
+         LOG.trace(traceString + ": starting");
+       }
+       try {
+         if (canSkipChecksum && zeroReadaheadRequested) {
+           nRead = readWithoutBounceBuffer(arr, off, len);
+         } else {
+           nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+         }
+       } catch (IOException e) {
+         if (LOG.isTraceEnabled()) {
+           LOG.trace(traceString + ": I/O error", e);
+         }
+         throw e;
+       }
+       if (LOG.isTraceEnabled()) {
+         LOG.trace(traceString + ": returning " + nRead);
+       }
+     } finally {
+       if (canSkipChecksum) releaseNoChecksumContext();
+     }
+     return nRead;
+   }
+ 
+   private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+         int len) throws IOException {
+     freeDataBufIfExists();
+     freeChecksumBufIfExists();
+     int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+     if (nRead > 0) {
+       dataPos += nRead;
+     } else if ((nRead == 0) && (dataPos == dataIn.size())) {
+       return -1;
+     }
+     return nRead;
+   }
+ 
+   private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+         boolean canSkipChecksum) throws IOException {
+     createDataBufIfNeeded();
+     if (!dataBuf.hasRemaining()) {
+       dataBuf.position(0);
+       dataBuf.limit(maxReadaheadLength);
+       fillDataBuf(canSkipChecksum);
+     }
+     if (dataBuf.remaining() == 0) return -1;
+     int toRead = Math.min(dataBuf.remaining(), len);
+     dataBuf.get(arr, off, toRead);
+     return toRead;
+   }
+ 
+   @Override
+   public synchronized long skip(long n) throws IOException {
+     int discardedFromBuf = 0;
+     long remaining = n;
+     if ((dataBuf != null) && dataBuf.hasRemaining()) {
+       discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+       dataBuf.position(dataBuf.position() + discardedFromBuf);
+       remaining -= discardedFromBuf;
+     }
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 
+         filename + "): discarded " + discardedFromBuf + " bytes from " +
+         "dataBuf and advanced dataPos by " + remaining);
+     }
+     dataPos += remaining;
+     return n;
+   }
+ 
+   @Override
+   public int available() throws IOException {
+     // We never do network I/O in BlockReaderLocal.
+     return Integer.MAX_VALUE;
+   }
+ 
+   @Override
+   public synchronized void close() throws IOException {
+     if (closed) return;
+     closed = true;
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+     }
+     replica.unref();
+     freeDataBufIfExists();
+     freeChecksumBufIfExists();
+   }
+ 
+   @Override
+   public synchronized void readFully(byte[] arr, int off, int len)
+       throws IOException {
+     BlockReaderUtil.readFully(this, arr, off, len);
+   }
+ 
+   @Override
+   public synchronized int readAll(byte[] buf, int off, int len)
+       throws IOException {
+     return BlockReaderUtil.readAll(this, buf, off, len);
+   }
+ 
+   @Override
+   public boolean isLocal() {
+     return true;
+   }
+ 
+   @Override
+   public boolean isShortCircuit() {
+     return true;
+   }
+ 
+   /**
+    * Get or create a memory map for this replica.
+    * 
+    * There are two kinds of ClientMmap objects we could fetch here: one that 
+    * will always read pre-checksummed data, and one that may read data that
+    * hasn't been checksummed.
+    *
+    * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+    * the anchor count on the shared memory slot.  This will tell the DataNode
+    * not to munlock the block until this ClientMmap is closed.
+    * If we fetch the latter, we don't bother with anchoring.
+    *
+    * @param opts     The options to use, such as SKIP_CHECKSUMS.
+    * 
+    * @return         null on failure; the ClientMmap otherwise.
+    */
+   @Override
+   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+     boolean anchor = verifyChecksum &&
+         (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
+     if (anchor) {
+       if (!createNoChecksumContext()) {
+         if (LOG.isTraceEnabled()) {
+           LOG.trace("can't get an mmap for " + block + " of " + filename + 
+               " since SKIP_CHECKSUMS was not given, " +
+               "we aren't skipping checksums, and the block is not mlocked.");
+         }
+         return null;
+       }
+     }
+     ClientMmap clientMmap = null;
+     try {
+       clientMmap = replica.getOrCreateClientMmap(anchor);
+     } finally {
+       if ((clientMmap == null) && anchor) {
+         releaseNoChecksumContext();
+       }
+     }
+     return clientMmap;
+   }
+   
+   @VisibleForTesting
+   boolean getVerifyChecksum() {
+     return this.verifyChecksum;
+   }
+ 
+   @VisibleForTesting
+   int getMaxReadaheadLength() {
+     return this.maxReadaheadLength;
+   }
+   
+   /**
+    * Make the replica anchorable.  Normally this can only be done by the
+    * DataNode.  This method is only for testing.
+    */
+   @VisibleForTesting
+   void forceAnchorable() {
+     replica.getSlot().makeAnchorable();
+   }
+ 
+   /**
+    * Make the replica unanchorable.  Normally this can only be done by the
+    * DataNode.  This method is only for testing.
+    */
+   @VisibleForTesting
+   void forceUnanchorable() {
+     replica.getSlot().makeUnanchorable();
+   }
++
++  @Override
++  public DataChecksum getDataChecksum() {
++    return checksum;
++  }
+ }
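
As a worked example of the readahead sizing performed in the BlockReaderLocal constructor above, the sketch below redoes the same arithmetic for one assumed configuration (512-byte checksum chunks, a 64 KB short-circuit buffer as returned by conf.getShortCircuitBufferSize(), and a 4 KB readahead from the caller's CachingStrategy). The class name and the sample numbers are hypothetical; the formulas mirror the constructor, except that the special case bytesPerChecksum == 0 (NULL checksum) is omitted.

class ReadaheadSizingSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;   // chunk size from the block's DataChecksum
    int bufferSize = 64 * 1024;   // assumed short-circuit buffer size
    int maxReadahead = 4 * 1024;  // assumed readahead from the CachingStrategy

    // Buffers are allocated in whole chunks, rounding the buffer size up.
    int maxAllocatedChunks =
        (bufferSize + bytesPerChecksum - 1) / bytesPerChecksum;      // 128

    // Readahead is capped by the buffer size and rounded up to whole chunks.
    int maxReadaheadChunks =
        (Math.min(bufferSize, maxReadahead) + bytesPerChecksum - 1)
            / bytesPerChecksum;                                      // 8

    // A request for zero readahead is remembered, but at least one chunk of
    // buffering is kept because checksums cover whole chunks.
    boolean zeroReadaheadRequested = (maxReadaheadChunks == 0);
    if (maxReadaheadChunks == 0) {
      maxReadaheadChunks = 1;
    }
    int maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;  // 4096

    System.out.println("allocated chunks=" + maxAllocatedChunks
        + ", readahead bytes=" + maxReadaheadLength
        + ", zeroReadaheadRequested=" + zeroReadaheadRequested);
  }
}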

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 0000000,eea3f06..9920438
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@@ -1,0 -1,738 +1,743 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.DataInputStream;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.Collections;
+ import java.util.EnumSet;
+ import java.util.HashMap;
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.hdfs.util.IOUtilsClient;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.ipc.RPC;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DirectBufferPool;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
+  * the same machine as the datanode, then the client can read files directly
+  * from the local file system rather than going through the datanode for better
+  * performance. <br>
+  *
+  * This is the legacy implementation based on HDFS-2246, which requires
+  * permissions on the datanode to be set so that clients can directly access the
+  * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
+  * systems where the required native code has been implemented.<br>
+  *
+  * {@link BlockReaderLocalLegacy} works as follows:
+  * <ul>
+  * <li>The client performing short circuit reads must be configured at the
+  * datanode.</li>
+  * <li>The client gets the path to the file where block is stored using
+  * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+  * RPC call</li>
+  * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+  * if security is enabled.</li>
+  * </ul>
+  */
+ @InterfaceAudience.Private
+ class BlockReaderLocalLegacy implements BlockReader {
+   private static final Logger LOG = LoggerFactory.getLogger(
+       BlockReaderLocalLegacy.class);
+ 
+   //Stores the cache and proxy for a local datanode.
+   private static class LocalDatanodeInfo {
+     private ClientDatanodeProtocol proxy = null;
+     private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+ 
+     LocalDatanodeInfo() {
+       final int cacheSize = 10000;
+       final float hashTableLoadFactor = 0.75f;
+       int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+       cache = Collections
+           .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+               hashTableCapacity, hashTableLoadFactor, true) {
+             private static final long serialVersionUID = 1;
+ 
+             @Override
+             protected boolean removeEldestEntry(
+                 Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+               return size() > cacheSize;
+             }
+           });
+     }
+ 
+     private synchronized ClientDatanodeProtocol getDatanodeProxy(
+         UserGroupInformation ugi, final DatanodeInfo node,
+         final Configuration conf, final int socketTimeout,
+         final boolean connectToDnViaHostname) throws IOException {
+       if (proxy == null) {
+         try {
+           proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+             @Override
+             public ClientDatanodeProtocol run() throws Exception {
+               return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
+                   socketTimeout, connectToDnViaHostname);
+             }
+           });
+         } catch (InterruptedException e) {
+           LOG.warn("encountered exception ", e);
+         }
+       }
+       return proxy;
+     }
+     
+     private synchronized void resetDatanodeProxy() {
+       if (null != proxy) {
+         RPC.stopProxy(proxy);
+         proxy = null;
+       }
+     }
+ 
+     private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+       return cache.get(b);
+     }
+ 
+     private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
+       cache.put(b, info);
+     }
+ 
+     private void removeBlockLocalPathInfo(ExtendedBlock b) {
+       cache.remove(b);
+     }
+   }
+   
+   // Multiple datanodes could be running on the local machine. Store proxies in
+   // a map keyed by the ipc port of the datanode.
+   private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+ 
+   private final FileInputStream dataIn; // reader for the data file
+   private final FileInputStream checksumIn;   // reader for the checksum file
+ 
+   /**
+    * Offset from the most recent chunk boundary at which the next read should
+    * take place. Is only set to non-zero at construction time, and is
+    * decremented (usually to 0) by subsequent reads. This avoids having to do a
+    * checksum read at construction to position the read cursor correctly.
+    */
+   private int offsetFromChunkBoundary;
+   
+   private byte[] skipBuf = null;
+ 
+   /**
+    * Used for checksummed reads that need to be staged before copying to their
+    * output buffer because they are either a) smaller than the checksum chunk
+    * size or b) issued by the slower read(byte[]...) path
+    */
+   private ByteBuffer slowReadBuff = null;
+   private ByteBuffer checksumBuff = null;
+   private DataChecksum checksum;
+   private final boolean verifyChecksum;
+ 
+   private static final DirectBufferPool bufferPool = new DirectBufferPool();
+ 
+   private final int bytesPerChecksum;
+   private final int checksumSize;
+ 
+   /** offset in block where reader wants to actually read */
+   private long startOffset;
+   private final String filename;
+   private long blockId;
+   
+   /**
+    * The only way this object can be instantiated.
+    */
+   static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
+       UserGroupInformation userGroupInformation,
+       Configuration configuration, String file, ExtendedBlock blk,
+       Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+       long startOffset, long length, StorageType storageType)
+       throws IOException {
+     final ShortCircuitConf scConf = conf.getShortCircuitConf();
+     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+         .getIpcPort());
+     // check the cache first
+     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+     if (pathinfo == null) {
+       if (userGroupInformation == null) {
+         userGroupInformation = UserGroupInformation.getCurrentUser();
+       }
+       pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+           configuration, conf.getSocketTimeout(), token,
+           conf.isConnectToDnViaHostname(), storageType);
+     }
+ 
+     // check to see if the file exists. It may so happen that the
+     // HDFS file has been deleted and this block-lookup is occurring
+     // on behalf of a new HDFS file. This time, the block file could
+     // be residing in a different portion of the fs.data.dir directory.
+     // In this case, we remove this entry from the cache. The next
+     // call to this method will re-populate the cache.
+     FileInputStream dataIn = null;
+     FileInputStream checksumIn = null;
+     BlockReaderLocalLegacy localBlockReader = null;
+     final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+         || storageType.isTransient();
+     try {
+       // get a local file system
+       File blkfile = new File(pathinfo.getBlockPath());
+       dataIn = new FileInputStream(blkfile);
+ 
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
+             + blkfile.length() + " startOffset " + startOffset + " length "
+             + length + " short circuit checksum " + !skipChecksumCheck);
+       }
+ 
+       if (!skipChecksumCheck) {
+         // get the metadata file
+         File metafile = new File(pathinfo.getMetaPath());
+         checksumIn = new FileInputStream(metafile);
+ 
+         final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+             new DataInputStream(checksumIn), blk);
+         long firstChunkOffset = startOffset
+             - (startOffset % checksum.getBytesPerChecksum());
+         localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+             startOffset, length, pathinfo, checksum, true, dataIn,
+             firstChunkOffset, checksumIn);
+       } else {
+         localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+             startOffset, length, pathinfo, dataIn);
+       }
+     } catch (IOException e) {
+       // remove from cache
+       localDatanodeInfo.removeBlockLocalPathInfo(blk);
+       LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+           + " from cache because local file " + pathinfo.getBlockPath()
+           + " could not be opened.");
+       throw e;
+     } finally {
+       if (localBlockReader == null) {
+         if (dataIn != null) {
+           dataIn.close();
+         }
+         if (checksumIn != null) {
+           checksumIn.close();
+         }
+       }
+     }
+     return localBlockReader;
+   }
+   
+   private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+     LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+     if (ldInfo == null) {
+       ldInfo = new LocalDatanodeInfo();
+       localDatanodeInfoMap.put(port, ldInfo);
+     }
+     return ldInfo;
+   }
+   
+   private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
+       ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
+       Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+       StorageType storageType) throws IOException {
+     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+     BlockLocalPathInfo pathinfo = null;
+     ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
+         conf, timeout, connectToDnViaHostname);
+     try {
+       // make RPC to local datanode to find local pathnames of blocks
+       pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+       // We cannot cache the path information for a replica on transient storage.
+       // If the replica gets evicted, then it moves to a different path.  Then,
+       // our next attempt to read from the cached path would fail to find the
+       // file.  Additionally, the failure would cause us to disable legacy
+       // short-circuit read for all subsequent use in the ClientContext.  Unlike
+       // the newer short-circuit read implementation, we have no communication
+       // channel for the DataNode to notify the client that the path has been
+       // invalidated.  Therefore, our only option is to skip caching.
+       if (pathinfo != null && !storageType.isTransient()) {
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+         }
+         localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+       }
+     } catch (IOException e) {
+       localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+       throw e;
+     }
+     return pathinfo;
+   }
+   
+   private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+       int bytesPerChecksum) {
+     if (bufferSizeBytes < bytesPerChecksum) {
+       throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
+           "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
+           "a single chunk (" + bytesPerChecksum +  "). Please configure " +
+           HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
+           " appropriately");
+     }
+ 
+     // Round down to nearest chunk size
+     return bufferSizeBytes / bytesPerChecksum;
+   }
+ 
+   private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+       long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
+       throws IOException {
+     this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
+         DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
+         dataIn, startOffset, null);
+   }
+ 
+   private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+       long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
+       boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+       FileInputStream checksumIn) throws IOException {
+     this.filename = hdfsfile;
+     this.checksum = checksum;
+     this.verifyChecksum = verifyChecksum;
+     this.startOffset = Math.max(startOffset, 0);
+     this.blockId = block.getBlockId();
+ 
+     bytesPerChecksum = this.checksum.getBytesPerChecksum();
+     checksumSize = this.checksum.getChecksumSize();
+ 
+     this.dataIn = dataIn;
+     this.checksumIn = checksumIn;
+     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+ 
+     final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+         conf.getShortCircuitBufferSize(), bytesPerChecksum);
+     slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+     checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+     // Initially the buffers have nothing to read.
+     slowReadBuff.flip();
+     checksumBuff.flip();
+     boolean success = false;
+     try {
+       // Skip both input streams to beginning of the chunk containing startOffset
+       IOUtils.skipFully(dataIn, firstChunkOffset);
+       if (checksumIn != null) {
+         long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+         IOUtils.skipFully(checksumIn, checkSumOffset);
+       }
+       success = true;
+     } finally {
+       if (!success) {
+         bufferPool.returnBuffer(slowReadBuff);
+         bufferPool.returnBuffer(checksumBuff);
+       }
+     }
+   }
+ 
+   /**
+    * Reads bytes into a buffer until EOF or the buffer's limit is reached
+    */
+   private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+       throws IOException {
+     TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
+         blockId + ")", Sampler.NEVER);
+     try {
+       int bytesRead = stream.getChannel().read(buf);
+       if (bytesRead < 0) {
+         //EOF
+         return bytesRead;
+       }
+       while (buf.remaining() > 0) {
+         int n = stream.getChannel().read(buf);
+         if (n < 0) {
+           //EOF
+           return bytesRead;
+         }
+         bytesRead += n;
+       }
+       return bytesRead;
+     } finally {
+       scope.close();
+     }
+   }
+   
+   /**
+    * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+    * another.
+    */
+   private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+     int oldLimit = from.limit();
+     from.limit(from.position() + length);
+     try {
+       to.put(from);
+     } finally {
+       from.limit(oldLimit);
+     }
+   }
+ 
+   @Override
+   public synchronized int read(ByteBuffer buf) throws IOException {
+     int nRead = 0;
+     if (verifyChecksum) {
+       // A 'direct' read actually has three phases. The first drains any
+       // remaining bytes from the slow read buffer. After this the read is
+       // guaranteed to be on a checksum chunk boundary. If there are still bytes
+       // to read, the fast direct path is used for as many remaining bytes as
+       // possible, up to a multiple of the checksum chunk size. Finally, any
+       // 'odd' bytes remaining at the end of the read cause another slow read to
+       // be issued, which involves an extra copy.
+ 
+       // Every 'slow' read tries to fill the slow read buffer in one go for
+       // efficiency's sake. As described above, all non-checksum-chunk-aligned
+       // reads will be served from the slower read path.
+ 
+       if (slowReadBuff.hasRemaining()) {
+         // There are remaining bytes from a small read available. This usually
+         // means this read is unaligned, which falls back to the slow path.
+         int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+         writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+         nRead += fromSlowReadBuff;
+       }
+ 
+       if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+         // Since we have drained the 'small read' buffer, we are guaranteed to
+         // be chunk-aligned
+         int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+ 
+         // There's only enough checksum buffer space available to checksum one
+         // entire slow read buffer. This saves keeping the number of checksum
+         // chunks around.
+         len = Math.min(len, slowReadBuff.capacity());
+         int oldlimit = buf.limit();
+         buf.limit(buf.position() + len);
+         int readResult = 0;
+         try {
+           readResult = doByteBufferRead(buf);
+         } finally {
+           buf.limit(oldlimit);
+         }
+         if (readResult == -1) {
+           return nRead;
+         } else {
+           nRead += readResult;
+           buf.position(buf.position() + readResult);
+         }
+       }
+ 
+       // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+       // until chunk boundary
+       if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+         int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+         int readResult = fillSlowReadBuffer(toRead);
+         if (readResult == -1) {
+           return nRead;
+         } else {
+           int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+           writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+           nRead += fromSlowReadBuff;
+         }
+       }
+     } else {
+       // Non-checksummed reads are much easier; we can just fill the buffer directly.
+       nRead = doByteBufferRead(buf);
+       if (nRead > 0) {
+         buf.position(buf.position() + nRead);
+       }
+     }
+     return nRead;
+   }
+ 
+   /**
+    * Tries to read as many bytes as possible into supplied buffer, checksumming
+    * each chunk if needed.
+    *
+    * <b>Preconditions:</b>
+    * <ul>
+    * <li>
+    * If checksumming is enabled, buf.remaining must be a multiple of
+    * bytesPerChecksum. Note that this is not a requirement for clients of
+    * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+    * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+    * method.
+    * </li>
+    * </ul>
+    * <b>Postconditions:</b>
+    * <ul>
+    * <li>buf.limit and buf.mark are unchanged.</li>
+    * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+    * requested bytes can be read straight from the buffer</li>
+    * </ul>
+    *
+    * @param buf
+    *          byte buffer to write bytes to. If checksums are not required, buf
+    *          can have any number of bytes remaining, otherwise there must be a
+    *          multiple of the checksum chunk size remaining.
+    * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+    *         that is, the number of useful bytes (up to the amount
+    *         requested) readable from the buffer by the client.
+    */
+   private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+     if (verifyChecksum) {
+       assert buf.remaining() % bytesPerChecksum == 0;
+     }
+     int dataRead = -1;
+ 
+     int oldpos = buf.position();
+     // Read as much as we can into the buffer.
+     dataRead = fillBuffer(dataIn, buf);
+ 
+     if (dataRead == -1) {
+       return -1;
+     }
+ 
+     if (verifyChecksum) {
+       ByteBuffer toChecksum = buf.duplicate();
+       toChecksum.position(oldpos);
+       toChecksum.limit(oldpos + dataRead);
+ 
+       checksumBuff.clear();
+       // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+       int numChunks =
+         (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+       checksumBuff.limit(checksumSize * numChunks);
+ 
+       fillBuffer(checksumIn, checksumBuff);
+       checksumBuff.flip();
+ 
+       checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+           this.startOffset);
+     }
+ 
+     if (dataRead >= 0) {
+       buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+     }
+ 
+     if (dataRead < offsetFromChunkBoundary) {
+       // yikes, didn't even get enough bytes to honour the offset. This can
+       // happen at EOF, even when we are verifying checksums.
+       offsetFromChunkBoundary -= dataRead;
+       dataRead = 0;
+     } else {
+       dataRead -= offsetFromChunkBoundary;
+       offsetFromChunkBoundary = 0;
+     }
+ 
+     return dataRead;
+   }
+ 
+   /**
+    * Ensures that up to len bytes are available and checksummed in the slow read
+    * buffer. The number of bytes available to read is returned. If the buffer is
+    * not already empty, the number of remaining bytes is returned and no actual
+    * read happens.
+    *
+    * @param len
+    *          the maximum number of bytes to make available. After len bytes
+    *          are read, the underlying bytestream <b>must</b> be at a checksum
+    *          boundary, or EOF. That is, (len + currentPosition) %
+    *          bytesPerChecksum == 0.
+    * @return the number of bytes available to read, or -1 if EOF.
+    */
+   private synchronized int fillSlowReadBuffer(int len) throws IOException {
+     int nRead = -1;
+     if (slowReadBuff.hasRemaining()) {
+       // Already got data, good to go.
+       nRead = Math.min(len, slowReadBuff.remaining());
+     } else {
+       // Round a complete read of len bytes (plus any implicit offset) to the
+       // next chunk boundary, since we try to read in multiples of a chunk
+       int nextChunk = len + offsetFromChunkBoundary +
+           (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+       int limit = Math.min(nextChunk, slowReadBuff.capacity());
+       assert limit % bytesPerChecksum == 0;
+ 
+       slowReadBuff.clear();
+       slowReadBuff.limit(limit);
+ 
+       nRead = doByteBufferRead(slowReadBuff);
+ 
+       if (nRead > 0) {
+         // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+         // false positive.
+         slowReadBuff.limit(nRead + slowReadBuff.position());
+       }
+     }
+     return nRead;
+   }
+ 
+   @Override
+   public synchronized int read(byte[] buf, int off, int len) throws IOException {
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("read off " + off + " len " + len);
+     }
+     if (!verifyChecksum) {
+       return dataIn.read(buf, off, len);
+     }
+ 
+     int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+ 
+     if (nRead > 0) {
+       // Possible that buffer is filled with a larger read than we need, since
+       // we tried to read as much as possible at once
+       nRead = Math.min(len, nRead);
+       slowReadBuff.get(buf, off, nRead);
+     }
+ 
+     return nRead;
+   }
+ 
+   @Override
+   public synchronized long skip(long n) throws IOException {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("skip " + n);
+     }
+     if (n <= 0) {
+       return 0;
+     }
+     if (!verifyChecksum) {
+       return dataIn.skip(n);
+     }
+   
+     // caller made sure newPosition is not beyond EOF.
+     int remaining = slowReadBuff.remaining();
+     int position = slowReadBuff.position();
+     int newPosition = position + (int)n;
+   
+     // if the new offset is already read into dataBuff, just reposition
+     if (n <= remaining) {
+       assert offsetFromChunkBoundary == 0;
+       slowReadBuff.position(newPosition);
+       return n;
+     }
+   
+     // for a small gap, read through to keep the data/checksum in sync
+     if (n - remaining <= bytesPerChecksum) {
+       slowReadBuff.position(position + remaining);
+       if (skipBuf == null) {
+         skipBuf = new byte[bytesPerChecksum];
+       }
+       int ret = read(skipBuf, 0, (int)(n - remaining));
+       return (remaining + ret);
+     }
+   
+     // optimize for big gap: discard the current buffer, skip to
+     // the beginning of the appropriate checksum chunk and then
+     // read to the middle of that chunk to be in sync with checksums.
+   
+     // We can't use this.offsetFromChunkBoundary because we need to know how
+     // many bytes of the offset were really read. Calling read(..) with a
+     // positive this.offsetFromChunkBoundary causes that many bytes to get
+     // silently skipped.
+     int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+     long toskip = n - remaining - myOffsetFromChunkBoundary;
+ 
+     slowReadBuff.position(slowReadBuff.limit());
+     checksumBuff.position(checksumBuff.limit());
+   
+     IOUtils.skipFully(dataIn, toskip);
+     long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+     IOUtils.skipFully(checksumIn, checkSumOffset);
+ 
+     // read into the middle of the chunk
+     if (skipBuf == null) {
+       skipBuf = new byte[bytesPerChecksum];
+     }
+     assert skipBuf.length == bytesPerChecksum;
+     assert myOffsetFromChunkBoundary < bytesPerChecksum;
+ 
+     int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+ 
+     if (ret == -1) {  // EOS
+       return (toskip + remaining);
+     } else {
+       return (toskip + remaining + ret);
+     }
+   }
+ 
+   @Override
+   public synchronized void close() throws IOException {
+     IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
+     if (slowReadBuff != null) {
+       bufferPool.returnBuffer(slowReadBuff);
+       slowReadBuff = null;
+     }
+     if (checksumBuff != null) {
+       bufferPool.returnBuffer(checksumBuff);
+       checksumBuff = null;
+     }
+     startOffset = -1;
+     checksum = null;
+   }
+ 
+   @Override
+   public int readAll(byte[] buf, int offset, int len) throws IOException {
+     return BlockReaderUtil.readAll(this, buf, offset, len);
+   }
+ 
+   @Override
+   public void readFully(byte[] buf, int off, int len) throws IOException {
+     BlockReaderUtil.readFully(this, buf, off, len);
+   }
+ 
+   @Override
+   public int available() throws IOException {
+     // We never do network I/O in BlockReaderLocalLegacy.
+     return Integer.MAX_VALUE;
+   }
+ 
+   @Override
+   public boolean isLocal() {
+     return true;
+   }
+   
+   @Override
+   public boolean isShortCircuit() {
+     return true;
+   }
+ 
+   @Override
+   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+     return null;
+   }
++
++  @Override
++  public DataChecksum getDataChecksum() {
++    return checksum;
++  }
+ }
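
For readers tracing the buffer arithmetic in BlockReaderLocalLegacy above, the following is a minimal, stand-alone sketch (not part of this patch) of the two rounding steps it relies on: getSlowReadBufferNumChunks rounds the configured buffer size down to a whole number of checksum chunks, and fillSlowReadBuffer rounds a short, unaligned request up to the next chunk boundary so the data and checksum streams stay in sync. The concrete numbers (a 1 MB buffer, 512-byte chunks, a 100-byte request starting 7 bytes into a chunk) are illustrative assumptions, not values taken from any configuration.

    // Illustrative sketch only; mirrors the rounding done by
    // getSlowReadBufferNumChunks and fillSlowReadBuffer above.
    public class ChunkAlignmentSketch {
      public static void main(String[] args) {
        int bufferSizeBytes = 1024 * 1024; // assumed short-circuit buffer size
        int bytesPerChecksum = 512;        // assumed checksum chunk size

        // Round the buffer down to a whole number of chunks.
        int chunksPerRead = bufferSizeBytes / bytesPerChecksum;      // 2048
        int slowReadBuffCapacity = chunksPerRead * bytesPerChecksum; // 1048576

        // Round a short, unaligned request up to the next chunk boundary,
        // as fillSlowReadBuffer(len) does before issuing the read.
        int len = 100;                   // bytes the caller asked for
        int offsetFromChunkBoundary = 7; // bytes already consumed in the chunk
        int nextChunk = len + offsetFromChunkBoundary +
            (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
        int limit = Math.min(nextChunk, slowReadBuffCapacity);       // 512

        System.out.println("chunks per read  = " + chunksPerRead);
        System.out.println("read limit bytes = " + limit);
      }
    }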

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
index 0000000,e135d8e..f908dd3
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@@ -1,0 -1,120 +1,126 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.EnumSet;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
++import org.apache.hadoop.util.DataChecksum;
+ 
+ /**
+  * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+  * replicas.
+  */
+ @InterfaceAudience.Private
+ public final class ExternalBlockReader implements BlockReader {
+   private final ReplicaAccessor accessor;
+   private final long visibleLength;
+   private long pos;
+ 
+   ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+                       long startOffset) {
+     this.accessor = accessor;
+     this.visibleLength = visibleLength;
+     this.pos = startOffset;
+   }
+ 
+   @Override
+   public int read(byte[] buf, int off, int len) throws IOException {
+     int nread = accessor.read(pos, buf, off, len);
+     pos += nread;
+     return nread;
+   }
+ 
+   @Override
+   public int read(ByteBuffer buf) throws IOException {
+     int nread = accessor.read(pos, buf);
+     pos += nread;
+     return nread;
+   }
+ 
+   @Override
+   public long skip(long n) throws IOException {
+     // You cannot skip backwards
+     if (n <= 0) {
+       return 0;
+     }
+     // You can't skip past the end of the replica.
+     long oldPos = pos;
+     pos += n;
+     if (pos > visibleLength) {
+       pos = visibleLength;
+     }
+     return pos - oldPos;
+   }
+ 
+   @Override
+   public int available() throws IOException {
+     // We return the number of bytes that we haven't read yet from the
+     // replica, based on our current position.  Some of the other block
+     // readers return a shorter length than that.  The only advantage to
+     // returning a shorter length is that the DFSInputStream will
+     // trash your block reader and create a new one if someone tries to
+     // seek() beyond the available() region.
+     long diff = visibleLength - pos;
+     if (diff > Integer.MAX_VALUE) {
+       return Integer.MAX_VALUE;
+     } else {
+       return (int)diff;
+     }
+   }
+ 
+   @Override
+   public void close() throws IOException {
+     accessor.close();
+   }
+ 
+   @Override
+   public void readFully(byte[] buf, int offset, int len) throws IOException {
+     BlockReaderUtil.readFully(this, buf, offset, len);
+   }
+ 
+   @Override
+   public int readAll(byte[] buf, int offset, int len) throws IOException {
+     return BlockReaderUtil.readAll(this, buf, offset, len);
+   }
+ 
+   @Override
+   public boolean isLocal() {
+     return accessor.isLocal();
+   }
+ 
+   @Override
+   public boolean isShortCircuit() {
+     return accessor.isShortCircuit();
+   }
+ 
+   @Override
+   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+     // For now, pluggable ReplicaAccessors do not support zero-copy.
+     return null;
+   }
++
++  @Override
++  public DataChecksum getDataChecksum() {
++    return null;
++  }
+ }
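
As a closing illustration, here is a minimal, self-contained sketch (again, not part of this patch) of the position bookkeeping ExternalBlockReader performs around its ReplicaAccessor: skip() never moves backwards and clamps at the visible length, and available() reports the unread remainder capped at Integer.MAX_VALUE. The class and method bodies below are hypothetical stand-ins that model only that arithmetic; they do not use the real ReplicaAccessor API.

    // Hypothetical sketch of ExternalBlockReader-style position tracking.
    public class PositionTrackingSketch {
      private final long visibleLength; // bytes readers are allowed to see
      private long pos;                 // current read position

      PositionTrackingSketch(long visibleLength, long startOffset) {
        this.visibleLength = visibleLength;
        this.pos = startOffset;
      }

      // Never skip backwards, never skip past the visible end of the replica.
      long skip(long n) {
        if (n <= 0) {
          return 0;
        }
        long oldPos = pos;
        pos = Math.min(pos + n, visibleLength);
        return pos - oldPos;
      }

      // Everything not yet read, capped at Integer.MAX_VALUE.
      int available() {
        long diff = visibleLength - pos;
        return diff > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) diff;
      }

      public static void main(String[] args) {
        PositionTrackingSketch r = new PositionTrackingSketch(100, 10);
        System.out.println("skipped   = " + r.skip(150));   // 90 (clamped)
        System.out.println("available = " + r.available()); // 0
      }
    }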