Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:16 UTC
[16/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285
Merge remote-tracking branch 'apache/trunk' into HDFS-7285
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53358fe6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53358fe6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53358fe6
Branch: refs/heads/trunk
Commit: 53358fe680a11c1b66a7f60733d11c1f4efe0232
Parents: ab56fcd 2e251a7
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Tue Sep 1 00:29:55 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue Sep 1 14:48:37 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 9 +
hadoop-common-project/hadoop-common/pom.xml | 19 +-
.../fs/CommonConfigurationKeysPublic.java | 7 +
.../src/main/resources/core-default.xml | 14 +-
.../src/site/markdown/FileSystemShell.md | 13 +-
.../java/org/apache/hadoop/fs/test-untar.tar | Bin 20480 -> 0 bytes
.../java/org/apache/hadoop/fs/test-untar.tgz | Bin 2024 -> 0 bytes
.../fs/viewfs/ViewFileSystemBaseTest.java | 2 +-
.../apache/hadoop/fs/viewfs/ViewFsBaseTest.java | 2 +-
.../src/test/resources/test-untar.tar | Bin 0 -> 20480 bytes
.../src/test/resources/test-untar.tgz | Bin 0 -> 2024 bytes
hadoop-hdfs-project/hadoop-hdfs-client/pom.xml | 5 +
.../org/apache/hadoop/hdfs/BlockReader.java | 110 +++
.../apache/hadoop/hdfs/BlockReaderLocal.java | 748 +++++++++++++++++++
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 743 ++++++++++++++++++
.../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 ++
.../org/apache/hadoop/hdfs/ClientContext.java | 196 +++++
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 68 ++
.../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ++++
.../apache/hadoop/hdfs/KeyProviderCache.java | 112 +++
.../java/org/apache/hadoop/hdfs/PeerCache.java | 291 ++++++++
.../apache/hadoop/hdfs/RemoteBlockReader.java | 517 +++++++++++++
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 485 ++++++++++++
.../hadoop/hdfs/client/BlockReportOptions.java | 59 ++
.../hdfs/client/HdfsClientConfigKeys.java | 13 +
.../hdfs/protocol/BlockLocalPathInfo.java | 70 ++
.../hdfs/protocol/ClientDatanodeProtocol.java | 152 ++++
.../InvalidEncryptionKeyException.java | 40 +
.../protocol/datatransfer/PacketHeader.java | 214 ++++++
.../protocol/datatransfer/PacketReceiver.java | 310 ++++++++
.../protocolPB/ClientDatanodeProtocolPB.java | 37 +
.../ClientDatanodeProtocolTranslatorPB.java | 326 ++++++++
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 13 +
.../token/block/BlockTokenSelector.java | 48 ++
.../hdfs/util/ByteBufferOutputStream.java | 49 ++
.../hadoop/hdfs/web/URLConnectionFactory.java | 30 +-
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 15 +-
.../hdfs/web/oauth2/AccessTokenProvider.java | 66 ++
.../hdfs/web/oauth2/AccessTokenTimer.java | 103 +++
.../ConfCredentialBasedAccessTokenProvider.java | 62 ++
...onfRefreshTokenBasedAccessTokenProvider.java | 146 ++++
.../CredentialBasedAccessTokenProvider.java | 135 ++++
.../oauth2/OAuth2ConnectionConfigurator.java | 79 ++
.../hadoop/hdfs/web/oauth2/OAuth2Constants.java | 46 ++
.../apache/hadoop/hdfs/web/oauth2/Utils.java | 63 ++
.../hadoop/hdfs/web/oauth2/package-info.java | 26 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 22 +
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 6 +
.../bkjournal/BookKeeperEditLogInputStream.java | 2 +-
.../org/apache/hadoop/hdfs/BlockReader.java | 110 ---
.../apache/hadoop/hdfs/BlockReaderLocal.java | 746 ------------------
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 740 ------------------
.../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 --
.../org/apache/hadoop/hdfs/ClientContext.java | 195 -----
.../java/org/apache/hadoop/hdfs/DFSClient.java | 1 -
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 2 +-
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 68 +-
.../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ----
.../apache/hadoop/hdfs/KeyProviderCache.java | 111 ---
.../java/org/apache/hadoop/hdfs/PeerCache.java | 290 -------
.../apache/hadoop/hdfs/RemoteBlockReader.java | 513 -------------
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 482 ------------
.../hadoop/hdfs/client/BlockReportOptions.java | 59 --
.../hdfs/protocol/BlockLocalPathInfo.java | 70 --
.../hdfs/protocol/ClientDatanodeProtocol.java | 152 ----
.../hadoop/hdfs/protocol/LayoutVersion.java | 2 +-
.../InvalidEncryptionKeyException.java | 40 -
.../protocol/datatransfer/PacketHeader.java | 214 ------
.../protocol/datatransfer/PacketReceiver.java | 310 --------
.../hdfs/protocol/datatransfer/Receiver.java | 15 +-
.../protocolPB/ClientDatanodeProtocolPB.java | 37 -
...tDatanodeProtocolServerSideTranslatorPB.java | 6 +-
.../ClientDatanodeProtocolTranslatorPB.java | 326 --------
...tNamenodeProtocolServerSideTranslatorPB.java | 14 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 2 +-
...rDatanodeProtocolServerSideTranslatorPB.java | 2 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 33 +-
.../token/block/BlockTokenSelector.java | 48 --
.../hdfs/server/blockmanagement/BlockInfo.java | 19 +-
.../blockmanagement/BlockInfoContiguous.java | 15 -
.../blockmanagement/BlockInfoStriped.java | 19 -
.../server/blockmanagement/BlockManager.java | 552 +++++---------
.../BlockPlacementPolicyDefault.java | 147 +---
.../blockmanagement/BlockRecoveryWork.java | 111 +++
.../blockmanagement/BlockToMarkCorrupt.java | 82 ++
.../hdfs/server/blockmanagement/BlocksMap.java | 16 -
.../blockmanagement/DatanodeDescriptor.java | 35 +-
.../server/blockmanagement/DatanodeManager.java | 9 +-
.../blockmanagement/ErasureCodingWork.java | 60 ++
.../server/blockmanagement/HostFileManager.java | 19 +
.../server/blockmanagement/ReplicationWork.java | 53 ++
.../hadoop/hdfs/server/datanode/DNConf.java | 4 +-
.../namenode/EditLogBackupInputStream.java | 2 +-
.../server/namenode/EditLogFileInputStream.java | 2 +-
.../hdfs/server/namenode/FSDirDeleteOp.java | 40 +
.../hdfs/server/namenode/FSDirectory.java | 63 ++
.../hdfs/server/namenode/FSEditLogLoader.java | 4 +-
.../hdfs/server/namenode/FSEditLogOp.java | 354 ++++++---
.../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 3 +-
.../hdfs/util/ByteBufferOutputStream.java | 49 --
.../hadoop-hdfs/src/site/markdown/WebHDFS.md | 25 +
.../hadoop/hdfs/TestBlockReaderLocal.java | 30 +-
.../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 2 +-
.../hdfs/TestClientBlockVerification.java | 4 +-
.../hadoop/hdfs/TestDFSClientRetries.java | 2 +-
.../apache/hadoop/hdfs/TestDecommission.java | 15 +-
.../hadoop/hdfs/protocolPB/TestPBHelper.java | 4 +-
.../security/token/block/TestBlockToken.java | 10 +-
.../blockmanagement/TestBlockInfoStriped.java | 30 -
.../blockmanagement/TestDatanodeManager.java | 103 ++-
.../blockmanagement/TestHostFileManager.java | 7 +-
.../blockmanagement/TestReplicationPolicy.java | 26 +-
.../hdfs/server/namenode/TestEditLog.java | 2 +-
.../namenode/TestEditLogFileInputStream.java | 80 ++
.../namenode/TestProtectedDirectories.java | 373 +++++++++
.../shortcircuit/TestShortCircuitLocalRead.java | 4 +-
.../hadoop/hdfs/web/TestWebHDFSOAuth2.java | 216 ++++++
.../hdfs/web/oauth2/TestAccessTokenTimer.java | 63 ++
...ClientCredentialTimeBasedTokenRefresher.java | 138 ++++
...TestRefreshTokenTimeBasedTokenRefresher.java | 138 ++++
hadoop-project/src/site/site.xml | 1 +
.../org/apache/hadoop/fs/s3a/Constants.java | 4 +-
.../src/site/markdown/tools/hadoop-aws/index.md | 4 +-
hadoop-yarn-project/CHANGES.txt | 10 +-
.../hadoop/yarn/client/TestRMFailover.java | 27 +
.../hadoop/yarn/webapp/YarnWebParams.java | 1 +
.../scheduler/capacity/AbstractCSQueue.java | 27 +
.../scheduler/capacity/CSQueue.java | 26 +
.../scheduler/capacity/CapacityScheduler.java | 40 +-
.../scheduler/capacity/LeafQueue.java | 16 +
.../scheduler/common/fica/FiCaSchedulerApp.java | 9 +
.../resourcemanager/webapp/RMWebAppFilter.java | 90 ++-
.../TestCapacitySchedulerNodeLabelUpdate.java | 249 +++++-
.../src/site/markdown/NodeLabel.md | 140 ++++
135 files changed, 8407 insertions(+), 5608 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 0000000,aa3e8ba..8f988af
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@@ -1,0 -1,102 +1,110 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
++import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.EnumSet;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ByteBufferReadable;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
++import org.apache.hadoop.util.DataChecksum;
+
+ /**
+ * A BlockReader is responsible for reading a single block
+ * from a single datanode.
+ */
+ @InterfaceAudience.Private
-public interface BlockReader extends ByteBufferReadable {
++public interface BlockReader extends ByteBufferReadable, Closeable {
+
+
+ /* same interface as java.io.InputStream#read(),
+ * used by DFSInputStream#read()
+ * This violates one rule when there is a checksum error:
+ * "Read should not modify user buffer before successful read"
+ * because it first reads the data to user buffer and then checks
+ * the checksum.
+ * Note: this must return -1 on EOF, even in the case of a 0-byte read.
+ * See HDFS-5762 for details.
+ */
+ int read(byte[] buf, int off, int len) throws IOException;
+
+ /**
+ * Skip the given number of bytes
+ */
+ long skip(long n) throws IOException;
+
+ /**
+ * Returns an estimate of the number of bytes that can be read
+ * (or skipped over) from this input stream without performing
+ * network I/O.
+ * This may return more than what is actually present in the block.
+ */
+ int available() throws IOException;
+
+ /**
+ * Close the block reader.
+ *
+ * @throws IOException
+ */
++ @Override // java.io.Closeable
+ void close() throws IOException;
+
+ /**
+ * Read exactly the given amount of data, throwing an exception
+ * if EOF is reached before that amount
+ */
+ void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
+
+ /**
+ * Similar to {@link #readFully(byte[], int, int)} except that it will
+ * not throw an exception on EOF. However, it differs from the simple
+ * {@link #read(byte[], int, int)} call in that it is guaranteed to
+ * read the data if it is available. In other words, if this call
+ * does not throw an exception, then either the buffer has been
+ * filled or the next call will return EOF.
+ */
+ int readAll(byte[] buf, int offset, int len) throws IOException;
+
+ /**
+ * @return true only if this is a local read.
+ */
+ boolean isLocal();
+
+ /**
+ * @return true only if this is a short-circuit read.
+ * All short-circuit reads are also local.
+ */
+ boolean isShortCircuit();
+
+ /**
+ * Get a ClientMmap object for this BlockReader.
+ *
+ * @param opts The read options to use.
+ * @return The ClientMmap object, or null if mmap is not
+ * supported.
+ */
+ ClientMmap getClientMmap(EnumSet<ReadOption> opts);
++
++ /**
++ * @return The DataChecksum used by the read block
++ */
++ DataChecksum getDataChecksum();
+ }
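
As an illustrative sketch (not part of this commit), the following hypothetical helper exercises the BlockReader contract documented above: read() returns -1 at EOF, readAll() either fills the requested range or stops at EOF without throwing, and close() is now inherited from java.io.Closeable, so try-with-resources applies. The class and method names (BlockReaderExample, readPrefix) are made up for illustration.

    // Hypothetical usage sketch; not part of this commit.
    import java.io.IOException;

    import org.apache.hadoop.hdfs.BlockReader;

    public final class BlockReaderExample {
      private BlockReaderExample() {}

      /**
       * Reads up to dest.length bytes from the reader's current position.
       * Returns the number of bytes actually read, which is less than
       * dest.length only if EOF was reached first.
       */
      public static int readPrefix(BlockReader blockReader, byte[] dest)
          throws IOException {
        // BlockReader extends java.io.Closeable after this change, so
        // try-with-resources closes it even if readAll() throws.
        try (BlockReader reader = blockReader) {
          // Per the readAll() contract above: if this call returns without
          // throwing, either dest is full or the next read() returns -1.
          return reader.readAll(dest, 0, dest.length);
        }
      }
    }
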
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 0000000,2a0e21b..8d7f294
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@@ -1,0 -1,743 +1,748 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.FileChannel;
+ import java.util.EnumSet;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DirectBufferPool;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * BlockReaderLocal enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
+ * </ul>
+ */
+ @InterfaceAudience.Private
+ class BlockReaderLocal implements BlockReader {
+ static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
+
+ private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+ public static class Builder {
+ private final int bufferSize;
+ private boolean verifyChecksum;
+ private int maxReadahead;
+ private String filename;
+ private ShortCircuitReplica replica;
+ private long dataPos;
+ private ExtendedBlock block;
+ private StorageType storageType;
+
+ public Builder(ShortCircuitConf conf) {
+ this.maxReadahead = Integer.MAX_VALUE;
+ this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+ this.bufferSize = conf.getShortCircuitBufferSize();
+ }
+
+ public Builder setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ return this;
+ }
+
+ public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+ long readahead = cachingStrategy.getReadahead() != null ?
+ cachingStrategy.getReadahead() :
+ HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+ this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+ return this;
+ }
+
+ public Builder setFilename(String filename) {
+ this.filename = filename;
+ return this;
+ }
+
+ public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+ this.replica = replica;
+ return this;
+ }
+
+ public Builder setStartOffset(long startOffset) {
+ this.dataPos = Math.max(0, startOffset);
+ return this;
+ }
+
+ public Builder setBlock(ExtendedBlock block) {
+ this.block = block;
+ return this;
+ }
+
+ public Builder setStorageType(StorageType storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
+ public BlockReaderLocal build() {
+ Preconditions.checkNotNull(replica);
+ return new BlockReaderLocal(this);
+ }
+ }
+
+ private boolean closed = false;
+
+ /**
+ * Pair of streams for this block.
+ */
+ private final ShortCircuitReplica replica;
+
+ /**
+ * The data FileChannel.
+ */
+ private final FileChannel dataIn;
+
+ /**
+ * The next place we'll read from in the block data FileChannel.
+ *
+ * If data is buffered in dataBuf, this offset will be larger than the
+ * offset of the next byte which a read() operation will give us.
+ */
+ private long dataPos;
+
+ /**
+ * The Checksum FileChannel.
+ */
+ private final FileChannel checksumIn;
+
+ /**
+ * Checksum type and size.
+ */
+ private final DataChecksum checksum;
+
+ /**
+ * If false, we will always skip the checksum.
+ */
+ private final boolean verifyChecksum;
+
+ /**
+ * Name of the block, for logging purposes.
+ */
+ private final String filename;
+
+ /**
+ * Block ID and Block Pool ID.
+ */
+ private final ExtendedBlock block;
+
+ /**
+ * Cache of Checksum#bytesPerChecksum.
+ */
+ private final int bytesPerChecksum;
+
+ /**
+ * Cache of Checksum#checksumSize.
+ */
+ private final int checksumSize;
+
+ /**
+ * Maximum number of chunks to allocate.
+ *
+ * This is used to allocate dataBuf and checksumBuf, in the event that
+ * we need them.
+ */
+ private final int maxAllocatedChunks;
+
+ /**
+ * True if zero readahead was requested.
+ */
+ private final boolean zeroReadaheadRequested;
+
+ /**
+ * Maximum amount of readahead we'll do. This will always be at least the
+ * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
+ * The reason is because we need to do a certain amount of buffering in order
+ * to do checksumming.
+ *
+ * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+ * Why do we allocate buffers, and then (potentially) only use part of them?
+ * The rationale is that allocating a lot of buffers of different sizes would
+ * make it very difficult for the DirectBufferPool to re-use buffers.
+ */
+ private final int maxReadaheadLength;
+
+ /**
+ * Buffers data starting at the current dataPos and extending on
+ * for dataBuf.limit().
+ *
+ * This may be null if we don't need it.
+ */
+ private ByteBuffer dataBuf;
+
+ /**
+ * Buffers checksums starting at the current checksumPos and extending on
+ * for checksumBuf.limit().
+ *
+ * This may be null if we don't need it.
+ */
+ private ByteBuffer checksumBuf;
+
+ /**
+ * StorageType of replica on DataNode.
+ */
+ private StorageType storageType;
+
+ private BlockReaderLocal(Builder builder) {
+ this.replica = builder.replica;
+ this.dataIn = replica.getDataStream().getChannel();
+ this.dataPos = builder.dataPos;
+ this.checksumIn = replica.getMetaStream().getChannel();
+ BlockMetadataHeader header = builder.replica.getMetaHeader();
+ this.checksum = header.getChecksum();
+ this.verifyChecksum = builder.verifyChecksum &&
+ (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+ this.filename = builder.filename;
+ this.block = builder.block;
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+
+ this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+ ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+ // Calculate the effective maximum readahead.
+ // We can't do more readahead than there is space in the buffer.
+ int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+ ((Math.min(builder.bufferSize, builder.maxReadahead) +
+ bytesPerChecksum - 1) / bytesPerChecksum);
+ if (maxReadaheadChunks == 0) {
+ this.zeroReadaheadRequested = true;
+ maxReadaheadChunks = 1;
+ } else {
+ this.zeroReadaheadRequested = false;
+ }
+ this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+ this.storageType = builder.storageType;
+ }
+
+ private synchronized void createDataBufIfNeeded() {
+ if (dataBuf == null) {
+ dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+ dataBuf.position(0);
+ dataBuf.limit(0);
+ }
+ }
+
+ private synchronized void freeDataBufIfExists() {
+ if (dataBuf != null) {
+ // When disposing of a dataBuf, we have to move our stored file index
+ // backwards.
+ dataPos -= dataBuf.remaining();
+ dataBuf.clear();
+ bufferPool.returnBuffer(dataBuf);
+ dataBuf = null;
+ }
+ }
+
+ private synchronized void createChecksumBufIfNeeded() {
+ if (checksumBuf == null) {
+ checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+ checksumBuf.position(0);
+ checksumBuf.limit(0);
+ }
+ }
+
+ private synchronized void freeChecksumBufIfExists() {
+ if (checksumBuf != null) {
+ checksumBuf.clear();
+ bufferPool.returnBuffer(checksumBuf);
+ checksumBuf = null;
+ }
+ }
+
+ private synchronized int drainDataBuf(ByteBuffer buf) {
+ if (dataBuf == null) return -1;
+ int oldLimit = dataBuf.limit();
+ int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+ if (nRead == 0) {
+ return (dataBuf.remaining() == 0) ? -1 : 0;
+ }
+ try {
+ dataBuf.limit(dataBuf.position() + nRead);
+ buf.put(dataBuf);
+ } finally {
+ dataBuf.limit(oldLimit);
+ }
+ return nRead;
+ }
+
+ /**
+ * Read from the block file into a buffer.
+ *
+ * This function overwrites checksumBuf. It will increment dataPos.
+ *
+ * @param buf The buffer to read into. May be dataBuf.
+ * The position and limit of this buffer should be set to
+ * multiples of the checksum size.
+ * @param canSkipChecksum True if we can skip checksumming.
+ *
+ * @return Total bytes read. 0 on EOF.
+ */
+ private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
+ throws IOException {
+ TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
+ block.getBlockId() + ")", Sampler.NEVER);
+ try {
+ int total = 0;
+ long startDataPos = dataPos;
+ int startBufPos = buf.position();
+ while (buf.hasRemaining()) {
+ int nRead = dataIn.read(buf, dataPos);
+ if (nRead < 0) {
+ break;
+ }
+ dataPos += nRead;
+ total += nRead;
+ }
+ if (canSkipChecksum) {
+ freeChecksumBufIfExists();
+ return total;
+ }
+ if (total > 0) {
+ try {
+ buf.limit(buf.position());
+ buf.position(startBufPos);
+ createChecksumBufIfNeeded();
+ int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.clear();
+ checksumBuf.limit(checksumsNeeded * checksumSize);
+ long checksumPos = BlockMetadataHeader.getHeaderSize()
+ + ((startDataPos / bytesPerChecksum) * checksumSize);
+ while (checksumBuf.hasRemaining()) {
+ int nRead = checksumIn.read(checksumBuf, checksumPos);
+ if (nRead < 0) {
+ throw new IOException("Got unexpected checksum file EOF at " +
+ checksumPos + ", block file position " + startDataPos + " for " +
+ "block " + block + " of file " + filename);
+ }
+ checksumPos += nRead;
+ }
+ checksumBuf.flip();
+
+ checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+ } finally {
+ buf.position(buf.limit());
+ }
+ }
+ return total;
+ } finally {
+ scope.close();
+ }
+ }
+
+ private boolean createNoChecksumContext() {
+ if (verifyChecksum) {
+ if (storageType != null && storageType.isTransient()) {
+ // Checksums are not stored for replicas on transient storage. We do not
+ // anchor, because we do not intend for client activity to block eviction
+ // from transient storage on the DataNode side.
+ return true;
+ } else {
+ return replica.addNoChecksumAnchor();
+ }
+ } else {
+ return true;
+ }
+ }
+
+ private void releaseNoChecksumContext() {
+ if (verifyChecksum) {
+ if (storageType == null || !storageType.isTransient()) {
+ replica.removeNoChecksumAnchor();
+ }
+ }
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ boolean canSkipChecksum = createNoChecksumContext();
+ try {
+ String traceString = null;
+ if (LOG.isTraceEnabled()) {
+ traceString = new StringBuilder().
+ append("read(").
+ append("buf.remaining=").append(buf.remaining()).
+ append(", block=").append(block).
+ append(", filename=").append(filename).
+ append(", canSkipChecksum=").append(canSkipChecksum).
+ append(")").toString();
+ LOG.info(traceString + ": starting");
+ }
+ int nRead;
+ try {
+ if (canSkipChecksum && zeroReadaheadRequested) {
+ nRead = readWithoutBounceBuffer(buf);
+ } else {
+ nRead = readWithBounceBuffer(buf, canSkipChecksum);
+ }
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.info(traceString + ": I/O error", e);
+ }
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.info(traceString + ": returning " + nRead);
+ }
+ return nRead;
+ } finally {
+ if (canSkipChecksum) releaseNoChecksumContext();
+ }
+ }
+
+ private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+ throws IOException {
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
+ int total = 0;
+ while (buf.hasRemaining()) {
+ int nRead = dataIn.read(buf, dataPos);
+ if (nRead <= 0) break;
+ dataPos += nRead;
+ total += nRead;
+ }
+ return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
+ }
+
+ /**
+ * Fill the data buffer. If necessary, validate the data against the
+ * checksums.
+ *
+ * We always want the offsets of the data contained in dataBuf to be
+ * aligned to the chunk boundary. If we are validating checksums, we
+ * accomplish this by seeking backwards in the file until we're on a
+ * chunk boundary. (This is necessary because we can't checksum a
+ * partial chunk.) If we are not validating checksums, we simply only
+ * fill the latter part of dataBuf.
+ *
+ * @param canSkipChecksum true if we can skip checksumming.
+ * @return true if we hit EOF.
+ * @throws IOException
+ */
+ private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+ throws IOException {
+ createDataBufIfNeeded();
+ final int slop = (int)(dataPos % bytesPerChecksum);
+ final long oldDataPos = dataPos;
+ dataBuf.limit(maxReadaheadLength);
+ if (canSkipChecksum) {
+ dataBuf.position(slop);
+ fillBuffer(dataBuf, canSkipChecksum);
+ } else {
+ dataPos -= slop;
+ dataBuf.position(0);
+ fillBuffer(dataBuf, canSkipChecksum);
+ }
+ dataBuf.limit(dataBuf.position());
+ dataBuf.position(Math.min(dataBuf.position(), slop));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+ "buffer from offset " + oldDataPos + " of " + block);
+ }
+ return dataBuf.limit() != maxReadaheadLength;
+ }
+
+ /**
+ * Read using the bounce buffer.
+ *
+ * A 'direct' read actually has three phases. The first drains any
+ * remaining bytes from the slow read buffer. After this the read is
+ * guaranteed to be on a checksum chunk boundary. If there are still bytes
+ * to read, the fast direct path is used for as many remaining bytes as
+ * possible, up to a multiple of the checksum chunk size. Finally, any
+ * 'odd' bytes remaining at the end of the read cause another slow read to
+ * be issued, which involves an extra copy.
+ *
+ * Every 'slow' read tries to fill the slow read buffer in one go for
+ * efficiency's sake. As described above, all non-checksum-chunk-aligned
+ * reads will be served from the slower read path.
+ *
+ * @param buf The buffer to read into.
+ * @param canSkipChecksum True if we can skip checksums.
+ */
+ private synchronized int readWithBounceBuffer(ByteBuffer buf,
+ boolean canSkipChecksum) throws IOException {
+ int total = 0;
+ int bb = drainDataBuf(buf); // drain bounce buffer if possible
+ if (bb >= 0) {
+ total += bb;
+ if (buf.remaining() == 0) return total;
+ }
+ boolean eof = true, done = false;
+ do {
+ if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
+ && ((dataPos % bytesPerChecksum) == 0)) {
+ // Fast lane: try to read directly into user-supplied buffer, bypassing
+ // bounce buffer.
+ int oldLimit = buf.limit();
+ int nRead;
+ try {
+ buf.limit(buf.position() + maxReadaheadLength);
+ nRead = fillBuffer(buf, canSkipChecksum);
+ } finally {
+ buf.limit(oldLimit);
+ }
+ if (nRead < maxReadaheadLength) {
+ done = true;
+ }
+ if (nRead > 0) {
+ eof = false;
+ }
+ total += nRead;
+ } else {
+ // Slow lane: refill bounce buffer.
+ if (fillDataBuf(canSkipChecksum)) {
+ done = true;
+ }
+ bb = drainDataBuf(buf); // drain bounce buffer if possible
+ if (bb >= 0) {
+ eof = false;
+ total += bb;
+ }
+ }
+ } while ((!done) && (buf.remaining() > 0));
+ return (eof && total == 0) ? -1 : total;
+ }
+
+ @Override
+ public synchronized int read(byte[] arr, int off, int len)
+ throws IOException {
+ boolean canSkipChecksum = createNoChecksumContext();
+ int nRead;
+ try {
+ String traceString = null;
+ if (LOG.isTraceEnabled()) {
+ traceString = new StringBuilder().
+ append("read(arr.length=").append(arr.length).
+ append(", off=").append(off).
+ append(", len=").append(len).
+ append(", filename=").append(filename).
+ append(", block=").append(block).
+ append(", canSkipChecksum=").append(canSkipChecksum).
+ append(")").toString();
+ LOG.trace(traceString + ": starting");
+ }
+ try {
+ if (canSkipChecksum && zeroReadaheadRequested) {
+ nRead = readWithoutBounceBuffer(arr, off, len);
+ } else {
+ nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+ }
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(traceString + ": I/O error", e);
+ }
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(traceString + ": returning " + nRead);
+ }
+ } finally {
+ if (canSkipChecksum) releaseNoChecksumContext();
+ }
+ return nRead;
+ }
+
+ private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+ int len) throws IOException {
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
+ int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+ if (nRead > 0) {
+ dataPos += nRead;
+ } else if ((nRead == 0) && (dataPos == dataIn.size())) {
+ return -1;
+ }
+ return nRead;
+ }
+
+ private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+ boolean canSkipChecksum) throws IOException {
+ createDataBufIfNeeded();
+ if (!dataBuf.hasRemaining()) {
+ dataBuf.position(0);
+ dataBuf.limit(maxReadaheadLength);
+ fillDataBuf(canSkipChecksum);
+ }
+ if (dataBuf.remaining() == 0) return -1;
+ int toRead = Math.min(dataBuf.remaining(), len);
+ dataBuf.get(arr, off, toRead);
+ return toRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ int discardedFromBuf = 0;
+ long remaining = n;
+ if ((dataBuf != null) && dataBuf.hasRemaining()) {
+ discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+ dataBuf.position(dataBuf.position() + discardedFromBuf);
+ remaining -= discardedFromBuf;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
+ filename + "): discarded " + discardedFromBuf + " bytes from " +
+ "dataBuf and advanced dataPos by " + remaining);
+ }
+ dataPos += remaining;
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocal.
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) return;
+ closed = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+ }
+ replica.unref();
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
+ }
+
+ @Override
+ public synchronized void readFully(byte[] arr, int off, int len)
+ throws IOException {
+ BlockReaderUtil.readFully(this, arr, off, len);
+ }
+
+ @Override
+ public synchronized int readAll(byte[] buf, int off, int len)
+ throws IOException {
+ return BlockReaderUtil.readAll(this, buf, off, len);
+ }
+
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return true;
+ }
+
+ /**
+ * Get or create a memory map for this replica.
+ *
+ * There are two kinds of ClientMmap objects we could fetch here: one that
+ * will always read pre-checksummed data, and one that may read data that
+ * hasn't been checksummed.
+ *
+ * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+ * the anchor count on the shared memory slot. This will tell the DataNode
+ * not to munlock the block until this ClientMmap is closed.
+ * If we fetch the latter, we don't bother with anchoring.
+ *
+ * @param opts The options to use, such as SKIP_CHECKSUMS.
+ *
+ * @return null on failure; the ClientMmap otherwise.
+ */
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ boolean anchor = verifyChecksum &&
+ (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
+ if (anchor) {
+ if (!createNoChecksumContext()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("can't get an mmap for " + block + " of " + filename +
+ " since SKIP_CHECKSUMS was not given, " +
+ "we aren't skipping checksums, and the block is not mlocked.");
+ }
+ return null;
+ }
+ }
+ ClientMmap clientMmap = null;
+ try {
+ clientMmap = replica.getOrCreateClientMmap(anchor);
+ } finally {
+ if ((clientMmap == null) && anchor) {
+ releaseNoChecksumContext();
+ }
+ }
+ return clientMmap;
+ }
+
+ @VisibleForTesting
+ boolean getVerifyChecksum() {
+ return this.verifyChecksum;
+ }
+
+ @VisibleForTesting
+ int getMaxReadaheadLength() {
+ return this.maxReadaheadLength;
+ }
+
+ /**
+ * Make the replica anchorable. Normally this can only be done by the
+ * DataNode. This method is only for testing.
+ */
+ @VisibleForTesting
+ void forceAnchorable() {
+ replica.getSlot().makeAnchorable();
+ }
+
+ /**
+ * Make the replica unanchorable. Normally this can only be done by the
+ * DataNode. This method is only for testing.
+ */
+ @VisibleForTesting
+ void forceUnanchorable() {
+ replica.getSlot().makeUnanchorable();
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return checksum;
++ }
+ }
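
As an illustrative sketch (not part of this commit), the following standalone method mirrors the readahead sizing done in the BlockReaderLocal constructor above: the effective readahead is the smaller of the short-circuit buffer size and the requested readahead, rounded up to whole checksum chunks, and a zero-readahead request is still promoted to one full chunk because checksum verification needs at least one complete chunk of buffering. The class name ReadaheadSizingSketch is made up for illustration.

    // Hypothetical sketch of the chunk rounding; not part of this commit.
    public final class ReadaheadSizingSketch {
      private ReadaheadSizingSketch() {}

      /** Mirrors the readahead calculation in BlockReaderLocal's constructor. */
      public static int maxReadaheadLength(int bufferSize, int maxReadahead,
          int bytesPerChecksum) {
        if (bytesPerChecksum == 0) {
          return 0;                     // degenerate case guarded in the constructor
        }
        int maxReadaheadChunks =
            (Math.min(bufferSize, maxReadahead) + bytesPerChecksum - 1)
                / bytesPerChecksum;     // round up to whole checksum chunks
        if (maxReadaheadChunks == 0) {
          maxReadaheadChunks = 1;       // "zero readahead" still buffers one chunk
        }
        return maxReadaheadChunks * bytesPerChecksum;
      }

      public static void main(String[] args) {
        // With a 1 MB short-circuit buffer, 4 MB requested readahead and
        // 512-byte chunks, the effective readahead is capped at 1048576 bytes.
        System.out.println(maxReadaheadLength(1024 * 1024, 4 * 1024 * 1024, 512));
      }
    }
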
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 0000000,eea3f06..9920438
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@@ -1,0 -1,738 +1,743 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.DataInputStream;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.Collections;
+ import java.util.EnumSet;
+ import java.util.HashMap;
+ import java.util.LinkedHashMap;
+ import java.util.Map;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.hdfs.util.IOUtilsClient;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.ipc.RPC;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DirectBufferPool;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ *
+ * This is the legacy implementation based on HDFS-2246, which requires
+ * permissions on the datanode to be set so that clients can directly access the
+ * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
+ * systems where the required native code has been implemented.<br>
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
+ */
+ @InterfaceAudience.Private
+ class BlockReaderLocalLegacy implements BlockReader {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ BlockReaderLocalLegacy.class);
+
+ //Stores the cache and proxy for a local datanode.
+ private static class LocalDatanodeInfo {
+ private ClientDatanodeProtocol proxy = null;
+ private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+
+ LocalDatanodeInfo() {
+ final int cacheSize = 10000;
+ final float hashTableLoadFactor = 0.75f;
+ int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
+ cache = Collections
+ .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+ hashTableCapacity, hashTableLoadFactor, true) {
+ private static final long serialVersionUID = 1;
+
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+ return size() > cacheSize;
+ }
+ });
+ }
+
+ private synchronized ClientDatanodeProtocol getDatanodeProxy(
+ UserGroupInformation ugi, final DatanodeInfo node,
+ final Configuration conf, final int socketTimeout,
+ final boolean connectToDnViaHostname) throws IOException {
+ if (proxy == null) {
+ try {
+ proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+ @Override
+ public ClientDatanodeProtocol run() throws Exception {
+ return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
+ socketTimeout, connectToDnViaHostname);
+ }
+ });
+ } catch (InterruptedException e) {
+ LOG.warn("encountered exception ", e);
+ }
+ }
+ return proxy;
+ }
+
+ private synchronized void resetDatanodeProxy() {
+ if (null != proxy) {
+ RPC.stopProxy(proxy);
+ proxy = null;
+ }
+ }
+
+ private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+ return cache.get(b);
+ }
+
+ private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
+ cache.put(b, info);
+ }
+
+ private void removeBlockLocalPathInfo(ExtendedBlock b) {
+ cache.remove(b);
+ }
+ }
+
+ // Multiple datanodes could be running on the local machine. Store proxies in
+ // a map keyed by the ipc port of the datanode.
+ private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+
+ private final FileInputStream dataIn; // reader for the data file
+ private final FileInputStream checksumIn; // reader for the checksum file
+
+ /**
+ * Offset from the most recent chunk boundary at which the next read should
+ * take place. Is only set to non-zero at construction time, and is
+ * decremented (usually to 0) by subsequent reads. This avoids having to do a
+ * checksum read at construction to position the read cursor correctly.
+ */
+ private int offsetFromChunkBoundary;
+
+ private byte[] skipBuf = null;
+
+ /**
+ * Used for checksummed reads that need to be staged before copying to their
+ * output buffer because they are either a) smaller than the checksum chunk
+ * size or b) issued by the slower read(byte[]...) path
+ */
+ private ByteBuffer slowReadBuff = null;
+ private ByteBuffer checksumBuff = null;
+ private DataChecksum checksum;
+ private final boolean verifyChecksum;
+
+ private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+ private final int bytesPerChecksum;
+ private final int checksumSize;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+ private final String filename;
+ private long blockId;
+
+ /**
+ * The only way this object can be instantiated.
+ */
+ static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
+ UserGroupInformation userGroupInformation,
+ Configuration configuration, String file, ExtendedBlock blk,
+ Token<BlockTokenIdentifier> token, DatanodeInfo node,
+ long startOffset, long length, StorageType storageType)
+ throws IOException {
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+ .getIpcPort());
+ // check the cache first
+ BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+ if (pathinfo == null) {
+ if (userGroupInformation == null) {
+ userGroupInformation = UserGroupInformation.getCurrentUser();
+ }
+ pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+ configuration, conf.getSocketTimeout(), token,
+ conf.isConnectToDnViaHostname(), storageType);
+ }
+
+ // check to see if the file exists. It may so happen that the
+ // HDFS file has been deleted and this block-lookup is occurring
+ // on behalf of a new HDFS file. This time, the block file could
+ // be residing in a different portion of the fs.data.dir directory.
+ // In this case, we remove this entry from the cache. The next
+ // call to this method will re-populate the cache.
+ FileInputStream dataIn = null;
+ FileInputStream checksumIn = null;
+ BlockReaderLocalLegacy localBlockReader = null;
+ final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+ || storageType.isTransient();
+ try {
+ // get a local file system
+ File blkfile = new File(pathinfo.getBlockPath());
+ dataIn = new FileInputStream(blkfile);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
+ + blkfile.length() + " startOffset " + startOffset + " length "
+ + length + " short circuit checksum " + !skipChecksumCheck);
+ }
+
+ if (!skipChecksumCheck) {
+ // get the metadata file
+ File metafile = new File(pathinfo.getMetaPath());
+ checksumIn = new FileInputStream(metafile);
+
+ final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+ new DataInputStream(checksumIn), blk);
+ long firstChunkOffset = startOffset
+ - (startOffset % checksum.getBytesPerChecksum());
+ localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+ startOffset, length, pathinfo, checksum, true, dataIn,
+ firstChunkOffset, checksumIn);
+ } else {
+ localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
+ startOffset, length, pathinfo, dataIn);
+ }
+ } catch (IOException e) {
+ // remove from cache
+ localDatanodeInfo.removeBlockLocalPathInfo(blk);
+ LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+ + " from cache because local file " + pathinfo.getBlockPath()
+ + " could not be opened.");
+ throw e;
+ } finally {
+ if (localBlockReader == null) {
+ if (dataIn != null) {
+ dataIn.close();
+ }
+ if (checksumIn != null) {
+ checksumIn.close();
+ }
+ }
+ }
+ return localBlockReader;
+ }
+
+ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+ LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+ if (ldInfo == null) {
+ ldInfo = new LocalDatanodeInfo();
+ localDatanodeInfoMap.put(port, ldInfo);
+ }
+ return ldInfo;
+ }
+
+ private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
+ ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
+ Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+ StorageType storageType) throws IOException {
+ LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
+ BlockLocalPathInfo pathinfo = null;
+ ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
+ conf, timeout, connectToDnViaHostname);
+ try {
+ // make RPC to local datanode to find local pathnames of blocks
+ pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+ // We cannot cache the path information for a replica on transient storage.
+ // If the replica gets evicted, then it moves to a different path. Then,
+ // our next attempt to read from the cached path would fail to find the
+ // file. Additionally, the failure would cause us to disable legacy
+ // short-circuit read for all subsequent use in the ClientContext. Unlike
+ // the newer short-circuit read implementation, we have no communication
+ // channel for the DataNode to notify the client that the path has been
+ // invalidated. Therefore, our only option is to skip caching.
+ if (pathinfo != null && !storageType.isTransient()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached location of block " + blk + " as " + pathinfo);
+ }
+ localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+ }
+ } catch (IOException e) {
+ localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+ throw e;
+ }
+ return pathinfo;
+ }
+
+ private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+ int bytesPerChecksum) {
+ if (bufferSizeBytes < bytesPerChecksum) {
+ throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
+ "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
+ "a single chunk (" + bytesPerChecksum + "). Please configure " +
+ HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
+ " appropriately");
+ }
+
+ // Round down to nearest chunk size
+ return bufferSizeBytes / bytesPerChecksum;
+ }
+
+ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+ ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+ long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
+ throws IOException {
+ this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
+ DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
+ dataIn, startOffset, null);
+ }
+
+ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+ ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
+ long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
+ boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+ FileInputStream checksumIn) throws IOException {
+ this.filename = hdfsfile;
+ this.checksum = checksum;
+ this.verifyChecksum = verifyChecksum;
+ this.startOffset = Math.max(startOffset, 0);
+ this.blockId = block.getBlockId();
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+
+ this.dataIn = dataIn;
+ this.checksumIn = checksumIn;
+ this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+
+ final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+ conf.getShortCircuitBufferSize(), bytesPerChecksum);
+ slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+ checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+ // Initially the buffers have nothing to read.
+ slowReadBuff.flip();
+ checksumBuff.flip();
+ boolean success = false;
+ try {
+ // Skip both input streams to beginning of the chunk containing startOffset
+ IOUtils.skipFully(dataIn, firstChunkOffset);
+ if (checksumIn != null) {
+ long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ IOUtils.skipFully(checksumIn, checkSumOffset);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ bufferPool.returnBuffer(slowReadBuff);
+ bufferPool.returnBuffer(checksumBuff);
+ }
+ }
+ }
+
+ /**
+ * Reads bytes into a buffer until EOF or the buffer's limit is reached
+ */
+ private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+ throws IOException {
+ TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
+ blockId + ")", Sampler.NEVER);
+ try {
+ int bytesRead = stream.getChannel().read(buf);
+ if (bytesRead < 0) {
+ //EOF
+ return bytesRead;
+ }
+ while (buf.remaining() > 0) {
+ int n = stream.getChannel().read(buf);
+ if (n < 0) {
+ //EOF
+ return bytesRead;
+ }
+ bytesRead += n;
+ }
+ return bytesRead;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+ * another.
+ */
+ private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+ int oldLimit = from.limit();
+ from.limit(from.position() + length);
+ try {
+ to.put(from);
+ } finally {
+ from.limit(oldLimit);
+ }
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int nRead = 0;
+ if (verifyChecksum) {
+ // A 'direct' read actually has three phases. The first drains any
+ // remaining bytes from the slow read buffer. After this the read is
+ // guaranteed to be on a checksum chunk boundary. If there are still bytes
+ // to read, the fast direct path is used for as many remaining bytes as
+ // possible, up to a multiple of the checksum chunk size. Finally, any
+ // 'odd' bytes remaining at the end of the read cause another slow read to
+ // be issued, which involves an extra copy.
+
+ // Every 'slow' read tries to fill the slow read buffer in one go for
+ // efficiency's sake. As described above, all non-checksum-chunk-aligned
+ // reads will be served from the slower read path.
+
+ if (slowReadBuff.hasRemaining()) {
+ // There are remaining bytes from a small read available. This usually
+ // means this read is unaligned, which falls back to the slow path.
+ int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+ writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+ nRead += fromSlowReadBuff;
+ }
+
+ if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+ // Since we have drained the 'small read' buffer, we are guaranteed to
+ // be chunk-aligned
+ int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+ // There's only enough checksum buffer space available to checksum one
+ // entire slow read buffer. This saves keeping the number of checksum
+ // chunks around.
+ len = Math.min(len, slowReadBuff.capacity());
+ int oldlimit = buf.limit();
+ buf.limit(buf.position() + len);
+ int readResult = 0;
+ try {
+ readResult = doByteBufferRead(buf);
+ } finally {
+ buf.limit(oldlimit);
+ }
+ if (readResult == -1) {
+ return nRead;
+ } else {
+ nRead += readResult;
+ buf.position(buf.position() + readResult);
+ }
+ }
+
+ // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+ // until chunk boundary
+ if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+ int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+ int readResult = fillSlowReadBuffer(toRead);
+ if (readResult == -1) {
+ return nRead;
+ } else {
+ int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+ writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+ nRead += fromSlowReadBuff;
+ }
+ }
+ } else {
+ // Non-checksummed reads are much easier; we can just fill the buffer directly.
+ nRead = doByteBufferRead(buf);
+ if (nRead > 0) {
+ buf.position(buf.position() + nRead);
+ }
+ }
+ return nRead;
+ }
+
+ /**
+ * Tries to read as many bytes as possible into supplied buffer, checksumming
+ * each chunk if needed.
+ *
+ * <b>Preconditions:</b>
+ * <ul>
+ * <li>
+ * If checksumming is enabled, buf.remaining must be a multiple of
+ * bytesPerChecksum. Note that this is not a requirement for clients of
+ * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+ * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+ * method.
+ * </li>
+ * </ul>
+ * <b>Postconditions:</b>
+ * <ul>
+ * <li>buf.limit and buf.mark are unchanged.</li>
+ * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+ * requested bytes can be read straight from the buffer</li>
+ * </ul>
+ *
+ * @param buf
+ * byte buffer to write bytes to. If checksums are not required, buf
+ * can have any number of bytes remaining, otherwise there must be a
+ * multiple of the checksum chunk size remaining.
+ * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+ * that is, the number of useful bytes (up to the amount
+ * requested) readable from the buffer by the client.
+ */
+ private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+ if (verifyChecksum) {
+ assert buf.remaining() % bytesPerChecksum == 0;
+ }
+ int dataRead = -1;
+
+ int oldpos = buf.position();
+ // Read as much as we can into the buffer.
+ dataRead = fillBuffer(dataIn, buf);
+
+ if (dataRead == -1) {
+ return -1;
+ }
+
+ if (verifyChecksum) {
+ ByteBuffer toChecksum = buf.duplicate();
+ toChecksum.position(oldpos);
+ toChecksum.limit(oldpos + dataRead);
+
+ checksumBuff.clear();
+ // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+ int numChunks =
+ (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuff.limit(checksumSize * numChunks);
+
+ fillBuffer(checksumIn, checksumBuff);
+ checksumBuff.flip();
+
+ checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+ this.startOffset);
+ }
+
+ if (dataRead >= 0) {
+ buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+ }
+
+ if (dataRead < offsetFromChunkBoundary) {
+ // yikes, didn't even get enough bytes to honour offset. This can happen
+ // even if we are verifying checksums if we are at EOF.
+ offsetFromChunkBoundary -= dataRead;
+ dataRead = 0;
+ } else {
+ dataRead -= offsetFromChunkBoundary;
+ offsetFromChunkBoundary = 0;
+ }
+
+ return dataRead;
+ }
+
+ /**
+ * Ensures that up to len bytes are available and checksummed in the slow read
+ * buffer. The number of bytes available to read is returned. If the buffer is
+ * not already empty, the number of remaining bytes is returned and no actual
+ * read happens.
+ *
+ * @param len
+ * the maximum number of bytes to make available. After len bytes
+ * are read, the underlying bytestream <b>must</b> be at a checksum
+ * boundary, or EOF. That is, (len + currentPosition) %
+ * bytesPerChecksum == 0.
+ * @return the number of bytes available to read, or -1 if EOF.
+ */
+ private synchronized int fillSlowReadBuffer(int len) throws IOException {
+ int nRead = -1;
+ if (slowReadBuff.hasRemaining()) {
+ // Already got data, good to go.
+ nRead = Math.min(len, slowReadBuff.remaining());
+ } else {
+ // Round a complete read of len bytes (plus any implicit offset) to the
+ // next chunk boundary, since we try to read in multiples of a chunk
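+ // (Illustrative numbers: len = 100, offsetFromChunkBoundary = 40 and
+ // bytesPerChecksum = 512 give nextChunk = 140 + (512 - 140) = 512.)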
+ int nextChunk = len + offsetFromChunkBoundary +
+ (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+ int limit = Math.min(nextChunk, slowReadBuff.capacity());
+ assert limit % bytesPerChecksum == 0;
+
+ slowReadBuff.clear();
+ slowReadBuff.limit(limit);
+
+ nRead = doByteBufferRead(slowReadBuff);
+
+ if (nRead > 0) {
+ // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+ // false positive.
+ slowReadBuff.limit(nRead + slowReadBuff.position());
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read off " + off + " len " + len);
+ }
+ if (!verifyChecksum) {
+ return dataIn.read(buf, off, len);
+ }
+
+ int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+ if (nRead > 0) {
+ // Possible that buffer is filled with a larger read than we need, since
+ // we tried to read as much as possible at once
+ nRead = Math.min(len, nRead);
+ slowReadBuff.get(buf, off, nRead);
+ }
+
+ return nRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip " + n);
+ }
+ if (n <= 0) {
+ return 0;
+ }
+ if (!verifyChecksum) {
+ return dataIn.skip(n);
+ }
+
+ // caller made sure newPosition is not beyond EOF.
+ int remaining = slowReadBuff.remaining();
+ int position = slowReadBuff.position();
+ int newPosition = position + (int)n;
+
+ // if the new offset is already read into dataBuff, just reposition
+ if (n <= remaining) {
+ assert offsetFromChunkBoundary == 0;
+ slowReadBuff.position(newPosition);
+ return n;
+ }
+
+ // for small gap, read through to keep the data/checksum in sync
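+ // e.g. (illustrative numbers) remaining = 10, n = 200 and
+ // bytesPerChecksum = 512: consume the 10 buffered bytes, then read the
+ // remaining 190 bytes through read() below so the data and checksum
+ // streams stay aligned.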
+ if (n - remaining <= bytesPerChecksum) {
+ slowReadBuff.position(position + remaining);
+ if (skipBuf == null) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ int ret = read(skipBuf, 0, (int)(n - remaining));
+ return (remaining + ret);
+ }
+
+ // optimize for big gap: discard the current buffer, skip to
+ // the beginning of the appropriate checksum chunk and then
+ // read to the middle of that chunk to be in sync with checksums.
+
+ // We can't use this.offsetFromChunkBoundary because we need to know how
+ // many bytes of the offset were really read. Calling read(..) with a
+ // positive this.offsetFromChunkBoundary causes that many bytes to get
+ // silently skipped.
+ int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+ long toskip = n - remaining - myOffsetFromChunkBoundary;
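+ // Illustrative numbers: position = 0, remaining = 0, n = 1300 and
+ // bytesPerChecksum = 512 give newPosition = 1300,
+ // myOffsetFromChunkBoundary = 276 and toskip = 1024, i.e. two whole
+ // chunks are skipped and the first 276 bytes of the next chunk are
+ // re-read below to stay checksum-aligned.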
+
+ slowReadBuff.position(slowReadBuff.limit());
+ checksumBuff.position(checksumBuff.limit());
+
+ IOUtils.skipFully(dataIn, toskip);
+ long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
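+ // e.g. skipping toskip = 1024 data bytes with bytesPerChecksum = 512 and
+ // checksumSize = 4 skips 2 * 4 = 8 bytes of checksum data (illustrative
+ // numbers).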
+ IOUtils.skipFully(checksumIn, checkSumOffset);
+
+ // read into the middle of the chunk
+ if (skipBuf == null) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ assert skipBuf.length == bytesPerChecksum;
+ assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+ int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
+ if (ret == -1) { // EOS
+ return (toskip + remaining);
+ } else {
+ return (toskip + remaining + ret);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
+ if (slowReadBuff != null) {
+ bufferPool.returnBuffer(slowReadBuff);
+ slowReadBuff = null;
+ }
+ if (checksumBuff != null) {
+ bufferPool.returnBuffer(checksumBuff);
+ checksumBuff = null;
+ }
+ startOffset = -1;
+ checksum = null;
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public void readFully(byte[] buf, int off, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, off, len);
+ }
+
+ @Override
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocalLegacy.
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return true;
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ return null;
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return checksum;
++ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
index 0000000,e135d8e..f908dd3
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@@ -1,0 -1,120 +1,126 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.EnumSet;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
++import org.apache.hadoop.util.DataChecksum;
+
+ /**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+ @InterfaceAudience.Private
+ public final class ExternalBlockReader implements BlockReader {
+ private final ReplicaAccessor accessor;
+ private final long visibleLength;
+ private long pos;
+
+ ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+ long startOffset) {
+ this.accessor = accessor;
+ this.visibleLength = visibleLength;
+ this.pos = startOffset;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ int nread = accessor.read(pos, buf, off, len);
+ // Don't let a negative (end-of-stream) return value move pos backwards.
+ if (nread < 0) {
+ return nread;
+ }
+ pos += nread;
+ return nread;
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ int nread = accessor.read(pos, buf);
+ // Don't let a negative (end-of-stream) return value move pos backwards.
+ if (nread < 0) {
+ return nread;
+ }
+ pos += nread;
+ return nread;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ // You cannot skip backwards
+ if (n <= 0) {
+ return 0;
+ }
+ // You can't skip past the end of the replica.
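+ // e.g. (illustrative numbers) visibleLength = 1000, pos = 900 and
+ // n = 500: pos is clamped to 1000 and 100 is returned.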
+ long oldPos = pos;
+ pos += n;
+ if (pos > visibleLength) {
+ pos = visibleLength;
+ }
+ return pos - oldPos;
+ }
+
+ @Override
+ public int available() throws IOException {
+ // We return the number of bytes that we haven't read yet from the
+ // replica, based on our current position. Some of the other block
+ // readers return a shorter length than that. The only advantage to
+ // returning a shorter length is that the DFSInputStream will
+ // trash your block reader and create a new one if someone tries to
+ // seek() beyond the available() region.
+ long diff = visibleLength - pos;
+ if (diff > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ } else {
+ return (int)diff;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ accessor.close();
+ }
+
+ @Override
+ public void readFully(byte[] buf, int offset, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, offset, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public boolean isLocal() {
+ return accessor.isLocal();
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return accessor.isShortCircuit();
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ // For now, pluggable ReplicaAccessors do not support zero-copy.
+ return null;
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return null;
++ }
+ }