You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:26 UTC
[24/50] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf37d3d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf37d3d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf37d3d8
Branch: refs/heads/HDFS-7240
Commit: bf37d3d80e5179dea27e5bd5aea804a38aa9934c
Parents: 861b52d
Author: Haohui Mai <wh...@apache.org>
Authored: Sat Sep 26 11:08:25 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Sat Sep 26 11:08:25 2015 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 24 +
.../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 +
.../hadoop/hdfs/BlockMissingException.java | 65 +
.../apache/hadoop/hdfs/BlockReaderFactory.java | 893 +++++
.../java/org/apache/hadoop/hdfs/DFSClient.java | 3144 ++++++++++++++++++
.../hadoop/hdfs/DFSClientFaultInjector.java | 60 +
.../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 +
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 ++
.../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 +++++++++++
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 918 +++++
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 ++
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 24 +
.../org/apache/hadoop/hdfs/DataStreamer.java | 1904 +++++++++++
.../hadoop/hdfs/HdfsConfigurationLoader.java | 44 +
.../apache/hadoop/hdfs/RemotePeerFactory.java | 43 +
.../hdfs/UnknownCipherSuiteException.java | 35 +
.../UnknownCryptoProtocolVersionException.java | 38 +
.../org/apache/hadoop/hdfs/XAttrHelper.java | 174 +
.../hadoop/hdfs/client/HdfsDataInputStream.java | 113 +
.../hdfs/client/HdfsDataOutputStream.java | 112 +
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 +++
.../hdfs/inotify/MissingEventsException.java | 54 +
.../hadoop/hdfs/protocol/AclException.java | 39 +
.../hdfs/protocol/CacheDirectiveIterator.java | 130 +
.../hadoop/hdfs/protocol/CachePoolIterator.java | 63 +
.../hdfs/protocol/EncryptionZoneIterator.java | 64 +
.../QuotaByStorageTypeExceededException.java | 56 +
.../hdfs/protocol/UnresolvedPathException.java | 87 +
.../datatransfer/ReplaceDatanodeOnFailure.java | 200 ++
.../datanode/ReplicaNotFoundException.java | 53 +
.../namenode/RetryStartFileException.java | 36 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../dev-support/findbugsExcludeFile.xml | 19 -
.../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 -
.../hadoop/hdfs/BlockMissingException.java | 65 -
.../apache/hadoop/hdfs/BlockReaderFactory.java | 892 -----
.../java/org/apache/hadoop/hdfs/DFSClient.java | 3141 -----------------
.../hadoop/hdfs/DFSClientFaultInjector.java | 57 -
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 -
.../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 -
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 --
.../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 -----------
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 918 -----
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 --
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 24 -
.../org/apache/hadoop/hdfs/DataStreamer.java | 1903 -----------
.../apache/hadoop/hdfs/HdfsConfiguration.java | 11 +-
.../apache/hadoop/hdfs/RemotePeerFactory.java | 43 -
.../hdfs/UnknownCipherSuiteException.java | 35 -
.../UnknownCryptoProtocolVersionException.java | 38 -
.../org/apache/hadoop/hdfs/XAttrHelper.java | 174 -
.../hadoop/hdfs/client/HdfsDataInputStream.java | 113 -
.../hdfs/client/HdfsDataOutputStream.java | 112 -
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 ---
.../hdfs/inotify/MissingEventsException.java | 54 -
.../hadoop/hdfs/protocol/AclException.java | 39 -
.../hdfs/protocol/CacheDirectiveIterator.java | 130 -
.../hadoop/hdfs/protocol/CachePoolIterator.java | 63 -
.../hdfs/protocol/EncryptionZoneIterator.java | 64 -
.../QuotaByStorageTypeExceededException.java | 56 -
.../hdfs/protocol/UnresolvedPathException.java | 87 -
.../datatransfer/ReplaceDatanodeOnFailure.java | 200 --
.../hadoop/hdfs/server/balancer/Dispatcher.java | 3 +-
.../hdfs/server/datanode/BlockReceiver.java | 4 +-
.../hdfs/server/datanode/BlockSender.java | 4 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 3 +-
.../hdfs/server/datanode/DataXceiver.java | 6 +-
.../datanode/ReplicaNotFoundException.java | 53 -
.../datanode/fsdataset/impl/BlockPoolSlice.java | 4 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 8 +-
.../impl/RamDiskAsyncLazyPersistService.java | 4 +-
.../namenode/RetryStartFileException.java | 36 -
.../hdfs/server/namenode/TransferFsImage.java | 4 +-
.../datanode/TestFiDataTransferProtocol2.java | 1 -
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
.../hdfs/MiniDFSClusterWithNodeGroup.java | 2 +-
.../hadoop/hdfs/TestBlockStoragePolicy.java | 1 -
.../TestClientProtocolForPipelineRecovery.java | 6 +-
.../apache/hadoop/hdfs/TestCrcCorruption.java | 2 +-
.../org/apache/hadoop/hdfs/TestDFSUtil.java | 8 +-
.../java/org/apache/hadoop/hdfs/TestPread.java | 10 +-
81 files changed, 11544 insertions(+), 11489 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 515da24..41a8564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -32,4 +32,28 @@
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSInputStream"/>
+ <Field name="tcpReadsDisabledForTesting"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <!--
+ ResponseProcessor is a thread that is designed to catch RuntimeException.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
+ <Method name="run" />
+ <Bug pattern="REC_CATCH_EXCEPTION" />
+ </Match>
+
+ <!--
+ We use a separate lock to guard cachingStrategy in order to separate
+ locks for p-reads from seek + read invocations.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+ <Field name="cachingStrategy" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
new file mode 100644
index 0000000..0ccacda
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+/**
+ * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
+ * allowing more detailed queries to the datanode about a block.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlockLocation extends BlockLocation {
+
+ private final LocatedBlock block;
+
+ public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
+ throws IOException {
+ // Initialize with data from passed in BlockLocation
+ super(loc);
+ this.block = block;
+ }
+
+ public LocatedBlock getLocatedBlock() {
+ return block;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
new file mode 100644
index 0000000..7bba8a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when a read encounters a block that has no locations
+ * associated with it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMissingException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String filename;
+ private final long offset;
+
+ /**
+ * An exception that indicates that the file was corrupted.
+ * @param filename name of corrupted file
+ * @param description a description of the corruption details
+ */
+ public BlockMissingException(String filename, String description, long offset) {
+ super(description);
+ this.filename = filename;
+ this.offset = offset;
+ }
+
+ /**
+ * Returns the name of the corrupted file.
+ * @return name of corrupted file
+ */
+ public String getFile() {
+ return filename;
+ }
+
+ /**
+ * Returns the offset at which this file is corrupted
+ * @return offset of corrupted file
+ */
+ public long getOffset() {
+ return offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
new file mode 100644
index 0000000..69e9da2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -0,0 +1,893 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PerformanceAdvisory;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+ static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
+
+ public static class FailureInjector {
+ public void injectRequestFileDescriptorsFailure() throws IOException {
+ // do nothing
+ }
+ public boolean getSupportsReceiptVerification() {
+ return true;
+ }
+ }
+
+ @VisibleForTesting
+ static ShortCircuitReplicaCreator
+ createShortCircuitReplicaInfoCallback = null;
+
+ private final DfsClientConf conf;
+
+ /**
+ * Injects failures into specific operations during unit tests.
+ */
+ private static FailureInjector failureInjector = new FailureInjector();
+
+ /**
+ * The file name, for logging and debugging purposes.
+ */
+ private String fileName;
+
+ /**
+ * The block ID and block pool ID to use.
+ */
+ private ExtendedBlock block;
+
+ /**
+ * The block token to use for security purposes.
+ */
+ private Token<BlockTokenIdentifier> token;
+
+ /**
+ * The offset within the block to start reading at.
+ */
+ private long startOffset;
+
+ /**
+ * If false, we won't try to verify the block checksum.
+ */
+ private boolean verifyChecksum;
+
+ /**
+ * The name of this client.
+ */
+ private String clientName;
+
+ /**
+ * The DataNode we're talking to.
+ */
+ private DatanodeInfo datanode;
+
+ /**
+ * StorageType of replica on DataNode.
+ */
+ private StorageType storageType;
+
+ /**
+ * If false, we won't try short-circuit local reads.
+ */
+ private boolean allowShortCircuitLocalReads;
+
+ /**
+ * The ClientContext to use for things like the PeerCache.
+ */
+ private ClientContext clientContext;
+
+ /**
+ * Number of bytes to read. -1 indicates no limit.
+ */
+ private long length = -1;
+
+ /**
+ * Caching strategy to use when reading the block.
+ */
+ private CachingStrategy cachingStrategy;
+
+ /**
+ * Socket address to use to connect to peer.
+ */
+ private InetSocketAddress inetSocketAddress;
+
+ /**
+ * Remote peer factory to use to create a peer, if needed.
+ */
+ private RemotePeerFactory remotePeerFactory;
+
+ /**
+ * UserGroupInformation to use for legacy block reader local objects, if needed.
+ */
+ private UserGroupInformation userGroupInformation;
+
+ /**
+ * Configuration to use for legacy block reader local objects, if needed.
+ */
+ private Configuration configuration;
+
+ /**
+ * Information about the domain socket path we should use to connect to the
+ * local peer-- or null if we haven't examined the local domain socket.
+ */
+ private DomainSocketFactory.PathInfo pathInfo;
+
+ /**
+ * The remaining number of times that we'll try to pull a socket out of the
+ * cache.
+ */
+ private int remainingCacheTries;
+
+ public BlockReaderFactory(DfsClientConf conf) {
+ this.conf = conf;
+ this.remainingCacheTries = conf.getNumCachedConnRetry();
+ }
+
+ public BlockReaderFactory setFileName(String fileName) {
+ this.fileName = fileName;
+ return this;
+ }
+
+ public BlockReaderFactory setBlock(ExtendedBlock block) {
+ this.block = block;
+ return this;
+ }
+
+ public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+ this.token = token;
+ return this;
+ }
+
+ public BlockReaderFactory setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ return this;
+ }
+
+ public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ return this;
+ }
+
+ public BlockReaderFactory setClientName(String clientName) {
+ this.clientName = clientName;
+ return this;
+ }
+
+ public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+ this.datanode = datanode;
+ return this;
+ }
+
+ public BlockReaderFactory setStorageType(StorageType storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
+ public BlockReaderFactory setAllowShortCircuitLocalReads(
+ boolean allowShortCircuitLocalReads) {
+ this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+ return this;
+ }
+
+ public BlockReaderFactory setClientCacheContext(
+ ClientContext clientContext) {
+ this.clientContext = clientContext;
+ return this;
+ }
+
+ public BlockReaderFactory setLength(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public BlockReaderFactory setCachingStrategy(
+ CachingStrategy cachingStrategy) {
+ this.cachingStrategy = cachingStrategy;
+ return this;
+ }
+
+ public BlockReaderFactory setInetSocketAddress (
+ InetSocketAddress inetSocketAddress) {
+ this.inetSocketAddress = inetSocketAddress;
+ return this;
+ }
+
+ public BlockReaderFactory setUserGroupInformation(
+ UserGroupInformation userGroupInformation) {
+ this.userGroupInformation = userGroupInformation;
+ return this;
+ }
+
+ public BlockReaderFactory setRemotePeerFactory(
+ RemotePeerFactory remotePeerFactory) {
+ this.remotePeerFactory = remotePeerFactory;
+ return this;
+ }
+
+ public BlockReaderFactory setConfiguration(
+ Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ @VisibleForTesting
+ public static void setFailureInjectorForTesting(FailureInjector injector) {
+ failureInjector = injector;
+ }
+
+ /**
+ * Build a BlockReader with the given options.
+ *
+ * This function will do the best it can to create a block reader that meets
+ * all of our requirements. We prefer short-circuit block readers
+ * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+ * former avoid the overhead of socket communication. If short-circuit is
+ * unavailable, our next fallback is data transfer over UNIX domain sockets,
+ * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't
+ * work, we will try to create a remote block reader that operates over TCP
+ * sockets.
+ *
+ * There are a few caches that are important here.
+ *
+ * The ShortCircuitCache stores file descriptor objects which have been passed
+ * from the DataNode.
+ *
+ * The DomainSocketFactory stores information about UNIX domain socket paths
+ * that we have not been able to use in the past, so that we don't waste time
+ * retrying them over and over. (Like all the caches, it does have a timeout,
+ * though.)
+ *
+ * The PeerCache stores peers that we have used in the past. If we can reuse
+ * one of these peers, we avoid the overhead of re-opening a socket. However,
+ * if the socket has been timed out on the remote end, our attempt to reuse
+ * the socket may end with an IOException. For that reason, we limit our
+ * attempts at socket reuse to dfs.client.cached.conn.retry times. After
+ * that, we create new sockets. This avoids the problem where a thread tries
+ * to talk to a peer that it hasn't talked to in a while, and has to clean out
+ * every entry in a socket cache full of stale entries.
+ *
+ * @return The new BlockReader. We will not return null.
+ *
+ * @throws InvalidToken
+ * If the block token was invalid.
+ * InvalidEncryptionKeyException
+ * If the encryption key was invalid.
+ * Other IOException
+ * If there was another problem.
+ */
+ public BlockReader build() throws IOException {
+ Preconditions.checkNotNull(configuration);
+ BlockReader reader = tryToCreateExternalBlockReader();
+ if (reader != null) {
+ return reader;
+ }
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
+ if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
+ if (clientContext.getUseLegacyBlockReaderLocal()) {
+ reader = getLegacyBlockReaderLocal();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new legacy block reader local.");
+ }
+ return reader;
+ }
+ } else {
+ reader = getBlockReaderLocal();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new block reader local.");
+ }
+ return reader;
+ }
+ }
+ }
+ if (scConf.isDomainSocketDataTraffic()) {
+ reader = getRemoteBlockReaderFromDomain();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new remote block reader using " +
+ "UNIX domain socket on " + pathInfo.getPath());
+ }
+ return reader;
+ }
+ }
+ Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+ "TCP reads were disabled for testing, but we failed to " +
+ "do a non-TCP read.");
+ return getRemoteBlockReaderFromTcp();
+ }
+
+ private BlockReader tryToCreateExternalBlockReader() {
+ List<Class<? extends ReplicaAccessorBuilder>> clses =
+ conf.getReplicaAccessorBuilderClasses();
+ for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+ try {
+ ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+ token.write(bado);
+ byte tokenBytes[] = bado.toByteArray();
+
+ Constructor<? extends ReplicaAccessorBuilder> ctor =
+ cls.getConstructor();
+ ReplicaAccessorBuilder builder = ctor.newInstance();
+ ReplicaAccessor accessor = builder.
+ setAllowShortCircuitReads(allowShortCircuitLocalReads).
+ setBlock(block.getBlockId(), block.getBlockPoolId()).
+ setGenerationStamp(block.getGenerationStamp()).
+ setBlockAccessToken(tokenBytes).
+ setClientName(clientName).
+ setConfiguration(configuration).
+ setFileName(fileName).
+ setVerifyChecksum(verifyChecksum).
+ setVisibleLength(length).
+ build();
+ if (accessor == null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": No ReplicaAccessor created by " +
+ cls.getName());
+ }
+ } else {
+ return new ExternalBlockReader(accessor, length, startOffset);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to construct new object of type " +
+ cls.getName(), t);
+ }
+ }
+ return null;
+ }
+
+
+ /**
+ * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+ * This block reader implements the path-based style of local reads
+ * first introduced in HDFS-2246.
+ */
+ private BlockReader getLegacyBlockReaderLocal() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
+ }
+ if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+ "the address " + inetSocketAddress + " is not local");
+ }
+ return null;
+ }
+ if (clientContext.getDisableLegacyBlockReaderLocal()) {
+ PerformanceAdvisory.LOG.debug("{}: can't construct " +
+ "BlockReaderLocalLegacy because " +
+ "disableLegacyBlockReaderLocal is set.", this);
+ return null;
+ }
+ IOException ioe;
+ try {
+ return BlockReaderLocalLegacy.newBlockReader(conf,
+ userGroupInformation, configuration, fileName, block, token,
+ datanode, startOffset, length, storageType);
+ } catch (RemoteException remoteException) {
+ ioe = remoteException.unwrapRemoteException(
+ InvalidToken.class, AccessControlException.class);
+ } catch (IOException e) {
+ ioe = e;
+ }
+ if ((!(ioe instanceof AccessControlException)) &&
+ isSecurityException(ioe)) {
+ // Handle security exceptions.
+ // We do not handle AccessControlException here, since
+ // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+ // that the user is not in dfs.block.local-path-access.user, a condition
+ // which requires us to disable legacy SCR.
+ throw ioe;
+ }
+ LOG.warn(this + ": error creating legacy BlockReaderLocal. " +
+ "Disabling legacy local reads.", ioe);
+ clientContext.setDisableLegacyBlockReaderLocal();
+ return null;
+ }
+
+ private BlockReader getBlockReaderLocal() throws InvalidToken {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to construct a BlockReaderLocal " +
+ "for short-circuit reads.");
+ }
+ if (pathInfo == null) {
+ pathInfo = clientContext.getDomainSocketFactory()
+ .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+ }
+ if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+ PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
+ "giving up on BlockReaderLocal.", this, pathInfo);
+ return null;
+ }
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+ ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+ InvalidToken exc = info.getInvalidTokenException();
+ if (exc != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": got InvalidToken exception while trying to " +
+ "construct BlockReaderLocal via " + pathInfo.getPath());
+ }
+ throw exc;
+ }
+ if (info.getReplica() == null) {
+ PerformanceAdvisory.LOG.debug("{}: failed to get " +
+ "ShortCircuitReplica. Cannot construct " +
+ "BlockReaderLocal via {}", this, pathInfo.getPath());
+ return null;
+ }
+ return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
+ setFilename(fileName).
+ setBlock(block).
+ setStartOffset(startOffset).
+ setShortCircuitReplica(info.getReplica()).
+ setVerifyChecksum(verifyChecksum).
+ setCachingStrategy(cachingStrategy).
+ setStorageType(storageType).
+ build();
+ }
+
+ /**
+ * Fetch a pair of short-circuit block descriptors from a local DataNode.
+ *
+ * @return Null if we could not communicate with the datanode,
+ * a new ShortCircuitReplicaInfo object otherwise.
+ * ShortCircuitReplicaInfo objects may contain either an InvalidToken
+ * exception, or a ShortCircuitReplica object ready to use.
+ */
+ @Override
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+ if (createShortCircuitReplicaInfoCallback != null) {
+ ShortCircuitReplicaInfo info =
+ createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+ if (info != null) return info;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
+ }
+ BlockReaderPeer curPeer;
+ while (true) {
+ curPeer = nextDomainPeer();
+ if (curPeer == null) break;
+ if (curPeer.fromCache) remainingCacheTries--;
+ DomainPeer peer = (DomainPeer)curPeer.peer;
+ Slot slot = null;
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ try {
+ MutableBoolean usedPeer = new MutableBoolean(false);
+ slot = cache.allocShmSlot(datanode, peer, usedPeer,
+ new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+ clientName);
+ if (usedPeer.booleanValue()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": allocShmSlot used up our previous socket " +
+ peer.getDomainSocket() + ". Allocating a new one...");
+ }
+ curPeer = nextDomainPeer();
+ if (curPeer == null) break;
+ peer = (DomainPeer)curPeer.peer;
+ }
+ ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
+ clientContext.getPeerCache().put(datanode, peer);
+ return info;
+ } catch (IOException e) {
+ if (slot != null) {
+ cache.freeSlot(slot);
+ }
+ if (curPeer.fromCache) {
+ // Handle an I/O error we got when using a cached socket.
+ // These are considered less serious, because the socket may be stale.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": closing stale domain peer " + peer, e);
+ }
+ IOUtilsClient.cleanup(LOG, peer);
+ } else {
+ // Handle an I/O error we got when using a newly created socket.
+ // We temporarily disable the domain socket path for a few minutes in
+ // this case, to prevent wasting more time on it.
+ LOG.warn(this + ": I/O error requesting file descriptors. " +
+ "Disabling domain socket " + peer.getDomainSocket(), e);
+ IOUtilsClient.cleanup(LOG, peer);
+ clientContext.getDomainSocketFactory()
+ .disableDomainSocketPath(pathInfo.getPath());
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Request file descriptors from a DomainPeer.
+ *
+ * @param peer The peer to use for communication.
+ * @param slot If non-null, the shared memory slot to associate with the
+ * new ShortCircuitReplica.
+ *
+ * @return A ShortCircuitReplicaInfo object if we could communicate with the
+ * datanode; null, otherwise.
+ * @throws IOException If we encountered an I/O exception while communicating
+ * with the datanode.
+ */
+ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+ Slot slot) throws IOException {
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+ SlotId slotId = slot == null ? null : slot.getSlotId();
+ new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
+ failureInjector.getSupportsReceiptVerification());
+ DataInputStream in = new DataInputStream(peer.getInputStream());
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ DomainSocket sock = peer.getDomainSocket();
+ failureInjector.injectRequestFileDescriptorsFailure();
+ switch (resp.getStatus()) {
+ case SUCCESS:
+ byte buf[] = new byte[1];
+ FileInputStream fis[] = new FileInputStream[2];
+ sock.recvFileInputStreams(fis, buf, 0, buf.length);
+ ShortCircuitReplica replica = null;
+ try {
+ ExtendedBlockId key =
+ new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+ if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
+ LOG.trace("Sending receipt verification byte for slot " + slot);
+ sock.getOutputStream().write(0);
+ }
+ replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+ Time.monotonicNow(), slot);
+ return new ShortCircuitReplicaInfo(replica);
+ } catch (IOException e) {
+ // This indicates an error reading from disk, or a format error. Since
+ // it's not a socket communication problem, we return null rather than
+ // throwing an exception.
+ LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+ return null;
+ } finally {
+ if (replica == null) {
+ IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
+ }
+ }
+ case ERROR_UNSUPPORTED:
+ if (!resp.hasShortCircuitAccessVersion()) {
+ LOG.warn("short-circuit read access is disabled for " +
+ "DataNode " + datanode + ". reason: " + resp.getMessage());
+ clientContext.getDomainSocketFactory()
+ .disableShortCircuitForPath(pathInfo.getPath());
+ } else {
+ LOG.warn("short-circuit read access for the file " +
+ fileName + " is disabled for DataNode " + datanode +
+ ". reason: " + resp.getMessage());
+ }
+ return null;
+ case ERROR_ACCESS_TOKEN:
+ String msg = "access control error while " +
+ "attempting to set up short-circuit access to " +
+ fileName + resp.getMessage();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ":" + msg);
+ }
+ return new ShortCircuitReplicaInfo(new InvalidToken(msg));
+ default:
+ LOG.warn(this + ": unknown response code " + resp.getStatus() +
+ " while attempting to set up short-circuit access. " +
+ resp.getMessage());
+ clientContext.getDomainSocketFactory()
+ .disableShortCircuitForPath(pathInfo.getPath());
+ return null;
+ }
+ }
+
+  /**
+   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
+   *
+   * Keeps asking nextDomainPeer() for peers until a block reader is created,
+   * no further peer is available, or an error on a newly created peer
+   * disables the domain socket path.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   *         reader.
+   *
+   * @throws InvalidToken    If the block token was invalid.
+   *                         Potentially other security-related exceptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
+          "remote block reader because the UNIX domain socket at {}" +
+          " is not usable.", this, pathInfo);
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from the " +
+          "UNIX domain socket at " + pathInfo.getPath());
+    }
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) {
+        break;
+      }
+      if (curPeer.fromCache) {
+        remainingCacheTries--;
+      }
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        // The peer is closed exactly once, by the finally block below, on
+        // every path that leaves this catch without a block reader.
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from the unix domain socket at " +
+                pathInfo.getPath(), ioe);
+          }
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.  Fall through and try the next peer.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+ /**
+ * Get a RemoteBlockReader that communicates over a TCP socket.
+ *
+ * @return The new BlockReader. We will not return null, but instead throw
+ * an exception if this fails.
+ *
+ * @throws InvalidToken
+ * If the block token was invalid.
+ * InvalidEncryptionKeyException
+ * If the encryption key was invalid.
+ * Other IOException
+ * If there was another problem.
+ */
+ private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to create a remote block reader from a " +
+ "TCP socket");
+ }
+ BlockReader blockReader = null;
+ while (true) {
+ BlockReaderPeer curPeer = null;
+ Peer peer = null;
+ try {
+ curPeer = nextTcpPeer();
+ if (curPeer.fromCache) remainingCacheTries--;
+ peer = curPeer.peer;
+ blockReader = getRemoteBlockReader(peer);
+ return blockReader;
+ } catch (IOException ioe) {
+ if (isSecurityException(ioe)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": got security exception while constructing " +
+ "a remote block reader from " + peer, ioe);
+ }
+ throw ioe;
+ }
+ if ((curPeer != null) && curPeer.fromCache) {
+ // Handle an I/O error we got when using a cached peer. These are
+ // considered less serious, because the underlying socket may be
+ // stale.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed potentially stale remote peer " + peer, ioe);
+ }
+ } else {
+ // Handle an I/O error we got when using a newly created peer.
+ LOG.warn("I/O error constructing remote block reader.", ioe);
+ throw ioe;
+ }
+ } finally {
+ if (blockReader == null) {
+ IOUtilsClient.cleanup(LOG, peer);
+ }
+ }
+ }
+ }
+
+  /**
+   * A Peer paired with a flag recording where it came from: fromCache is
+   * true for a peer taken out of the peer cache and false for a peer that
+   * was newly created.
+   */
+  public static class BlockReaderPeer {
+    /** The peer to communicate over. */
+    final Peer peer;
+
+    /** Whether the peer was obtained from the peer cache. */
+    final boolean fromCache;
+
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.fromCache = fromCache;
+      this.peer = peer;
+    }
+  }
+
+ /**
+ * Get the next DomainPeer-- either from the cache or by creating it.
+ *
+ * @return the next DomainPeer, or null if we could not construct one.
+ */
+ private BlockReaderPeer nextDomainPeer() {
+ if (remainingCacheTries > 0) {
+ Peer peer = clientContext.getPeerCache().get(datanode, true);
+ if (peer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextDomainPeer: reusing existing peer " + peer);
+ }
+ return new BlockReaderPeer(peer, true);
+ }
+ }
+ DomainSocket sock = clientContext.getDomainSocketFactory().
+ createSocket(pathInfo, conf.getSocketTimeout());
+ if (sock == null) return null;
+ return new BlockReaderPeer(new DomainPeer(sock), false);
+ }
+
+ /**
+ * Get the next TCP-based peer-- either from the cache or by creating it.
+ *
+ * @return the next Peer, or null if we could not construct one.
+ *
+ * @throws IOException If there was an error while constructing the peer
+ * (such as an InvalidEncryptionKeyException)
+ */
+ private BlockReaderPeer nextTcpPeer() throws IOException {
+ if (remainingCacheTries > 0) {
+ Peer peer = clientContext.getPeerCache().get(datanode, false);
+ if (peer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: reusing existing peer " + peer);
+ }
+ return new BlockReaderPeer(peer, true);
+ }
+ }
+ try {
+ Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+ datanode);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
+ }
+ return new BlockReaderPeer(peer, false);
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
+ "connected to " + datanode);
+ }
+ throw e;
+ }
+ }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * Security-related exceptions are handled differently from other
+   * IOExceptions: they do not point at a communication problem, but at
+   * something the client has to do, such as refetching block tokens or
+   * renewing encryption keys.
+   *
+   * @param ioe   The exception
+   * @return      True only if the exception is an InvalidToken, an
+   *              InvalidEncryptionKeyException, an InvalidBlockTokenException
+   *              or an AccessControlException.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    if (ioe instanceof InvalidToken) {
+      return true;
+    }
+    if (ioe instanceof InvalidEncryptionKeyException) {
+      return true;
+    }
+    if (ioe instanceof InvalidBlockTokenException) {
+      return true;
+    }
+    return ioe instanceof AccessControlException;
+  }
+
+ @SuppressWarnings("deprecation")
+ private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+ if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
+ return RemoteBlockReader.newBlockReader(fileName,
+ block, token, startOffset, length, conf.getIoBufferSize(),
+ verifyChecksum, clientName, peer, datanode,
+ clientContext.getPeerCache(), cachingStrategy);
+ } else {
+ return RemoteBlockReader2.newBlockReader(
+ fileName, block, token, startOffset, length,
+ verifyChecksum, clientName, peer, datanode,
+ clientContext.getPeerCache(), cachingStrategy);
+ }
+ }
+
+  /**
+   * @return a description of this factory made of its file name and block,
+   *         used as a prefix in this class's log messages.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder description =
+        new StringBuilder("BlockReaderFactory(fileName=");
+    description.append(fileName);
+    description.append(", block=").append(block);
+    description.append(")");
+    return description.toString();
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets).
+   *
+   * The result is the address, the block pool ID and the block ID joined
+   * with colons.
+   *
+   * @param s        Address of the block location
+   * @param poolId   Block pool ID of the block
+   * @param blockId  Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    final StringBuilder name = new StringBuilder();
+    name.append(s.toString());
+    name.append(":").append(poolId);
+    name.append(":").append(blockId);
+    return name.toString();
+  }
+}