You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/26 20:17:21 UTC

[12/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94cbb6d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94cbb6d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94cbb6d1

Branch: refs/heads/branch-2
Commit: 94cbb6d16483ba011b7104565b4084bf2a3eb6e6
Parents: b46e4ce
Author: Haohui Mai <wh...@apache.org>
Authored: Sat Sep 26 11:08:25 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Sat Sep 26 11:16:50 2015 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   24 +
 .../apache/hadoop/fs/BlockStorageLocation.java  |   52 +
 .../org/apache/hadoop/fs/HdfsBlockLocation.java |   47 +
 .../java/org/apache/hadoop/fs/HdfsVolumeId.java |   73 +
 .../java/org/apache/hadoop/fs/VolumeId.java     |   40 +
 .../hadoop/hdfs/BlockMissingException.java      |   65 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  893 +++++
 .../hadoop/hdfs/BlockStorageLocationUtil.java   |  369 ++
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 3203 ++++++++++++++++++
 .../hadoop/hdfs/DFSClientFaultInjector.java     |   60 +
 .../hadoop/hdfs/DFSHedgedReadMetrics.java       |   58 +
 .../hadoop/hdfs/DFSInotifyEventInputStream.java |  239 ++
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 1915 +++++++++++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  923 +++++
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  345 ++
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |   24 +
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 1904 +++++++++++
 .../hadoop/hdfs/HdfsConfigurationLoader.java    |   44 +
 .../apache/hadoop/hdfs/RemotePeerFactory.java   |   43 +
 .../hdfs/UnknownCipherSuiteException.java       |   35 +
 .../UnknownCryptoProtocolVersionException.java  |   38 +
 .../org/apache/hadoop/hdfs/XAttrHelper.java     |  174 +
 .../hadoop/hdfs/client/HdfsDataInputStream.java |  113 +
 .../hdfs/client/HdfsDataOutputStream.java       |  112 +
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   |  524 +++
 .../hdfs/inotify/MissingEventsException.java    |   54 +
 .../hadoop/hdfs/protocol/AclException.java      |   39 +
 .../hdfs/protocol/CacheDirectiveIterator.java   |  130 +
 .../hadoop/hdfs/protocol/CachePoolIterator.java |   63 +
 .../hdfs/protocol/EncryptionZoneIterator.java   |   64 +
 .../QuotaByStorageTypeExceededException.java    |   56 +
 .../hdfs/protocol/UnresolvedPathException.java  |   87 +
 .../datatransfer/ReplaceDatanodeOnFailure.java  |  200 ++
 .../datanode/ReplicaNotFoundException.java      |   53 +
 .../namenode/RetryStartFileException.java       |   36 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../dev-support/findbugsExcludeFile.xml         |   19 -
 .../apache/hadoop/fs/BlockStorageLocation.java  |   52 -
 .../org/apache/hadoop/fs/HdfsBlockLocation.java |   47 -
 .../java/org/apache/hadoop/fs/HdfsVolumeId.java |   73 -
 .../java/org/apache/hadoop/fs/VolumeId.java     |   40 -
 .../hadoop/hdfs/BlockMissingException.java      |   65 -
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  892 -----
 .../hadoop/hdfs/BlockStorageLocationUtil.java   |  369 --
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 3200 -----------------
 .../hadoop/hdfs/DFSClientFaultInjector.java     |   57 -
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |    2 -
 .../hadoop/hdfs/DFSHedgedReadMetrics.java       |   58 -
 .../hadoop/hdfs/DFSInotifyEventInputStream.java |  239 --
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 1915 -----------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  923 -----
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  345 --
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |   23 -
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 1903 -----------
 .../apache/hadoop/hdfs/HdfsConfiguration.java   |   11 +-
 .../apache/hadoop/hdfs/RemotePeerFactory.java   |   43 -
 .../hdfs/UnknownCipherSuiteException.java       |   35 -
 .../UnknownCryptoProtocolVersionException.java  |   38 -
 .../org/apache/hadoop/hdfs/XAttrHelper.java     |  174 -
 .../hadoop/hdfs/client/HdfsDataInputStream.java |  113 -
 .../hdfs/client/HdfsDataOutputStream.java       |  112 -
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   |  524 ---
 .../hdfs/inotify/MissingEventsException.java    |   54 -
 .../hadoop/hdfs/protocol/AclException.java      |   39 -
 .../hdfs/protocol/CacheDirectiveIterator.java   |  130 -
 .../hadoop/hdfs/protocol/CachePoolIterator.java |   63 -
 .../hdfs/protocol/EncryptionZoneIterator.java   |   64 -
 .../QuotaByStorageTypeExceededException.java    |   56 -
 .../hdfs/protocol/UnresolvedPathException.java  |   87 -
 .../datatransfer/ReplaceDatanodeOnFailure.java  |  200 --
 .../hadoop/hdfs/server/balancer/Dispatcher.java |    3 +-
 .../hdfs/server/datanode/BlockReceiver.java     |    4 +-
 .../hdfs/server/datanode/BlockSender.java       |    4 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |    3 +-
 .../hdfs/server/datanode/DataXceiver.java       |    6 +-
 .../datanode/ReplicaNotFoundException.java      |   53 -
 .../datanode/fsdataset/impl/BlockPoolSlice.java |    4 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |    8 +-
 .../impl/RamDiskAsyncLazyPersistService.java    |    4 +-
 .../namenode/RetryStartFileException.java       |   36 -
 .../hdfs/server/namenode/TransferFsImage.java   |    4 +-
 .../datanode/TestFiDataTransferProtocol2.java   |    1 -
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |    2 +-
 .../hdfs/MiniDFSClusterWithNodeGroup.java       |    2 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |    1 -
 .../TestClientProtocolForPipelineRecovery.java  |    6 +-
 .../apache/hadoop/hdfs/TestCrcCorruption.java   |    2 +-
 .../org/apache/hadoop/hdfs/TestDFSUtil.java     |    8 +-
 .../org/apache/hadoop/hdfs/TestFileStatus.java  |    2 +-
 .../java/org/apache/hadoop/hdfs/TestPread.java  |   10 +-
 90 files changed, 12143 insertions(+), 12087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 515da24..41a8564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -32,4 +32,28 @@
     <Method name="allocSlot" />
     <Bug pattern="UL_UNRELEASED_LOCK" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DFSInputStream"/>
+    <Field name="tcpReadsDisabledForTesting"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <!--
+    ResponseProcessor is a thread that is designed to catch RuntimeException.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
+    <Method name="run" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
+
+  <!--
+    We use a separate lock to guard cachingStrategy in order to separate
+    locks for p-reads from seek + read invocations.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+    <Field name="cachingStrategy" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
new file mode 100644
index 0000000..2200994
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
+ * location information for each replica.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+@Deprecated
+public class BlockStorageLocation extends BlockLocation {
+
+  private final VolumeId[] volumeIds;
+
+  public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
+      throws IOException {
+    // Initialize with data from passed in BlockLocation
+    super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
+        .getOffset(), loc.getLength(), loc.isCorrupt());
+    this.volumeIds = volumeIds;
+  }
+
+  /**
+   * Gets the list of {@link VolumeId} corresponding to the block's replicas.
+   * 
+   * @return volumeIds list of VolumeId for the block's replicas
+   */
+  public VolumeId[] getVolumeIds() {
+    return volumeIds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
new file mode 100644
index 0000000..0ccacda
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+/**
+ * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
+ * allowing more detailed queries to the datanode about a block.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlockLocation extends BlockLocation {
+
+  private final LocatedBlock block;
+  
+  public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) 
+      throws IOException {
+    // Initialize with data from passed in BlockLocation
+    super(loc);
+    this.block = block;
+  }
+  
+  public LocatedBlock getLocatedBlock() {
+    return block;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
new file mode 100644
index 0000000..6e9d3d7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
+ * used to differentiate between the data directories on a single datanode. This
+ * identifier is only unique on a per-datanode basis.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class HdfsVolumeId implements VolumeId {
+  
+  private final byte[] id;
+
+  public HdfsVolumeId(byte[] id) {
+    Preconditions.checkNotNull(id, "id cannot be null");
+    this.id = id;
+  }
+
+  @Override
+  public int compareTo(VolumeId arg0) {
+    // Orders ids by hash code; a null argument sorts before any id.
+    // Integer.compare, unlike subtracting the hash codes, cannot overflow
+    // and so never reports the wrong sign.
+    return arg0 == null ? 1 : Integer.compare(hashCode(), arg0.hashCode());
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(id).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    if (obj == this) {
+      return true;
+    }
+    HdfsVolumeId that = (HdfsVolumeId) obj;
+    return new EqualsBuilder().append(this.id, that.id).isEquals();
+  }
+
+  @Override
+  public String toString() {
+    return StringUtils.byteToHexString(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
new file mode 100644
index 0000000..e56e304
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Opaque interface that identifies a disk location. Subclasses
+ * should implement {@link Comparable} and override both equals and hashCode.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public interface VolumeId extends Comparable<VolumeId> {
+
+  @Override
+  abstract public int compareTo(VolumeId arg0);
+
+  @Override
+  abstract public int hashCode();
+
+  @Override
+  abstract public boolean equals(Object obj);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
new file mode 100644
index 0000000..7bba8a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** 
+  * This exception is thrown when a read encounters a block that has no locations
+  * associated with it.
+  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMissingException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String filename;
+  private final long   offset;
+  /**
+   * An exception that indicates that a file was corrupted.
+   * @param filename name of corrupted file
+   * @param description a description of the corruption details
+   * @param offset offset in the file at which the corruption occurs
+   */
+  public BlockMissingException(String filename, String description, long offset) {
+    super(description);
+    this.filename = filename;
+    this.offset = offset;
+  }
+
+  /**
+   * Returns the name of the corrupted file.
+   * @return name of corrupted file
+   */
+  public String getFile() {
+    return filename;
+  }
+
+  /**
+   * Returns the offset at which this file is corrupted
+   * @return offset of corrupted file
+   */
+  public long getOffset() {
+    return offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
new file mode 100644
index 0000000..69e9da2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -0,0 +1,893 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PerformanceAdvisory;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** 
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
+
+  public static class FailureInjector {
+    public void injectRequestFileDescriptorsFailure() throws IOException {
+      // do nothing
+    }
+    public boolean getSupportsReceiptVerification() {
+      return true;
+    }
+  }
+
+  @VisibleForTesting
+  static ShortCircuitReplicaCreator
+      createShortCircuitReplicaInfoCallback = null;
+
+  private final DfsClientConf conf;
+
+  /**
+   * Injects failures into specific operations during unit tests.
+   */
+  private static FailureInjector failureInjector = new FailureInjector();
+
+  /**
+   * The file name, for logging and debugging purposes.
+   */
+  private String fileName;
+
+  /**
+   * The block ID and block pool ID to use.
+   */
+  private ExtendedBlock block;
+
+  /**
+   * The block token to use for security purposes.
+   */
+  private Token<BlockTokenIdentifier> token;
+
+  /**
+   * The offset within the block to start reading at.
+   */
+  private long startOffset;
+
+  /**
+   * If false, we won't try to verify the block checksum.
+   */
+  private boolean verifyChecksum;
+
+  /**
+   * The name of this client.
+   */
+  private String clientName; 
+
+  /**
+   * The DataNode we're talking to.
+   */
+  private DatanodeInfo datanode;
+
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
+   * If false, we won't try short-circuit local reads.
+   */
+  private boolean allowShortCircuitLocalReads;
+
+  /**
+   * The ClientContext to use for things like the PeerCache.
+   */
+  private ClientContext clientContext;
+
+  /**
+   * Number of bytes to read.  -1 indicates no limit.
+   */
+  private long length = -1;
+
+  /**
+   * Caching strategy to use when reading the block.
+   */
+  private CachingStrategy cachingStrategy;
+
+  /**
+   * Socket address to use to connect to peer.
+   */
+  private InetSocketAddress inetSocketAddress;
+
+  /**
+   * Remote peer factory to use to create a peer, if needed.
+   */
+  private RemotePeerFactory remotePeerFactory;
+
+  /**
+   * UserGroupInformation to use for legacy block reader local objects, if needed.
+   */
+  private UserGroupInformation userGroupInformation;
+
+  /**
+   * Configuration to use for legacy block reader local objects, if needed.
+   */
+  private Configuration configuration;
+
+  /**
+   * Information about the domain socket path we should use to connect to the
+   * local peer-- or null if we haven't examined the local domain socket.
+   */
+  private DomainSocketFactory.PathInfo pathInfo;
+
+  /**
+   * The remaining number of times that we'll try to pull a socket out of the
+   * cache.
+   */
+  private int remainingCacheTries;
+
+  public BlockReaderFactory(DfsClientConf conf) {
+    this.conf = conf;
+    this.remainingCacheTries = conf.getNumCachedConnRetry();
+  }
+
+  public BlockReaderFactory setFileName(String fileName) {
+    this.fileName = fileName;
+    return this;
+  }
+
+  public BlockReaderFactory setBlock(ExtendedBlock block) {
+    this.block = block;
+    return this;
+  }
+
+  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.token = token;
+    return this;
+  }
+
+  public BlockReaderFactory setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+    return this;
+  }
+
+  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+    return this;
+  }
+
+  public BlockReaderFactory setClientName(String clientName) {
+    this.clientName = clientName;
+    return this;
+  }
+
+  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+    this.datanode = datanode;
+    return this;
+  }
+
+  public BlockReaderFactory setStorageType(StorageType storageType) {
+    this.storageType = storageType;
+    return this;
+  }
+
+  public BlockReaderFactory setAllowShortCircuitLocalReads(
+      boolean allowShortCircuitLocalReads) {
+    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+    return this;
+  }
+
+  public BlockReaderFactory setClientCacheContext(
+      ClientContext clientContext) {
+    this.clientContext = clientContext;
+    return this;
+  }
+
+  public BlockReaderFactory setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public BlockReaderFactory setCachingStrategy(
+      CachingStrategy cachingStrategy) {
+    this.cachingStrategy = cachingStrategy;
+    return this;
+  }
+
+  public BlockReaderFactory setInetSocketAddress (
+      InetSocketAddress inetSocketAddress) {
+    this.inetSocketAddress = inetSocketAddress;
+    return this;
+  }
+
+  public BlockReaderFactory setUserGroupInformation(
+      UserGroupInformation userGroupInformation) {
+    this.userGroupInformation = userGroupInformation;
+    return this;
+  }
+
+  public BlockReaderFactory setRemotePeerFactory(
+      RemotePeerFactory remotePeerFactory) {
+    this.remotePeerFactory = remotePeerFactory;
+    return this;
+  }
+
+  public BlockReaderFactory setConfiguration(
+      Configuration configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
+  @VisibleForTesting
+  public static void setFailureInjectorForTesting(FailureInjector injector) {
+    failureInjector = injector;
+  }
+
+  /**
+   * Build a BlockReader with the given options.
+   *
+   * This function will do the best it can to create a block reader that meets
+   * all of our requirements.  We prefer short-circuit block readers
+   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+   * former avoid the overhead of socket communication.  If short-circuit is
+   * unavailable, our next fallback is data transfer over UNIX domain sockets,
+   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
+   * work, we will try to create a remote block reader that operates over TCP
+   * sockets.
+   *
+   * There are a few caches that are important here.
+   *
+   * The ShortCircuitCache stores file descriptor objects which have been passed
+   * from the DataNode. 
+   *
+   * The DomainSocketFactory stores information about UNIX domain socket paths
+   * that we have not been able to use in the past, so that we don't waste time
+   * retrying them over and over.  (Like all the caches, it does have a timeout,
+   * though.)
+   *
+   * The PeerCache stores peers that we have used in the past.  If we can reuse
+   * one of these peers, we avoid the overhead of re-opening a socket.  However,
+   * if the socket has been timed out on the remote end, our attempt to reuse
+   * the socket may end with an IOException.  For that reason, we limit our
+   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
+   * that, we create new sockets.  This avoids the problem where a thread tries
+   * to talk to a peer that it hasn't talked to in a while, and has to clean out
+   * every entry in a socket cache full of stale entries.
+   *
+   * @return The new BlockReader.  We will not return null.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  public BlockReader build() throws IOException {
+    Preconditions.checkNotNull(configuration);
+
+    // A reader backed by a pluggable ReplicaAccessor, when one can be built,
+    // wins over every built-in reader type.
+    final BlockReader external = tryToCreateExternalBlockReader();
+    if (external != null) {
+      return external;
+    }
+
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+
+    // First preference: a short-circuit local reader, in whichever flavor
+    // (legacy or not) the client context selects.
+    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
+      final boolean legacy = clientContext.getUseLegacyBlockReaderLocal();
+      final BlockReader local =
+          legacy ? getLegacyBlockReaderLocal() : getBlockReaderLocal();
+      if (local != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + (legacy
+              ? ": returning new legacy block reader local."
+              : ": returning new block reader local."));
+        }
+        return local;
+      }
+    }
+
+    // Second preference: a remote reader over a UNIX domain socket.
+    if (scConf.isDomainSocketDataTraffic()) {
+      final BlockReader viaDomain = getRemoteBlockReaderFromDomain();
+      if (viaDomain != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": returning new remote block reader using " +
+              "UNIX domain socket on " + pathInfo.getPath());
+        }
+        return viaDomain;
+      }
+    }
+
+    // Last resort: a remote reader over TCP, unless tests have forbidden it.
+    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+        "TCP reads were disabled for testing, but we failed to " +
+        "do a non-TCP read.");
+    return getRemoteBlockReaderFromTcp();
+  }
+
+  private BlockReader tryToCreateExternalBlockReader() {
+    List<Class<? extends ReplicaAccessorBuilder>> clses =
+        conf.getReplicaAccessorBuilderClasses();
+    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+      try {
+        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+        token.write(bado);
+        byte tokenBytes[] = bado.toByteArray();
+
+        Constructor<? extends ReplicaAccessorBuilder> ctor =
+            cls.getConstructor();
+        ReplicaAccessorBuilder builder = ctor.newInstance();
+        ReplicaAccessor accessor = builder.
+            setAllowShortCircuitReads(allowShortCircuitLocalReads).
+            setBlock(block.getBlockId(), block.getBlockPoolId()).
+            setGenerationStamp(block.getGenerationStamp()).
+            setBlockAccessToken(tokenBytes).
+            setClientName(clientName).
+            setConfiguration(configuration).
+            setFileName(fileName).
+            setVerifyChecksum(verifyChecksum).
+            setVisibleLength(length).
+            build();
+        if (accessor == null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": No ReplicaAccessor created by " +
+                cls.getName());
+          }
+        } else {
+          return new ExternalBlockReader(accessor, length, startOffset);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Failed to construct new object of type " +
+            cls.getName(), t);
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  private BlockReader getLegacyBlockReaderLocal() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
+    }
+    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+            "the address " + inetSocketAddress + " is not local");
+      }
+      return null;
+    }
+    if (clientContext.getDisableLegacyBlockReaderLocal()) {
+        PerformanceAdvisory.LOG.debug("{}: can't construct " +
+            "BlockReaderLocalLegacy because " +
+            "disableLegacyBlockReaderLocal is set.", this);
+      return null;
+    }
+    IOException ioe;
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf,
+          userGroupInformation, configuration, fileName, block, token,
+          datanode, startOffset, length, storageType);
+    } catch (RemoteException remoteException) {
+      ioe = remoteException.unwrapRemoteException(
+                InvalidToken.class, AccessControlException.class);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if ((!(ioe instanceof AccessControlException)) &&
+        isSecurityException(ioe)) {
+      // Handle security exceptions.
+      // We do not handle AccessControlException here, since
+      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+      // that the user is not in dfs.block.local-path-access.user, a condition
+      // which requires us to disable legacy SCR.
+      throw ioe;
+    }
+    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
+        "Disabling legacy local reads.", ioe);
+    clientContext.setDisableLegacyBlockReaderLocal();
+    return null;
+  }
+
+  private BlockReader getBlockReaderLocal() throws InvalidToken {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct a BlockReaderLocal " +
+          "for short-circuit reads.");
+    }
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
+              "giving up on BlockReaderLocal.", this, pathInfo);
+      return null;
+    }
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+    InvalidToken exc = info.getInvalidTokenException();
+    if (exc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": got InvalidToken exception while trying to " +
+            "construct BlockReaderLocal via " + pathInfo.getPath());
+      }
+      throw exc;
+    }
+    if (info.getReplica() == null) {
+      PerformanceAdvisory.LOG.debug("{}: failed to get " +
+          "ShortCircuitReplica. Cannot construct " +
+          "BlockReaderLocal via {}", this, pathInfo.getPath());
+      return null;
+    }
+    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
+        setFilename(fileName).
+        setBlock(block).
+        setStartOffset(startOffset).
+        setShortCircuitReplica(info.getReplica()).
+        setVerifyChecksum(verifyChecksum).
+        setCachingStrategy(cachingStrategy).
+        setStorageType(storageType).
+        build();
+  }
+
+  /**
+   * Fetch a pair of short-circuit block descriptors from a local DataNode.
+   *
+   * If a createShortCircuitReplicaInfoCallback is set, it is consulted first
+   * and any non-null result it produces is returned as-is.
+   *
+   * Otherwise this method loops over domain peers from nextDomainPeer().  For
+   * each peer it allocates a shared memory slot, requests the file
+   * descriptors over the peer, and on success puts the peer into the peer
+   * cache.  If the slot allocation consumed the peer, a replacement peer is
+   * fetched before the descriptors are requested; remainingCacheTries is not
+   * decremented for that replacement.
+   *
+   * On an IOException the allocated slot (if any) is freed.  If the failing
+   * peer came from the cache it is closed and the loop tries again; if it
+   * was newly created, it is closed, the domain socket path is disabled and
+   * null is returned.
+   *
+   * NOTE(review): when no replacement peer is available after the slot
+   * allocation consumed the first one, the loop exits without freeing the
+   * slot in this method -- confirm that it is released elsewhere.
+   *
+   * @return    Null if we could not communicate with the datanode,
+   *            a new ShortCircuitReplicaInfo object otherwise.
+   *            ShortCircuitReplicaInfo objects may contain either an InvalidToken
+   *            exception, or a ShortCircuitReplica object ready to use.
+   */
+  @Override
+  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+    if (createShortCircuitReplicaInfoCallback != null) {
+      ShortCircuitReplicaInfo info =
+        createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+      if (info != null) return info;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
+    }
+    BlockReaderPeer curPeer;
+    while (true) {
+      curPeer = nextDomainPeer();
+      if (curPeer == null) break;  // no cached peer and no new socket
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      try {
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": allocShmSlot used up our previous socket " +
+              peer.getDomainSocket() + ".  Allocating a new one...");
+          }
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;  // falls through to "return null"
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
+        clientContext.getPeerCache().put(datanode, peer);
+        return info;
+      } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached socket.
+          // These are considered less serious, because the socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": closing stale domain peer " + peer, e);
+          }
+          IOUtilsClient.cleanup(LOG, peer);
+        } else {
+          // Handle an I/O error we got when using a newly created socket.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn(this + ": I/O error requesting file descriptors.  " + 
+              "Disabling domain socket " + peer.getDomainSocket(), e);
+          IOUtilsClient.cleanup(LOG, peer);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Request file descriptors from a DomainPeer.
+   *
+   * Sends a short-circuit file descriptor request for the block over the
+   * peer's output stream, parses the DataNode's response, and acts on the
+   * response status.
+   *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the
+   *               new ShortCircuitReplica.
+   *
+   * @return  A ShortCircuitReplicaInfo wrapping either the new
+   *          ShortCircuitReplica (on SUCCESS) or an InvalidToken exception
+   *          (on ERROR_ACCESS_TOKEN); null if the DataNode answered with
+   *          ERROR_UNSUPPORTED or an unknown status, or if creating the
+   *          replica from the received streams failed.
+   * @throws  IOException If we encountered an I/O exception while communicating
+   *          with the datanode.
+   */
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+          Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
+        failureInjector.getSupportsReceiptVerification());
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    // Fault-injection hook; presumably a no-op outside of tests.
+    failureInjector.injectRequestFileDescriptorsFailure();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      byte[] buf = new byte[1];
+      FileInputStream[] fis = new FileInputStream[2];
+      // A failure to receive the streams is a socket problem and is
+      // propagated to the caller as an IOException.
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      ShortCircuitReplica replica = null;
+      try {
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
+          // Guarded like the other trace statements in this class so the
+          // message is only built when it will actually be logged.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Sending receipt verification byte for slot " + slot);
+          }
+          sock.getOutputStream().write(0);
+        }
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
+        return new ShortCircuitReplicaInfo(replica);
+      } catch (IOException e) {
+        // This indicates an error reading from disk, or a format error.  Since
+        // it's not a socket communication problem, we return null rather than
+        // throwing an exception.
+        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+        return null;
+      } finally {
+        // Close the received streams unless a replica was created from them.
+        if (replica == null) {
+          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
+        }
+      }
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanode + ".  reason: " + resp.getMessage());
+        clientContext.getDomainSocketFactory()
+            .disableShortCircuitForPath(pathInfo.getPath());
+      } else {
+        LOG.warn("short-circuit read access for the file " +
+            fileName + " is disabled for DataNode " + datanode +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      // Separate the file name from the DataNode's message; previously the
+      // two were concatenated with nothing in between.
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          fileName + ": " + resp.getMessage();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ":" + msg);
+      }
+      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
+    default:
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
+      clientContext.getDomainSocketFactory()
+          .disableShortCircuitForPath(pathInfo.getPath());
+      return null;
+    }
+  }
+
+  /**
+   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
+   *
+   * Peers are taken from nextDomainPeer() until one yields a reader.  A
+   * failure on a cached peer is retried with the next peer; a failure on a
+   * newly created peer disables the domain socket path and gives up.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   * reader.
+   *
+   * @throws IOException     InvalidToken if the block token was invalid;
+   *                         potentially other security-related exceptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
+          "remote block reader because the UNIX domain socket at {}" +
+           " is not usable.", this, pathInfo);
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from the " +
+          "UNIX domain socket at " + pathInfo.getPath());
+    }
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        // The peer is released by the finally block below, which runs on
+        // every exit from this catch (rethrow, return or retry), so it is
+        // not cleaned up a second time here.
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from the unix domain socket at " +
+                pathInfo.getPath(), ioe);
+          }
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        // blockReader is only non-null when a reader was created; in every
+        // other case (checked or unchecked failure) the peer is released.
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a RemoteBlockReader that communicates over a TCP socket.
+   *
+   * @return The new BlockReader.  We will not return null, but instead throw
+   *         an exception if this fails.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from a " +
+          "TCP socket");
+    }
+    BlockReader blockReader = null;
+    while (true) {
+      BlockReaderPeer curPeer = null;
+      Peer peer = null;
+      try {
+        curPeer = nextTcpPeer();
+        if (curPeer.fromCache) remainingCacheTries--;
+        peer = curPeer.peer;
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from " + peer, ioe);
+          }
+          throw ioe;
+        }
+        if ((curPeer != null) && curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale remote peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created peer.
+          LOG.warn("I/O error constructing remote block reader.", ioe);
+          throw ioe;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+  }
+
+  /**
+   * Immutable pairing of a {@link Peer} with a flag recording whether that
+   * peer was taken from the peer cache or newly created.
+   */
+  public static class BlockReaderPeer {
+    /** The peer itself. */
+    final Peer peer;
+    /** True if the peer was taken from the cache, false if newly created. */
+    final boolean fromCache;
+
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.fromCache = fromCache;
+      this.peer = peer;
+    }
+  }
+
+  /**
+   * Get the next DomainPeer-- either from the cache or by creating it.
+   *
+   * @return the next DomainPeer, or null if we could not construct one.
+   */
+  private BlockReaderPeer nextDomainPeer() {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, true);
+      if (peer != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    DomainSocket sock = clientContext.getDomainSocketFactory().
+        createSocket(pathInfo, conf.getSocketTimeout());
+    if (sock == null) return null;
+    return new BlockReaderPeer(new DomainPeer(sock), false);
+  }
+
+  /**
+   * Get the next TCP-based peer-- either from the cache or by creating it.
+   *
+   * @return the next Peer, or null if we could not construct one.
+   *
+   * @throws IOException  If there was an error while constructing the peer
+   *                      (such as an InvalidEncryptionKeyException)
+   */
+  private BlockReaderPeer nextTcpPeer() throws IOException {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, false);
+      if (peer != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    try {
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+        datanode);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
+      }
+      return new BlockReaderPeer(peer, false);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
+                  "connected to " + datanode);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * Security-related exceptions are handled differently than other
+   * IOExceptions: rather than indicating a communication problem, they mean
+   * the client has to take some action, such as refetching block tokens or
+   * renewing encryption keys.
+   *
+   * @param ioe    The exception
+   * @return       True only if the exception is an InvalidToken,
+   *               InvalidEncryptionKeyException, InvalidBlockTokenException
+   *               or AccessControlException.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    if (ioe instanceof InvalidToken) {
+      return true;
+    }
+    if (ioe instanceof InvalidEncryptionKeyException) {
+      return true;
+    }
+    if (ioe instanceof InvalidBlockTokenException) {
+      return true;
+    }
+    return ioe instanceof AccessControlException;
+  }
+
+  @SuppressWarnings("deprecation")
+  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
+      return RemoteBlockReader.newBlockReader(fileName,
+          block, token, startOffset, length, conf.getIoBufferSize(),
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          fileName, block, token, startOffset, length,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    }
+  }
+
+  /**
+   * @return a short description naming the file and block this factory was
+   *         configured for; used as the prefix of its log messages.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder description =
+        new StringBuilder("BlockReaderFactory(fileName=");
+    description.append(fileName);
+    description.append(", block=").append(block);
+    description.append(")");
+    return description.toString();
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets).
+   * The result has the form {@code <address>:<poolId>:<blockId>}.
+   *
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    final StringBuilder name = new StringBuilder(s.toString());
+    name.append(":").append(poolId);
+    name.append(":").append(blockId);
+    return name.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
new file mode 100644
index 0000000..807ede8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.HdfsVolumeId;
+import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BlockStorageLocationUtil {
+  
+  static final Logger LOG = LoggerFactory
+      .getLogger(BlockStorageLocationUtil.class);
+  
+  /**
+   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
+   * of datanodes and blocks. The blocks must all correspond to the same
+   * block pool.
+   * 
+   * @param datanodeBlocks
+   *          Map of datanodes to block replicas at each datanode
+   * @return callables Used to query each datanode for location information on
+   *         the block replicas at the datanode
+   */
+  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
+      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+      int timeout, boolean connectToDnViaHostname, Span parent) {
+    
+    if (datanodeBlocks.isEmpty()) {
+      return Lists.newArrayList();
+    }
+    
+    // Construct the callables, one per datanode
+    List<VolumeBlockLocationCallable> callables = 
+        new ArrayList<VolumeBlockLocationCallable>();
+    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
+        .entrySet()) {
+      // Construct RPC parameters
+      DatanodeInfo datanode = entry.getKey();
+      List<LocatedBlock> locatedBlocks = entry.getValue();
+      if (locatedBlocks.isEmpty()) {
+        continue;
+      }
+      
+      // Ensure that the blocks all are from the same block pool.
+      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
+      for (LocatedBlock lb : locatedBlocks) {
+        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
+          throw new IllegalArgumentException(
+              "All blocks to be queried must be in the same block pool: " +
+              locatedBlocks.get(0).getBlock() + " and " + lb +
+              " are from different pools.");
+        }
+      }
+      
+      long[] blockIds = new long[locatedBlocks.size()];
+      int i = 0;
+      List<Token<BlockTokenIdentifier>> dnTokens = 
+          new ArrayList<Token<BlockTokenIdentifier>>(
+          locatedBlocks.size());
+      for (LocatedBlock b : locatedBlocks) {
+        blockIds[i++] = b.getBlock().getBlockId();
+        dnTokens.add(b.getBlockToken());
+      }
+      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
+          conf, datanode, poolId, blockIds, dnTokens, timeout, 
+          connectToDnViaHostname, parent);
+      callables.add(callable);
+    }
+    return callables;
+  }
+  
+  /**
+   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
+   * making one RPC to each datanode. These RPCs are made in parallel using a
+   * threadpool.
+   *
+   * Datanodes whose query was cancelled or failed for a reason other than
+   * the two rethrown below are logged and left out of the result. If the
+   * calling thread is interrupted, its interrupt status is restored before
+   * returning.
+   *
+   * @param conf
+   *          Configuration handed to each datanode query
+   * @param datanodeBlocks
+   *          Map of datanodes to the blocks present on the DN
+   * @param poolsize
+   *          Size of the thread pool used to run the queries
+   * @param timeoutMs
+   *          Timeout in milliseconds for running all queries; also handed
+   *          to each query
+   * @param connectToDnViaHostname
+   *          Flag handed to each datanode query
+   * @return metadatas Map of datanodes to block metadata of the DN
+   * @throws InvalidBlockTokenException
+   *           if client does not have read access on a requested block
+   * @throws UnsupportedOperationException
+   *           if a datanode query failed with this exception
+   */
+  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
+      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
+        throws InvalidBlockTokenException {
+
+    List<VolumeBlockLocationCallable> callables =
+        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
+            connectToDnViaHostname, Trace.currentSpan());
+
+    // Use a thread pool to execute the Callables in parallel
+    List<Future<HdfsBlocksMetadata>> futures =
+        new ArrayList<Future<HdfsBlocksMetadata>>();
+    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
+    try {
+      futures = executor.invokeAll(callables, timeoutMs,
+          TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Not rethrown: futures keeps its initial empty list, so an empty map
+      // is returned. Restore the interrupt status so callers can still see
+      // that the thread was interrupted.
+      Thread.currentThread().interrupt();
+    } finally {
+      // Always release the pool, even if invokeAll fails with an unchecked
+      // exception.
+      executor.shutdown();
+    }
+
+    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
+        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
+    // Fill in metadatas with results from DN RPCs, where possible
+    for (int i = 0; i < futures.size(); i++) {
+      VolumeBlockLocationCallable callable = callables.get(i);
+      DatanodeInfo datanode = callable.getDatanodeInfo();
+      Future<HdfsBlocksMetadata> future = futures.get(i);
+      try {
+        HdfsBlocksMetadata metadata = future.get();
+        metadatas.put(datanode, metadata);
+      } catch (CancellationException e) {
+        LOG.info("Cancelled while waiting for datanode "
+            + datanode.getIpcAddr(false) + ": " + e.toString());
+      } catch (ExecutionException e) {
+        Throwable t = e.getCause();
+        if (t instanceof InvalidBlockTokenException) {
+          LOG.warn("Invalid access token when trying to retrieve "
+              + "information from datanode " + datanode.getIpcAddr(false));
+          throw (InvalidBlockTokenException) t;
+        }
+        else if (t instanceof UnsupportedOperationException) {
+          LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+              + " required #getHdfsBlocksMetadata() API");
+          throw (UnsupportedOperationException) t;
+        } else {
+          LOG.info("Failed to query block locations on datanode " +
+              datanode.getIpcAddr(false) + ": " + t);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Could not fetch information from datanode", t);
+        }
+      } catch (InterruptedException e) {
+        // Shouldn't happen, because invokeAll waits for all Futures to be ready
+        LOG.info("Interrupted while fetching HdfsBlocksMetadata");
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    return metadatas;
+  }
+  
+  /**
+   * Group the per-replica {@link VolumeId} info returned from
+   * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} so that it is
+   * associated with the corresponding {@link LocatedBlock}.
+   * <p>
+   * Every block in {@code blocks} gets a list with one slot per replica
+   * location. A slot is left {@code null} when no usable volume information
+   * was reported for that replica.
+   * 
+   * @param blocks
+   *          Original LocatedBlock array
+   * @param metadatas
+   *          VolumeId information for the replicas on each datanode; entries
+   *          whose value is null are ignored
+   * @return blockVolumeIds per-replica VolumeId information associated with the
+   *         parent LocatedBlock
+   */
+  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
+      List<LocatedBlock> blocks,
+      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
+    
+    // Mapping of block ID to LocatedBlock.
+    // Used to associate results from DN RPCs to the parent LocatedBlock
+    Map<Long, LocatedBlock> blockIdToLocBlock = 
+        new HashMap<Long, LocatedBlock>();
+    // Mapping of blocks -> list of VolumeIds, one per replica.
+    // Pre-filled with nulls, then filled out with real values from the DN RPCs
+    Map<LocatedBlock, List<VolumeId>> blockVolumeIds = 
+        new HashMap<LocatedBlock, List<VolumeId>>();
+    for (LocatedBlock b : blocks) {
+      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
+      int numReplicas = b.getLocations().length;
+      List<VolumeId> l = new ArrayList<VolumeId>(numReplicas);
+      for (int i = 0; i < numReplicas; i++) {
+        l.add(null);
+      }
+      blockVolumeIds.put(b, l);
+    }
+    
+    // Iterate through the list of metadatas (one per datanode). 
+    // For each metadata, if it's valid, insert its volume location information 
+    // into the Map returned to the caller 
+    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
+      DatanodeInfo datanode = entry.getKey();
+      HdfsBlocksMetadata metadata = entry.getValue();
+      // Check if metadata is valid
+      if (metadata == null) {
+        continue;
+      }
+      long[] metaBlockIds = metadata.getBlockIds();
+      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
+      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
+      // Add VolumeId for each replica in the HdfsBlocksMetadata
+      for (int j = 0; j < metaBlockIds.length; j++) {
+        int volumeIndex = metaVolumeIndexes.get(j);
+        long blockId = metaBlockIds[j];
+        // Skip if block wasn't found, or not a valid index into metaVolumeIds
+        // (a negative index would otherwise make List#get throw).
+        // Also skip if the DN responded with a block we didn't ask for
+        if (volumeIndex == Integer.MAX_VALUE
+            || volumeIndex < 0
+            || volumeIndex >= metaVolumeIds.size()
+            || !blockIdToLocBlock.containsKey(blockId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No data for block " + blockId);
+          }
+          continue;
+        }
+        // Get the VolumeId by indexing into the list of VolumeIds
+        // provided by the datanode
+        byte[] volumeId = metaVolumeIds.get(volumeIndex);
+        HdfsVolumeId id = new HdfsVolumeId(volumeId);
+        // Find out which index we are in the LocatedBlock's replicas
+        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
+        DatanodeInfo[] dnInfos = locBlock.getLocations();
+        int index = -1;
+        for (int k = 0; k < dnInfos.length; k++) {
+          if (dnInfos[k].equals(datanode)) {
+            index = k;
+            break;
+          }
+        }
+        if (index < 0) {
+          // This datanode is not one of the block's known replica locations
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Datanode responded with a block volume id we did" +
+                " not request, omitting.");
+          }
+          continue;
+        }
+        // Place VolumeId at the same index as the DN's index in the list of
+        // replicas
+        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+        volumeIds.set(index, id);
+      }
+    }
+    return blockVolumeIds;
+  }
+
+  /**
+   * Helper method that pairs each {@link LocatedBlock} with its associated
+   * {@link VolumeId} list and produces the resulting
+   * {@link BlockStorageLocation} array.
+   *
+   * @param blocks
+   *          located blocks to convert; the result follows the same order
+   * @param blockVolumeIds
+   *          per-replica VolumeId lists keyed by LocatedBlock; an entry is
+   *          looked up for every converted block
+   * @return one BlockStorageLocation per converted block location
+   * @throws IOException
+   *           if building the block locations fails
+   */
+  static BlockStorageLocation[] convertToVolumeBlockLocations(
+      List<LocatedBlock> blocks, 
+      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
+    BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
+    // Fill the result array directly, one slot per converted location
+    BlockStorageLocation[] result =
+        new BlockStorageLocation[locations.length];
+    for (int i = 0; i < result.length; i++) {
+      List<VolumeId> ids = blockVolumeIds.get(blocks.get(i));
+      VolumeId[] idArray = ids.toArray(new VolumeId[ids.size()]);
+      result[i] = new BlockStorageLocation(locations[i], idArray);
+    }
+    return result;
+  }
+  
+  /**
+   * A {@link Callable} that opens an RPC proxy to a single datanode and asks
+   * it for the volume location metadata of a set of block IDs belonging to
+   * one block pool.
+   */
+  private static class VolumeBlockLocationCallable implements 
+    Callable<HdfsBlocksMetadata> {
+    
+    // Connection settings used to build the datanode proxy
+    private final Configuration configuration;
+    private final int timeout;
+    private final DatanodeInfo datanode;
+    // Arguments forwarded to the getHdfsBlocksMetadata RPC
+    private final String poolId;
+    private final long[] blockIds;
+    private final List<Token<BlockTokenIdentifier>> dnTokens;
+    private final boolean connectToDnViaHostname;
+    // Span the per-call trace scope is started from
+    private final Span parentSpan;
+    
+    VolumeBlockLocationCallable(Configuration configuration,
+        DatanodeInfo datanode, String poolId, long []blockIds,
+        List<Token<BlockTokenIdentifier>> dnTokens, int timeout, 
+        boolean connectToDnViaHostname, Span parentSpan) {
+      this.configuration = configuration;
+      this.datanode = datanode;
+      this.poolId = poolId;
+      this.blockIds = blockIds;
+      this.dnTokens = dnTokens;
+      this.timeout = timeout;
+      this.connectToDnViaHostname = connectToDnViaHostname;
+      this.parentSpan = parentSpan;
+    }
+    
+    /** @return the datanode this callable queries */
+    public DatanodeInfo getDatanodeInfo() {
+      return datanode;
+    }
+
+    /**
+     * Create the RPC proxy, make the RPC and return its result. Exceptions
+     * are not handled here; they bubble up to the caller, which handles
+     * them through the Future.
+     */
+    @Override
+    public HdfsBlocksMetadata call() throws Exception {
+      final TraceScope scope =
+          Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
+      ClientDatanodeProtocol proxy = null;
+      try {
+        proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
+            datanode, configuration, timeout, connectToDnViaHostname);
+        return proxy.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
+      } finally {
+        // Close the trace scope, then stop the proxy if one was created
+        scope.close();
+        if (proxy != null) {
+          RPC.stopProxy(proxy);
+        }
+      }
+    }
+  }
+}