Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/26 20:17:21 UTC
[12/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94cbb6d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94cbb6d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94cbb6d1
Branch: refs/heads/branch-2
Commit: 94cbb6d16483ba011b7104565b4084bf2a3eb6e6
Parents: b46e4ce
Author: Haohui Mai <wh...@apache.org>
Authored: Sat Sep 26 11:08:25 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Sat Sep 26 11:16:50 2015 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 24 +
.../apache/hadoop/fs/BlockStorageLocation.java | 52 +
.../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 +
.../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 +
.../java/org/apache/hadoop/fs/VolumeId.java | 40 +
.../hadoop/hdfs/BlockMissingException.java | 65 +
.../apache/hadoop/hdfs/BlockReaderFactory.java | 893 +++++
.../hadoop/hdfs/BlockStorageLocationUtil.java | 369 ++
.../java/org/apache/hadoop/hdfs/DFSClient.java | 3203 ++++++++++++++++++
.../hadoop/hdfs/DFSClientFaultInjector.java | 60 +
.../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 +
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 ++
.../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 +++++++++++
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 923 +++++
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 ++
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 24 +
.../org/apache/hadoop/hdfs/DataStreamer.java | 1904 +++++++++++
.../hadoop/hdfs/HdfsConfigurationLoader.java | 44 +
.../apache/hadoop/hdfs/RemotePeerFactory.java | 43 +
.../hdfs/UnknownCipherSuiteException.java | 35 +
.../UnknownCryptoProtocolVersionException.java | 38 +
.../org/apache/hadoop/hdfs/XAttrHelper.java | 174 +
.../hadoop/hdfs/client/HdfsDataInputStream.java | 113 +
.../hdfs/client/HdfsDataOutputStream.java | 112 +
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 +++
.../hdfs/inotify/MissingEventsException.java | 54 +
.../hadoop/hdfs/protocol/AclException.java | 39 +
.../hdfs/protocol/CacheDirectiveIterator.java | 130 +
.../hadoop/hdfs/protocol/CachePoolIterator.java | 63 +
.../hdfs/protocol/EncryptionZoneIterator.java | 64 +
.../QuotaByStorageTypeExceededException.java | 56 +
.../hdfs/protocol/UnresolvedPathException.java | 87 +
.../datatransfer/ReplaceDatanodeOnFailure.java | 200 ++
.../datanode/ReplicaNotFoundException.java | 53 +
.../namenode/RetryStartFileException.java | 36 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../dev-support/findbugsExcludeFile.xml | 19 -
.../apache/hadoop/fs/BlockStorageLocation.java | 52 -
.../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 -
.../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 -
.../java/org/apache/hadoop/fs/VolumeId.java | 40 -
.../hadoop/hdfs/BlockMissingException.java | 65 -
.../apache/hadoop/hdfs/BlockReaderFactory.java | 892 -----
.../hadoop/hdfs/BlockStorageLocationUtil.java | 369 --
.../java/org/apache/hadoop/hdfs/DFSClient.java | 3200 -----------------
.../hadoop/hdfs/DFSClientFaultInjector.java | 57 -
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 -
.../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 -
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 --
.../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 -----------
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 923 -----
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 --
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 23 -
.../org/apache/hadoop/hdfs/DataStreamer.java | 1903 -----------
.../apache/hadoop/hdfs/HdfsConfiguration.java | 11 +-
.../apache/hadoop/hdfs/RemotePeerFactory.java | 43 -
.../hdfs/UnknownCipherSuiteException.java | 35 -
.../UnknownCryptoProtocolVersionException.java | 38 -
.../org/apache/hadoop/hdfs/XAttrHelper.java | 174 -
.../hadoop/hdfs/client/HdfsDataInputStream.java | 113 -
.../hdfs/client/HdfsDataOutputStream.java | 112 -
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 ---
.../hdfs/inotify/MissingEventsException.java | 54 -
.../hadoop/hdfs/protocol/AclException.java | 39 -
.../hdfs/protocol/CacheDirectiveIterator.java | 130 -
.../hadoop/hdfs/protocol/CachePoolIterator.java | 63 -
.../hdfs/protocol/EncryptionZoneIterator.java | 64 -
.../QuotaByStorageTypeExceededException.java | 56 -
.../hdfs/protocol/UnresolvedPathException.java | 87 -
.../datatransfer/ReplaceDatanodeOnFailure.java | 200 --
.../hadoop/hdfs/server/balancer/Dispatcher.java | 3 +-
.../hdfs/server/datanode/BlockReceiver.java | 4 +-
.../hdfs/server/datanode/BlockSender.java | 4 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 3 +-
.../hdfs/server/datanode/DataXceiver.java | 6 +-
.../datanode/ReplicaNotFoundException.java | 53 -
.../datanode/fsdataset/impl/BlockPoolSlice.java | 4 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 8 +-
.../impl/RamDiskAsyncLazyPersistService.java | 4 +-
.../namenode/RetryStartFileException.java | 36 -
.../hdfs/server/namenode/TransferFsImage.java | 4 +-
.../datanode/TestFiDataTransferProtocol2.java | 1 -
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
.../hdfs/MiniDFSClusterWithNodeGroup.java | 2 +-
.../hadoop/hdfs/TestBlockStoragePolicy.java | 1 -
.../TestClientProtocolForPipelineRecovery.java | 6 +-
.../apache/hadoop/hdfs/TestCrcCorruption.java | 2 +-
.../org/apache/hadoop/hdfs/TestDFSUtil.java | 8 +-
.../org/apache/hadoop/hdfs/TestFileStatus.java | 2 +-
.../java/org/apache/hadoop/hdfs/TestPread.java | 10 +-
90 files changed, 12143 insertions(+), 12087 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 515da24..41a8564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -32,4 +32,28 @@
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSInputStream"/>
+ <Field name="tcpReadsDisabledForTesting"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <!--
+ ResponseProcessor is a thread that is designed to catch RuntimeException.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
+ <Method name="run" />
+ <Bug pattern="REC_CATCH_EXCEPTION" />
+ </Match>
+
+ <!--
+ We use a separate lock to guard cachingStrategy in order to separate
+ locks for p-reads from seek + read invocations.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+ <Field name="cachingStrategy" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
</FindBugsFilter>
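
The IS2_INCONSISTENT_SYNC exclusion above reflects a deliberate split-lock design: cachingStrategy gets its own guard object so positional reads (preads) never contend with seek+read callers holding the stream's main lock. A minimal sketch of the pattern, using illustrative names rather than the real DFSInputStream internals:

    // Sketch only: a dedicated lock for one field, so pread() and the
    // synchronized read() path never contend on the stream lock for it.
    class SplitLockStream {
      private final Object infoLock = new Object(); // guards cachingStrategy
      private Object cachingStrategy = new Object();

      int pread(long pos) {
        Object strategy;
        synchronized (infoLock) {        // field's own lock, not 'this'
          strategy = cachingStrategy;
        }
        return 0; // ... positional read using 'strategy' ...
      }

      synchronized int read() {          // stateful path holds 'this'
        synchronized (infoLock) {        // still uses the field's lock
          cachingStrategy = new Object();
        }
        return 0; // ... sequential read ...
      }
    }

FindBugs cannot see that infoLock consistently guards the field across both paths, hence the exclusion.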
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
new file mode 100644
index 0000000..2200994
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
+ * location information for each replica.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+@Deprecated
+public class BlockStorageLocation extends BlockLocation {
+
+ private final VolumeId[] volumeIds;
+
+ public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
+ throws IOException {
+ // Initialize with data from passed in BlockLocation
+ super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
+ .getOffset(), loc.getLength(), loc.isCorrupt());
+ this.volumeIds = volumeIds;
+ }
+
+ /**
+ * Gets the list of {@link VolumeId} corresponding to the block's replicas.
+ *
+ * @return volumeIds list of VolumeId for the block's replicas
+ */
+ public VolumeId[] getVolumeIds() {
+ return volumeIds;
+ }
+}
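
For context, a hypothetical caller-side sketch of how these objects are consumed; it assumes the deprecated DistributedFileSystem#getFileBlockStorageLocations API that hands back the BlockStorageLocation instances defined above:

    import java.util.Arrays;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.BlockStorageLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.VolumeId;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class VolumeReport {
      static void print(DistributedFileSystem dfs, Path file) throws Exception {
        FileStatus stat = dfs.getFileStatus(file);
        BlockLocation[] locs = dfs.getFileBlockLocations(stat, 0, stat.getLen());
        BlockStorageLocation[] storageLocs =
            dfs.getFileBlockStorageLocations(Arrays.asList(locs));
        for (BlockStorageLocation loc : storageLocs) {
          String[] hosts = loc.getHosts();
          VolumeId[] volumes = loc.getVolumeIds(); // one entry per replica
          for (int i = 0; i < hosts.length; i++) {
            // A VolumeId is only unique per datanode, so report it with its host.
            System.out.println(loc.getOffset() + "\t" + hosts[i] + "\t" + volumes[i]);
          }
        }
      }
    }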
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
new file mode 100644
index 0000000..0ccacda
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+/**
+ * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
+ * allowing more detailed queries to the datanode about a block.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlockLocation extends BlockLocation {
+
+ private final LocatedBlock block;
+
+ public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
+ throws IOException {
+ // Initialize with data from passed in BlockLocation
+ super(loc);
+ this.block = block;
+ }
+
+ public LocatedBlock getLocatedBlock() {
+ return block;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
new file mode 100644
index 0000000..6e9d3d7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
+ * used to differentiate between the data directories on a single datanode. This
+ * identifier is only unique on a per-datanode basis.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class HdfsVolumeId implements VolumeId {
+
+ private final byte[] id;
+
+ public HdfsVolumeId(byte[] id) {
+ Preconditions.checkNotNull(id, "id cannot be null");
+ this.id = id;
+ }
+
+ @Override
+ public int compareTo(VolumeId arg0) {
+ if (arg0 == null) {
+ return 1;
+ }
+ // Integer.compare avoids overflow in the subtraction-based idiom.
+ return Integer.compare(hashCode(), arg0.hashCode());
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(id).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != getClass()) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ HdfsVolumeId that = (HdfsVolumeId) obj;
+ return new EqualsBuilder().append(this.id, that.id).isEquals();
+ }
+
+ @Override
+ public String toString() {
+ return StringUtils.byteToHexString(id);
+ }
+}
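
A quick illustrative snippet (not from this patch) of the semantics above: equality and ordering derive from the raw id bytes, and toString renders them as hex, so an id is meaningful only within a single datanode:

    import org.apache.hadoop.fs.HdfsVolumeId;

    class VolumeIdDemo {
      public static void main(String[] args) {
        HdfsVolumeId a = new HdfsVolumeId(new byte[] { 0x0a, 0x01 });
        HdfsVolumeId b = new HdfsVolumeId(new byte[] { 0x0a, 0x01 });
        HdfsVolumeId c = new HdfsVolumeId(new byte[] { 0x0b, 0x02 });
        System.out.println(a.equals(b));    // true: same underlying bytes
        System.out.println(a.equals(c));    // false
        System.out.println(a);              // "0a01", hex of the id bytes
        // compareTo is hash-based, so ordering is stable but arbitrary.
        System.out.println(a.compareTo(b)); // 0: equal hashes for equal bytes
      }
    }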
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
new file mode 100644
index 0000000..e56e304
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Opaque interface that identifies a disk location. Subclasses
+ * should implement {@link Comparable} and override both equals and hashCode.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public interface VolumeId extends Comparable<VolumeId> {
+
+ @Override
+ abstract public int compareTo(VolumeId arg0);
+
+ @Override
+ abstract public int hashCode();
+
+ @Override
+ abstract public boolean equals(Object obj);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
new file mode 100644
index 0000000..7bba8a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when a read encounters a block that has no locations
+ * associated with it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMissingException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String filename;
+ private final long offset;
+
+ /**
+ * An exception that indicates that a file was corrupted.
+ * @param filename name of corrupted file
+ * @param description a description of the corruption details
+ * @param offset offset in the file at which the corruption was found
+ */
+ public BlockMissingException(String filename, String description, long offset) {
+ super(description);
+ this.filename = filename;
+ this.offset = offset;
+ }
+
+ /**
+ * Returns the name of the corrupted file.
+ * @return name of corrupted file
+ */
+ public String getFile() {
+ return filename;
+ }
+
+ /**
+ * Returns the offset at which this file is corrupted.
+ * @return offset of corrupted file
+ */
+ public long getOffset() {
+ return offset;
+ }
+}
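
Where this surfaces in practice: a read that reaches a block with no live replicas throws it, and the accessors let callers report exactly which file and offset were affected. A hedged handling sketch, assuming an already-open FSDataInputStream:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hdfs.BlockMissingException;

    class SafeReader {
      static int readAt(FSDataInputStream in, long pos, byte[] buf)
          throws IOException {
        try {
          return in.read(pos, buf, 0, buf.length);   // positional read
        } catch (BlockMissingException e) {
          // Every replica of the block covering this range was unreachable.
          System.err.println("missing block in " + e.getFile()
              + " at offset " + e.getOffset());
          throw e;                                   // let the caller decide
        }
      }
    }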
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
new file mode 100644
index 0000000..69e9da2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -0,0 +1,893 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PerformanceAdvisory;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+ static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
+
+ public static class FailureInjector {
+ public void injectRequestFileDescriptorsFailure() throws IOException {
+ // do nothing
+ }
+ public boolean getSupportsReceiptVerification() {
+ return true;
+ }
+ }
+
+ @VisibleForTesting
+ static ShortCircuitReplicaCreator
+ createShortCircuitReplicaInfoCallback = null;
+
+ private final DfsClientConf conf;
+
+ /**
+ * Injects failures into specific operations during unit tests.
+ */
+ private static FailureInjector failureInjector = new FailureInjector();
+
+ /**
+ * The file name, for logging and debugging purposes.
+ */
+ private String fileName;
+
+ /**
+ * The block ID and block pool ID to use.
+ */
+ private ExtendedBlock block;
+
+ /**
+ * The block token to use for security purposes.
+ */
+ private Token<BlockTokenIdentifier> token;
+
+ /**
+ * The offset within the block to start reading at.
+ */
+ private long startOffset;
+
+ /**
+ * If false, we won't try to verify the block checksum.
+ */
+ private boolean verifyChecksum;
+
+ /**
+ * The name of this client.
+ */
+ private String clientName;
+
+ /**
+ * The DataNode we're talking to.
+ */
+ private DatanodeInfo datanode;
+
+ /**
+ * StorageType of replica on DataNode.
+ */
+ private StorageType storageType;
+
+ /**
+ * If false, we won't try short-circuit local reads.
+ */
+ private boolean allowShortCircuitLocalReads;
+
+ /**
+ * The ClientContext to use for things like the PeerCache.
+ */
+ private ClientContext clientContext;
+
+ /**
+ * Number of bytes to read. -1 indicates no limit.
+ */
+ private long length = -1;
+
+ /**
+ * Caching strategy to use when reading the block.
+ */
+ private CachingStrategy cachingStrategy;
+
+ /**
+ * Socket address to use to connect to peer.
+ */
+ private InetSocketAddress inetSocketAddress;
+
+ /**
+ * Remote peer factory to use to create a peer, if needed.
+ */
+ private RemotePeerFactory remotePeerFactory;
+
+ /**
+ * UserGroupInformation to use for legacy block reader local objects, if needed.
+ */
+ private UserGroupInformation userGroupInformation;
+
+ /**
+ * Configuration to use for legacy block reader local objects, if needed.
+ */
+ private Configuration configuration;
+
+ /**
+ * Information about the domain socket path we should use to connect to the
+ * local peer-- or null if we haven't examined the local domain socket.
+ */
+ private DomainSocketFactory.PathInfo pathInfo;
+
+ /**
+ * The remaining number of times that we'll try to pull a socket out of the
+ * cache.
+ */
+ private int remainingCacheTries;
+
+ public BlockReaderFactory(DfsClientConf conf) {
+ this.conf = conf;
+ this.remainingCacheTries = conf.getNumCachedConnRetry();
+ }
+
+ public BlockReaderFactory setFileName(String fileName) {
+ this.fileName = fileName;
+ return this;
+ }
+
+ public BlockReaderFactory setBlock(ExtendedBlock block) {
+ this.block = block;
+ return this;
+ }
+
+ public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+ this.token = token;
+ return this;
+ }
+
+ public BlockReaderFactory setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ return this;
+ }
+
+ public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ return this;
+ }
+
+ public BlockReaderFactory setClientName(String clientName) {
+ this.clientName = clientName;
+ return this;
+ }
+
+ public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+ this.datanode = datanode;
+ return this;
+ }
+
+ public BlockReaderFactory setStorageType(StorageType storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
+ public BlockReaderFactory setAllowShortCircuitLocalReads(
+ boolean allowShortCircuitLocalReads) {
+ this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+ return this;
+ }
+
+ public BlockReaderFactory setClientCacheContext(
+ ClientContext clientContext) {
+ this.clientContext = clientContext;
+ return this;
+ }
+
+ public BlockReaderFactory setLength(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public BlockReaderFactory setCachingStrategy(
+ CachingStrategy cachingStrategy) {
+ this.cachingStrategy = cachingStrategy;
+ return this;
+ }
+
+ public BlockReaderFactory setInetSocketAddress (
+ InetSocketAddress inetSocketAddress) {
+ this.inetSocketAddress = inetSocketAddress;
+ return this;
+ }
+
+ public BlockReaderFactory setUserGroupInformation(
+ UserGroupInformation userGroupInformation) {
+ this.userGroupInformation = userGroupInformation;
+ return this;
+ }
+
+ public BlockReaderFactory setRemotePeerFactory(
+ RemotePeerFactory remotePeerFactory) {
+ this.remotePeerFactory = remotePeerFactory;
+ return this;
+ }
+
+ public BlockReaderFactory setConfiguration(
+ Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ @VisibleForTesting
+ public static void setFailureInjectorForTesting(FailureInjector injector) {
+ failureInjector = injector;
+ }
+
+ /**
+ * Build a BlockReader with the given options.
+ *
+ * This function will do the best it can to create a block reader that meets
+ * all of our requirements. We prefer short-circuit block readers
+ * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+ * former avoid the overhead of socket communication. If short-circuit is
+ * unavailable, our next fallback is data transfer over UNIX domain sockets,
+ * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't
+ * work, we will try to create a remote block reader that operates over TCP
+ * sockets.
+ *
+ * There are a few caches that are important here.
+ *
+ * The ShortCircuitCache stores file descriptor objects which have been passed
+ * from the DataNode.
+ *
+ * The DomainSocketFactory stores information about UNIX domain socket paths
+ * that we have not been able to use in the past, so that we don't waste time
+ * retrying them over and over. (Like all the caches, it does have a timeout,
+ * though.)
+ *
+ * The PeerCache stores peers that we have used in the past. If we can reuse
+ * one of these peers, we avoid the overhead of re-opening a socket. However,
+ * if the socket has been timed out on the remote end, our attempt to reuse
+ * the socket may end with an IOException. For that reason, we limit our
+ * attempts at socket reuse to dfs.client.cached.conn.retry times. After
+ * that, we create new sockets. This avoids the problem where a thread tries
+ * to talk to a peer that it hasn't talked to in a while, and has to clean out
+ * every entry in a socket cache full of stale entries.
+ *
+ * @return The new BlockReader. We will not return null.
+ *
+ * @throws InvalidToken
+ * If the block token was invalid.
+ * InvalidEncryptionKeyException
+ * If the encryption key was invalid.
+ * Other IOException
+ * If there was another problem.
+ */
+ public BlockReader build() throws IOException {
+ Preconditions.checkNotNull(configuration);
+ BlockReader reader = tryToCreateExternalBlockReader();
+ if (reader != null) {
+ return reader;
+ }
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
+ if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
+ if (clientContext.getUseLegacyBlockReaderLocal()) {
+ reader = getLegacyBlockReaderLocal();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new legacy block reader local.");
+ }
+ return reader;
+ }
+ } else {
+ reader = getBlockReaderLocal();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new block reader local.");
+ }
+ return reader;
+ }
+ }
+ }
+ if (scConf.isDomainSocketDataTraffic()) {
+ reader = getRemoteBlockReaderFromDomain();
+ if (reader != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": returning new remote block reader using " +
+ "UNIX domain socket on " + pathInfo.getPath());
+ }
+ return reader;
+ }
+ }
+ Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+ "TCP reads were disabled for testing, but we failed to " +
+ "do a non-TCP read.");
+ return getRemoteBlockReaderFromTcp();
+ }
+
+ private BlockReader tryToCreateExternalBlockReader() {
+ List<Class<? extends ReplicaAccessorBuilder>> clses =
+ conf.getReplicaAccessorBuilderClasses();
+ for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+ try {
+ ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+ token.write(bado);
+ byte tokenBytes[] = bado.toByteArray();
+
+ Constructor<? extends ReplicaAccessorBuilder> ctor =
+ cls.getConstructor();
+ ReplicaAccessorBuilder builder = ctor.newInstance();
+ ReplicaAccessor accessor = builder.
+ setAllowShortCircuitReads(allowShortCircuitLocalReads).
+ setBlock(block.getBlockId(), block.getBlockPoolId()).
+ setGenerationStamp(block.getGenerationStamp()).
+ setBlockAccessToken(tokenBytes).
+ setClientName(clientName).
+ setConfiguration(configuration).
+ setFileName(fileName).
+ setVerifyChecksum(verifyChecksum).
+ setVisibleLength(length).
+ build();
+ if (accessor == null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": No ReplicaAccessor created by " +
+ cls.getName());
+ }
+ } else {
+ return new ExternalBlockReader(accessor, length, startOffset);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed to construct new object of type " +
+ cls.getName(), t);
+ }
+ }
+ return null;
+ }
+
+
+ /**
+ * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+ * This block reader implements the path-based style of local reads
+ * first introduced in HDFS-2246.
+ */
+ private BlockReader getLegacyBlockReaderLocal() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
+ }
+ if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+ "the address " + inetSocketAddress + " is not local");
+ }
+ return null;
+ }
+ if (clientContext.getDisableLegacyBlockReaderLocal()) {
+ PerformanceAdvisory.LOG.debug("{}: can't construct " +
+ "BlockReaderLocalLegacy because " +
+ "disableLegacyBlockReaderLocal is set.", this);
+ return null;
+ }
+ IOException ioe;
+ try {
+ return BlockReaderLocalLegacy.newBlockReader(conf,
+ userGroupInformation, configuration, fileName, block, token,
+ datanode, startOffset, length, storageType);
+ } catch (RemoteException remoteException) {
+ ioe = remoteException.unwrapRemoteException(
+ InvalidToken.class, AccessControlException.class);
+ } catch (IOException e) {
+ ioe = e;
+ }
+ if ((!(ioe instanceof AccessControlException)) &&
+ isSecurityException(ioe)) {
+ // Handle security exceptions.
+ // We do not handle AccessControlException here, since
+ // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+ // that the user is not in dfs.block.local-path-access.user, a condition
+ // which requires us to disable legacy SCR.
+ throw ioe;
+ }
+ LOG.warn(this + ": error creating legacy BlockReaderLocal. " +
+ "Disabling legacy local reads.", ioe);
+ clientContext.setDisableLegacyBlockReaderLocal();
+ return null;
+ }
+
+ private BlockReader getBlockReaderLocal() throws InvalidToken {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to construct a BlockReaderLocal " +
+ "for short-circuit reads.");
+ }
+ if (pathInfo == null) {
+ pathInfo = clientContext.getDomainSocketFactory()
+ .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+ }
+ if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+ PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
+ "giving up on BlockReaderLocal.", this, pathInfo);
+ return null;
+ }
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+ ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+ InvalidToken exc = info.getInvalidTokenException();
+ if (exc != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": got InvalidToken exception while trying to " +
+ "construct BlockReaderLocal via " + pathInfo.getPath());
+ }
+ throw exc;
+ }
+ if (info.getReplica() == null) {
+ PerformanceAdvisory.LOG.debug("{}: failed to get " +
+ "ShortCircuitReplica. Cannot construct " +
+ "BlockReaderLocal via {}", this, pathInfo.getPath());
+ return null;
+ }
+ return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
+ setFilename(fileName).
+ setBlock(block).
+ setStartOffset(startOffset).
+ setShortCircuitReplica(info.getReplica()).
+ setVerifyChecksum(verifyChecksum).
+ setCachingStrategy(cachingStrategy).
+ setStorageType(storageType).
+ build();
+ }
+
+ /**
+ * Fetch a pair of short-circuit block descriptors from a local DataNode.
+ *
+ * @return Null if we could not communicate with the datanode,
+ * a new ShortCircuitReplicaInfo object otherwise.
+ * ShortCircuitReplicaInfo objects may contain either an InvalidToken
+ * exception, or a ShortCircuitReplica object ready to use.
+ */
+ @Override
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+ if (createShortCircuitReplicaInfoCallback != null) {
+ ShortCircuitReplicaInfo info =
+ createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+ if (info != null) return info;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
+ }
+ BlockReaderPeer curPeer;
+ while (true) {
+ curPeer = nextDomainPeer();
+ if (curPeer == null) break;
+ if (curPeer.fromCache) remainingCacheTries--;
+ DomainPeer peer = (DomainPeer)curPeer.peer;
+ Slot slot = null;
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ try {
+ MutableBoolean usedPeer = new MutableBoolean(false);
+ slot = cache.allocShmSlot(datanode, peer, usedPeer,
+ new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+ clientName);
+ if (usedPeer.booleanValue()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": allocShmSlot used up our previous socket " +
+ peer.getDomainSocket() + ". Allocating a new one...");
+ }
+ curPeer = nextDomainPeer();
+ if (curPeer == null) break;
+ peer = (DomainPeer)curPeer.peer;
+ }
+ ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
+ clientContext.getPeerCache().put(datanode, peer);
+ return info;
+ } catch (IOException e) {
+ if (slot != null) {
+ cache.freeSlot(slot);
+ }
+ if (curPeer.fromCache) {
+ // Handle an I/O error we got when using a cached socket.
+ // These are considered less serious, because the socket may be stale.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": closing stale domain peer " + peer, e);
+ }
+ IOUtilsClient.cleanup(LOG, peer);
+ } else {
+ // Handle an I/O error we got when using a newly created socket.
+ // We temporarily disable the domain socket path for a few minutes in
+ // this case, to prevent wasting more time on it.
+ LOG.warn(this + ": I/O error requesting file descriptors. " +
+ "Disabling domain socket " + peer.getDomainSocket(), e);
+ IOUtilsClient.cleanup(LOG, peer);
+ clientContext.getDomainSocketFactory()
+ .disableDomainSocketPath(pathInfo.getPath());
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Request file descriptors from a DomainPeer.
+ *
+ * @param peer The peer to use for communication.
+ * @param slot If non-null, the shared memory slot to associate with the
+ * new ShortCircuitReplica.
+ *
+ * @return A ShortCircuitReplica object if we could communicate with the
+ * datanode; null, otherwise.
+ * @throws IOException If we encountered an I/O exception while communicating
+ * with the datanode.
+ */
+ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+ Slot slot) throws IOException {
+ ShortCircuitCache cache = clientContext.getShortCircuitCache();
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+ SlotId slotId = slot == null ? null : slot.getSlotId();
+ new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
+ failureInjector.getSupportsReceiptVerification());
+ DataInputStream in = new DataInputStream(peer.getInputStream());
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ DomainSocket sock = peer.getDomainSocket();
+ failureInjector.injectRequestFileDescriptorsFailure();
+ switch (resp.getStatus()) {
+ case SUCCESS:
+ byte buf[] = new byte[1];
+ FileInputStream fis[] = new FileInputStream[2];
+ sock.recvFileInputStreams(fis, buf, 0, buf.length);
+ ShortCircuitReplica replica = null;
+ try {
+ ExtendedBlockId key =
+ new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+ if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
+ LOG.trace("Sending receipt verification byte for slot " + slot);
+ sock.getOutputStream().write(0);
+ }
+ replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+ Time.monotonicNow(), slot);
+ return new ShortCircuitReplicaInfo(replica);
+ } catch (IOException e) {
+ // This indicates an error reading from disk, or a format error. Since
+ // it's not a socket communication problem, we return null rather than
+ // throwing an exception.
+ LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+ return null;
+ } finally {
+ if (replica == null) {
+ IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
+ }
+ }
+ case ERROR_UNSUPPORTED:
+ if (!resp.hasShortCircuitAccessVersion()) {
+ LOG.warn("short-circuit read access is disabled for " +
+ "DataNode " + datanode + ". reason: " + resp.getMessage());
+ clientContext.getDomainSocketFactory()
+ .disableShortCircuitForPath(pathInfo.getPath());
+ } else {
+ LOG.warn("short-circuit read access for the file " +
+ fileName + " is disabled for DataNode " + datanode +
+ ". reason: " + resp.getMessage());
+ }
+ return null;
+ case ERROR_ACCESS_TOKEN:
+ String msg = "access control error while " +
+ "attempting to set up short-circuit access to " +
+ fileName + ": " + resp.getMessage();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ":" + msg);
+ }
+ return new ShortCircuitReplicaInfo(new InvalidToken(msg));
+ default:
+ LOG.warn(this + ": unknown response code " + resp.getStatus() +
+ " while attempting to set up short-circuit access. " +
+ resp.getMessage());
+ clientContext.getDomainSocketFactory()
+ .disableShortCircuitForPath(pathInfo.getPath());
+ return null;
+ }
+ }
+
+ /**
+ * Get a RemoteBlockReader that communicates over a UNIX domain socket.
+ *
+ * @return The new BlockReader, or null if we failed to create the block
+ * reader.
+ *
+ * @throws InvalidToken If the block token was invalid.
+ * Potentially other security-related exceptions.
+ */
+ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+ if (pathInfo == null) {
+ pathInfo = clientContext.getDomainSocketFactory()
+ .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+ }
+ if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+ PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
+ "remote block reader because the UNIX domain socket at {}" +
+ " is not usable.", this, pathInfo);
+ return null;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to create a remote block reader from the " +
+ "UNIX domain socket at " + pathInfo.getPath());
+ }
+
+ while (true) {
+ BlockReaderPeer curPeer = nextDomainPeer();
+ if (curPeer == null) break;
+ if (curPeer.fromCache) remainingCacheTries--;
+ DomainPeer peer = (DomainPeer)curPeer.peer;
+ BlockReader blockReader = null;
+ try {
+ blockReader = getRemoteBlockReader(peer);
+ return blockReader;
+ } catch (IOException ioe) {
+ IOUtilsClient.cleanup(LOG, peer);
+ if (isSecurityException(ioe)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": got security exception while constructing " +
+ "a remote block reader from the unix domain socket at " +
+ pathInfo.getPath(), ioe);
+ }
+ throw ioe;
+ }
+ if (curPeer.fromCache) {
+ // Handle an I/O error we got when using a cached peer. These are
+ // considered less serious, because the underlying socket may be stale.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed potentially stale domain peer " + peer, ioe);
+ }
+ } else {
+ // Handle an I/O error we got when using a newly created domain peer.
+ // We temporarily disable the domain socket path for a few minutes in
+ // this case, to prevent wasting more time on it.
+ LOG.warn("I/O error constructing remote block reader. Disabling " +
+ "domain socket " + peer.getDomainSocket(), ioe);
+ clientContext.getDomainSocketFactory()
+ .disableDomainSocketPath(pathInfo.getPath());
+ return null;
+ }
+ } finally {
+ if (blockReader == null) {
+ IOUtilsClient.cleanup(LOG, peer);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get a RemoteBlockReader that communicates over a TCP socket.
+ *
+ * @return The new BlockReader. We will not return null, but instead throw
+ * an exception if this fails.
+ *
+ * @throws InvalidToken
+ * If the block token was invalid.
+ * InvalidEncryptionKeyException
+ * If the encryption key was invalid.
+ * Other IOException
+ * If there was another problem.
+ */
+ private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trying to create a remote block reader from a " +
+ "TCP socket");
+ }
+ BlockReader blockReader = null;
+ while (true) {
+ BlockReaderPeer curPeer = null;
+ Peer peer = null;
+ try {
+ curPeer = nextTcpPeer();
+ if (curPeer.fromCache) remainingCacheTries--;
+ peer = curPeer.peer;
+ blockReader = getRemoteBlockReader(peer);
+ return blockReader;
+ } catch (IOException ioe) {
+ if (isSecurityException(ioe)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": got security exception while constructing " +
+ "a remote block reader from " + peer, ioe);
+ }
+ throw ioe;
+ }
+ if ((curPeer != null) && curPeer.fromCache) {
+ // Handle an I/O error we got when using a cached peer. These are
+ // considered less serious, because the underlying socket may be
+ // stale.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed potentially stale remote peer " + peer, ioe);
+ }
+ } else {
+ // Handle an I/O error we got when using a newly created peer.
+ LOG.warn("I/O error constructing remote block reader.", ioe);
+ throw ioe;
+ }
+ } finally {
+ if (blockReader == null) {
+ IOUtilsClient.cleanup(LOG, peer);
+ }
+ }
+ }
+ }
+
+ public static class BlockReaderPeer {
+ final Peer peer;
+ final boolean fromCache;
+
+ BlockReaderPeer(Peer peer, boolean fromCache) {
+ this.peer = peer;
+ this.fromCache = fromCache;
+ }
+ }
+
+ /**
+ * Get the next DomainPeer-- either from the cache or by creating it.
+ *
+ * @return the next DomainPeer, or null if we could not construct one.
+ */
+ private BlockReaderPeer nextDomainPeer() {
+ if (remainingCacheTries > 0) {
+ Peer peer = clientContext.getPeerCache().get(datanode, true);
+ if (peer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextDomainPeer: reusing existing peer " + peer);
+ }
+ return new BlockReaderPeer(peer, true);
+ }
+ }
+ DomainSocket sock = clientContext.getDomainSocketFactory().
+ createSocket(pathInfo, conf.getSocketTimeout());
+ if (sock == null) return null;
+ return new BlockReaderPeer(new DomainPeer(sock), false);
+ }
+
+ /**
+ * Get the next TCP-based peer-- either from the cache or by creating it.
+ *
+ * @return the next Peer, or null if we could not construct one.
+ *
+ * @throws IOException If there was an error while constructing the peer
+ * (such as an InvalidEncryptionKeyException)
+ */
+ private BlockReaderPeer nextTcpPeer() throws IOException {
+ if (remainingCacheTries > 0) {
+ Peer peer = clientContext.getPeerCache().get(datanode, false);
+ if (peer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: reusing existing peer " + peer);
+ }
+ return new BlockReaderPeer(peer, true);
+ }
+ }
+ try {
+ Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+ datanode);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
+ }
+ return new BlockReaderPeer(peer, false);
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
+ "connected to " + datanode);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Determine if an exception is security-related.
+ *
+ * We need to handle these exceptions differently than other IOExceptions.
+ * They don't indicate a communication problem. Instead, they mean that there
+ * is some action the client needs to take, such as refetching block tokens,
+ * renewing encryption keys, etc.
+ *
+ * @param ioe The exception
+ * @return True only if the exception is security-related.
+ */
+ private static boolean isSecurityException(IOException ioe) {
+ return (ioe instanceof InvalidToken) ||
+ (ioe instanceof InvalidEncryptionKeyException) ||
+ (ioe instanceof InvalidBlockTokenException) ||
+ (ioe instanceof AccessControlException);
+ }
+
+ @SuppressWarnings("deprecation")
+ private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+ if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
+ return RemoteBlockReader.newBlockReader(fileName,
+ block, token, startOffset, length, conf.getIoBufferSize(),
+ verifyChecksum, clientName, peer, datanode,
+ clientContext.getPeerCache(), cachingStrategy);
+ } else {
+ return RemoteBlockReader2.newBlockReader(
+ fileName, block, token, startOffset, length,
+ verifyChecksum, clientName, peer, datanode,
+ clientContext.getPeerCache(), cachingStrategy);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
+ }
+
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
+ public static String getFileName(final InetSocketAddress s,
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
+ }
+}
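
To make the builder contract above concrete, here is a sketch of how a DFSInputStream-style caller drives it; every setter appears in the class above, while the parameters and names are placeholders for this illustration:

    package org.apache.hadoop.hdfs;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
    import org.apache.hadoop.security.token.Token;

    class ReaderSetup {
      static BlockReader open(DfsClientConf conf, Configuration hadoopConf,
          ClientContext ctx, RemotePeerFactory peers, InetSocketAddress addr,
          DatanodeInfo dn, StorageType type, String src, ExtendedBlock blk,
          Token<BlockTokenIdentifier> token, CachingStrategy caching,
          long offsetInBlock, long len) throws IOException {
        return new BlockReaderFactory(conf)
            .setInetSocketAddress(addr)            // where the replica lives
            .setRemotePeerFactory(peers)           // used only if TCP is needed
            .setDatanodeInfo(dn)
            .setStorageType(type)
            .setFileName(src)
            .setBlock(blk)
            .setBlockToken(token)
            .setStartOffset(offsetInBlock)
            .setVerifyChecksum(true)
            .setClientName("sketch-client")
            .setLength(len)                        // -1 means no limit
            .setCachingStrategy(caching)
            .setAllowShortCircuitLocalReads(true)  // prefer local FDs when legal
            .setClientCacheContext(ctx)
            .setConfiguration(hadoopConf)          // required: build() checks it
            .build();
      }
    }

    // build() falls through, in order: external ReplicaAccessor classes,
    // short-circuit local readers, a UNIX-domain-socket reader, then TCP.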
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
new file mode 100644
index 0000000..807ede8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.HdfsVolumeId;
+import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BlockStorageLocationUtil {
+
+ static final Logger LOG = LoggerFactory
+ .getLogger(BlockStorageLocationUtil.class);
+
+ /**
+ * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
+ * of datanodes and blocks. The blocks must all correspond to the same
+ * block pool.
+ *
+ * @param datanodeBlocks
+ * Map of datanodes to block replicas at each datanode
+ * @return callables Used to query each datanode for location information on
+ * the block replicas at the datanode
+ */
+ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
+ Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+ int timeout, boolean connectToDnViaHostname, Span parent) {
+
+ if (datanodeBlocks.isEmpty()) {
+ return Lists.newArrayList();
+ }
+
+ // Construct the callables, one per datanode
+ List<VolumeBlockLocationCallable> callables =
+ new ArrayList<VolumeBlockLocationCallable>();
+ for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
+ .entrySet()) {
+ // Construct RPC parameters
+ DatanodeInfo datanode = entry.getKey();
+ List<LocatedBlock> locatedBlocks = entry.getValue();
+ if (locatedBlocks.isEmpty()) {
+ continue;
+ }
+
+ // Ensure that the blocks all are from the same block pool.
+ String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
+ for (LocatedBlock lb : locatedBlocks) {
+ if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
+ throw new IllegalArgumentException(
+ "All blocks to be queried must be in the same block pool: " +
+ locatedBlocks.get(0).getBlock() + " and " + lb +
+ " are from different pools.");
+ }
+ }
+
+ long[] blockIds = new long[locatedBlocks.size()];
+ int i = 0;
+ List<Token<BlockTokenIdentifier>> dnTokens =
+ new ArrayList<Token<BlockTokenIdentifier>>(
+ locatedBlocks.size());
+ for (LocatedBlock b : locatedBlocks) {
+ blockIds[i++] = b.getBlock().getBlockId();
+ dnTokens.add(b.getBlockToken());
+ }
+ VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
+ conf, datanode, poolId, blockIds, dnTokens, timeout,
+ connectToDnViaHostname, parent);
+ callables.add(callable);
+ }
+ return callables;
+ }
+
+ /**
+ * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
+ * making one RPC to each datanode. These RPCs are made in parallel using a
+ * threadpool.
+ *
+ * @param datanodeBlocks
+ * Map of datanodes to the blocks present on the DN
+ * @return metadatas Map of datanodes to block metadata of the DN
+ * @throws InvalidBlockTokenException
+ * if client does not have read access on a requested block
+ */
+ static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
+ Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+ int poolsize, int timeoutMs, boolean connectToDnViaHostname)
+ throws InvalidBlockTokenException {
+
+ List<VolumeBlockLocationCallable> callables =
+ createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
+ connectToDnViaHostname, Trace.currentSpan());
+
+ // Use a thread pool to execute the Callables in parallel
+ List<Future<HdfsBlocksMetadata>> futures =
+ new ArrayList<Future<HdfsBlocksMetadata>>();
+ ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
+ try {
+ futures = executor.invokeAll(callables, timeoutMs,
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Swallow the exception here, because we can return partial results
+ }
+ executor.shutdown();
+
+ Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
+ Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
+ // Fill in metadatas with results from DN RPCs, where possible
+ for (int i = 0; i < futures.size(); i++) {
+ VolumeBlockLocationCallable callable = callables.get(i);
+ DatanodeInfo datanode = callable.getDatanodeInfo();
+ Future<HdfsBlocksMetadata> future = futures.get(i);
+ try {
+ HdfsBlocksMetadata metadata = future.get();
+ metadatas.put(callable.getDatanodeInfo(), metadata);
+ } catch (CancellationException e) {
+ LOG.info("Cancelled while waiting for datanode "
+ + datanode.getIpcAddr(false) + ": " + e.toString());
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof InvalidBlockTokenException) {
+ LOG.warn("Invalid access token when trying to retrieve "
+ + "information from datanode " + datanode.getIpcAddr(false));
+ throw (InvalidBlockTokenException) t;
+ } else if (t instanceof UnsupportedOperationException) {
+ LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+ + " the required #getHdfsBlocksMetadata() API");
+ throw (UnsupportedOperationException) t;
+ } else {
+ LOG.info("Failed to query block locations on datanode " +
+ datanode.getIpcAddr(false) + ": " + t);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not fetch information from datanode", t);
+ }
+ } catch (InterruptedException e) {
+ // Shouldn't happen, because invokeAll waits for all Futures to be ready
+ LOG.info("Interrupted while fetching HdfsBlocksMetadata");
+ }
+ }
+
+ return metadatas;
+ }
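+
+ // Illustrative usage sketch (the pool size and timeout values here are
+ // hypothetical):
+ //
+ // Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = ...;
+ // Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
+ // queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+ // 10 /* poolsize */, 60000 /* timeoutMs */, false);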
+
+ /**
+ * Group the per-replica {@link VolumeId} info returned from
+ * {@link #queryDatanodesForHdfsBlocksMetadata} so that it is associated
+ * with the corresponding {@link LocatedBlock}.
+ *
+ * @param blocks
+ * Original list of LocatedBlocks
+ * @param metadatas
+ * VolumeId information for the replicas on each datanode
+ * @return blockVolumeIds per-replica VolumeId information associated with the
+ * parent LocatedBlock
+ */
+ static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
+ List<LocatedBlock> blocks,
+ Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
+
+ // Initialize mapping of block ID to LocatedBlock.
+ // Used to associate results from DN RPCs to the parent LocatedBlock
+ Map<Long, LocatedBlock> blockIdToLocBlock =
+ new HashMap<Long, LocatedBlock>();
+ for (LocatedBlock b : blocks) {
+ blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
+ }
+
+ // Initialize the mapping of blocks -> list of VolumeIds, one per replica.
+ // Entries start as null and are filled in with real values from the DN
+ // RPCs; replicas with no response remain null.
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
+ new HashMap<LocatedBlock, List<VolumeId>>();
+ for (LocatedBlock b : blocks) {
+ ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
+ for (int i = 0; i < b.getLocations().length; i++) {
+ l.add(null);
+ }
+ blockVolumeIds.put(b, l);
+ }
+
+ // Iterate through the list of metadatas (one per datanode).
+ // For each metadata, if it's valid, insert its volume location information
+ // into the Map returned to the caller
+ for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
+ DatanodeInfo datanode = entry.getKey();
+ HdfsBlocksMetadata metadata = entry.getValue();
+ // Check if metadata is valid
+ if (metadata == null) {
+ continue;
+ }
+ long[] metaBlockIds = metadata.getBlockIds();
+ List<byte[]> metaVolumeIds = metadata.getVolumeIds();
+ List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
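+ // These are parallel collections: for block metaBlockIds[j],
+ // metaVolumeIndexes.get(j) is the index of its volume in metaVolumeIds.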
+ // Add VolumeId for each replica in the HdfsBlocksMetadata
+ for (int j = 0; j < metaBlockIds.length; j++) {
+ int volumeIndex = metaVolumeIndexes.get(j);
+ long blockId = metaBlockIds[j];
+ // Skip if the block wasn't found (the DN reports Integer.MAX_VALUE as
+ // the volume index in that case) or the index is out of range for
+ // metaVolumeIds. Also skip if the DN responded with a block we didn't
+ // ask for.
+ if (volumeIndex == Integer.MAX_VALUE
+ || volumeIndex >= metaVolumeIds.size()
+ || !blockIdToLocBlock.containsKey(blockId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No data for block " + blockId);
+ }
+ continue;
+ }
+ // Get the VolumeId by indexing into the list of VolumeIds
+ // provided by the datanode
+ byte[] volumeId = metaVolumeIds.get(volumeIndex);
+ HdfsVolumeId id = new HdfsVolumeId(volumeId);
+ // Find out which index we are in the LocatedBlock's replicas
+ LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
+ DatanodeInfo[] dnInfos = locBlock.getLocations();
+ int index = -1;
+ for (int k = 0; k < dnInfos.length; k++) {
+ if (dnInfos[k].equals(datanode)) {
+ index = k;
+ break;
+ }
+ }
+ if (index < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Datanode responded with a block volume id we did" +
+ " not request, omitting.");
+ }
+ continue;
+ }
+ // Place VolumeId at the same index as the DN's index in the list of
+ // replicas
+ List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+ volumeIds.set(index, id);
+ }
+ }
+ return blockVolumeIds;
+ }
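+
+ // The returned map has one entry per input LocatedBlock; each value list
+ // has one slot per replica, left null for any replica whose datanode did
+ // not return usable metadata.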
+
+ /**
+ * Helper method to combine a list of {@link LocatedBlock} with associated
+ * {@link VolumeId} information to form a list of
+ * {@link BlockStorageLocation}.
+ */
+ static BlockStorageLocation[] convertToVolumeBlockLocations(
+ List<LocatedBlock> blocks,
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
+ // Construct the final return value of BlockStorageLocation[]
+ BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
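+ // locatedBlocks2Locations preserves the order of the input list, so
+ // locations[i] corresponds to blocks.get(i) in the loop below.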
+ List<BlockStorageLocation> volumeBlockLocs =
+ new ArrayList<BlockStorageLocation>(locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ LocatedBlock locBlock = blocks.get(i);
+ List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+ BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
+ volumeIds.toArray(new VolumeId[0]));
+ volumeBlockLocs.add(bsLoc);
+ }
+ return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
+ }
+
+ /**
+ * Callable that sets up an RPC proxy to a datanode and queries it for
+ * volume location information for a list of ExtendedBlocks.
+ */
+ private static class VolumeBlockLocationCallable implements
+ Callable<HdfsBlocksMetadata> {
+
+ private final Configuration configuration;
+ private final int timeout;
+ private final DatanodeInfo datanode;
+ private final String poolId;
+ private final long[] blockIds;
+ private final List<Token<BlockTokenIdentifier>> dnTokens;
+ private final boolean connectToDnViaHostname;
+ private final Span parentSpan;
+
+ VolumeBlockLocationCallable(Configuration configuration,
+ DatanodeInfo datanode, String poolId, long[] blockIds,
+ List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
+ boolean connectToDnViaHostname, Span parentSpan) {
+ this.configuration = configuration;
+ this.timeout = timeout;
+ this.datanode = datanode;
+ this.poolId = poolId;
+ this.blockIds = blockIds;
+ this.dnTokens = dnTokens;
+ this.connectToDnViaHostname = connectToDnViaHostname;
+ this.parentSpan = parentSpan;
+ }
+
+ public DatanodeInfo getDatanodeInfo() {
+ return datanode;
+ }
+
+ @Override
+ public HdfsBlocksMetadata call() throws Exception {
+ HdfsBlocksMetadata metadata = null;
+ // Create the RPC proxy and make the RPC
+ ClientDatanodeProtocol cdp = null;
+ TraceScope scope =
+ Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
+ try {
+ cdp = DFSUtilClient.createClientDatanodeProtocolProxy(
+ datanode, configuration,
+ timeout, connectToDnViaHostname);
+ metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
+ } catch (IOException e) {
+ // Rethrow so the caller sees the failure through the Future
+ throw e;
+ } finally {
+ scope.close();
+ if (cdp != null) {
+ RPC.stopProxy(cdp);
+ }
+ }
+ return metadata;
+ }
+ }
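+
+ // Taken together, these helpers form a pipeline:
+ // createVolumeBlockLocationCallables() builds one callable per datanode,
+ // queryDatanodesForHdfsBlocksMetadata() runs them in parallel and collects
+ // an HdfsBlocksMetadata per datanode, associateVolumeIdsWithBlocks() maps
+ // the results back onto each LocatedBlock's replicas, and
+ // convertToVolumeBlockLocations() produces the final
+ // BlockStorageLocation[].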
+}