You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/26 20:17:15 UTC

[06/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
deleted file mode 100644
index c9add53..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ /dev/null
@@ -1,892 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.PerformanceAdvisory;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/** 
- * Utility class to create BlockReader implementations.
- */
-@InterfaceAudience.Private
-public class BlockReaderFactory implements ShortCircuitReplicaCreator {
-  static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
-
-  public static class FailureInjector {
-    public void injectRequestFileDescriptorsFailure() throws IOException {
-      // do nothing
-    }
-    public boolean getSupportsReceiptVerification() {
-      return true;
-    }
-  }
-
-  @VisibleForTesting
-  static ShortCircuitReplicaCreator
-      createShortCircuitReplicaInfoCallback = null;
-
-  private final DfsClientConf conf;
-
-  /**
-   * Injects failures into specific operations during unit tests.
-   */
-  private static FailureInjector failureInjector = new FailureInjector();
-
-  /**
-   * The file name, for logging and debugging purposes.
-   */
-  private String fileName;
-
-  /**
-   * The block ID and block pool ID to use.
-   */
-  private ExtendedBlock block;
-
-  /**
-   * The block token to use for security purposes.
-   */
-  private Token<BlockTokenIdentifier> token;
-
-  /**
-   * The offset within the block to start reading at.
-   */
-  private long startOffset;
-
-  /**
-   * If false, we won't try to verify the block checksum.
-   */
-  private boolean verifyChecksum;
-
-  /**
-   * The name of this client.
-   */
-  private String clientName; 
-
-  /**
-   * The DataNode we're talking to.
-   */
-  private DatanodeInfo datanode;
-
-  /**
-   * StorageType of replica on DataNode.
-   */
-  private StorageType storageType;
-
-  /**
-   * If false, we won't try short-circuit local reads.
-   */
-  private boolean allowShortCircuitLocalReads;
-
-  /**
-   * The ClientContext to use for things like the PeerCache.
-   */
-  private ClientContext clientContext;
-
-  /**
-   * Number of bytes to read.  -1 indicates no limit.
-   */
-  private long length = -1;
-
-  /**
-   * Caching strategy to use when reading the block.
-   */
-  private CachingStrategy cachingStrategy;
-
-  /**
-   * Socket address to use to connect to peer.
-   */
-  private InetSocketAddress inetSocketAddress;
-
-  /**
-   * Remote peer factory to use to create a peer, if needed.
-   */
-  private RemotePeerFactory remotePeerFactory;
-
-  /**
-   * UserGroupInformation to use for legacy block reader local objects, if needed.
-   */
-  private UserGroupInformation userGroupInformation;
-
-  /**
-   * Configuration to use for legacy block reader local objects, if needed.
-   */
-  private Configuration configuration;
-
-  /**
-   * Information about the domain socket path we should use to connect to the
-   * local peer-- or null if we haven't examined the local domain socket.
-   */
-  private DomainSocketFactory.PathInfo pathInfo;
-
-  /**
-   * The remaining number of times that we'll try to pull a socket out of the
-   * cache.
-   */
-  private int remainingCacheTries;
-
-  public BlockReaderFactory(DfsClientConf conf) {
-    this.conf = conf;
-    this.remainingCacheTries = conf.getNumCachedConnRetry();
-  }
-
-  public BlockReaderFactory setFileName(String fileName) {
-    this.fileName = fileName;
-    return this;
-  }
-
-  public BlockReaderFactory setBlock(ExtendedBlock block) {
-    this.block = block;
-    return this;
-  }
-
-  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
-    this.token = token;
-    return this;
-  }
-
-  public BlockReaderFactory setStartOffset(long startOffset) {
-    this.startOffset = startOffset;
-    return this;
-  }
-
-  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
-    this.verifyChecksum = verifyChecksum;
-    return this;
-  }
-
-  public BlockReaderFactory setClientName(String clientName) {
-    this.clientName = clientName;
-    return this;
-  }
-
-  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
-    this.datanode = datanode;
-    return this;
-  }
-
-  public BlockReaderFactory setStorageType(StorageType storageType) {
-    this.storageType = storageType;
-    return this;
-  }
-
-  public BlockReaderFactory setAllowShortCircuitLocalReads(
-      boolean allowShortCircuitLocalReads) {
-    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
-    return this;
-  }
-
-  public BlockReaderFactory setClientCacheContext(
-      ClientContext clientContext) {
-    this.clientContext = clientContext;
-    return this;
-  }
-
-  public BlockReaderFactory setLength(long length) {
-    this.length = length;
-    return this;
-  }
-
-  public BlockReaderFactory setCachingStrategy(
-      CachingStrategy cachingStrategy) {
-    this.cachingStrategy = cachingStrategy;
-    return this;
-  }
-
-  public BlockReaderFactory setInetSocketAddress (
-      InetSocketAddress inetSocketAddress) {
-    this.inetSocketAddress = inetSocketAddress;
-    return this;
-  }
-
-  public BlockReaderFactory setUserGroupInformation(
-      UserGroupInformation userGroupInformation) {
-    this.userGroupInformation = userGroupInformation;
-    return this;
-  }
-
-  public BlockReaderFactory setRemotePeerFactory(
-      RemotePeerFactory remotePeerFactory) {
-    this.remotePeerFactory = remotePeerFactory;
-    return this;
-  }
-
-  public BlockReaderFactory setConfiguration(
-      Configuration configuration) {
-    this.configuration = configuration;
-    return this;
-  }
-
-  @VisibleForTesting
-  public static void setFailureInjectorForTesting(FailureInjector injector) {
-    failureInjector = injector;
-  }
-
-  /**
-   * Build a BlockReader with the given options.
-   *
-   * This function will do the best it can to create a block reader that meets
-   * all of our requirements.  We prefer short-circuit block readers
-   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
-   * former avoid the overhead of socket communication.  If short-circuit is
-   * unavailable, our next fallback is data transfer over UNIX domain sockets,
-   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
-   * work, we will try to create a remote block reader that operates over TCP
-   * sockets.
-   *
-   * There are a few caches that are important here.
-   *
-   * The ShortCircuitCache stores file descriptor objects which have been passed
-   * from the DataNode. 
-   *
-   * The DomainSocketFactory stores information about UNIX domain socket paths
-   * that we have not been able to use in the past, so that we don't waste
-   * time retrying them over and over.  (Like all the caches, it does have a
-   * timeout, though.)
-   *
-   * The PeerCache stores peers that we have used in the past.  If we can reuse
-   * one of these peers, we avoid the overhead of re-opening a socket.  However,
-   * if the socket has been timed out on the remote end, our attempt to reuse
-   * the socket may end with an IOException.  For that reason, we limit our
-   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
-   * that, we create new sockets.  This avoids the problem where a thread tries
-   * to talk to a peer that it hasn't talked to in a while, and has to clean out
-   * every entry in a socket cache full of stale entries.
-   *
-   * @return The new BlockReader.  We will not return null.
-   *
-   * @throws InvalidToken
-   *             If the block token was invalid.
-   *         InvalidEncryptionKeyException
-   *             If the encryption key was invalid.
-   *         Other IOException
-   *             If there was another problem.
-   */
-  public BlockReader build() throws IOException {
-    Preconditions.checkNotNull(configuration);
-    BlockReader reader = tryToCreateExternalBlockReader();
-    if (reader != null) {
-      return reader;
-    }
-    final ShortCircuitConf scConf = conf.getShortCircuitConf();
-    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
-      if (clientContext.getUseLegacyBlockReaderLocal()) {
-        reader = getLegacyBlockReaderLocal();
-        if (reader != null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": returning new legacy block reader local.");
-          }
-          return reader;
-        }
-      } else {
-        reader = getBlockReaderLocal();
-        if (reader != null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": returning new block reader local.");
-          }
-          return reader;
-        }
-      }
-    }
-    if (scConf.isDomainSocketDataTraffic()) {
-      reader = getRemoteBlockReaderFromDomain();
-      if (reader != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": returning new remote block reader using " +
-              "UNIX domain socket on " + pathInfo.getPath());
-        }
-        return reader;
-      }
-    }
-    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
-        "TCP reads were disabled for testing, but we failed to " +
-        "do a non-TCP read.");
-    return getRemoteBlockReaderFromTcp();
-  }
-
-  private BlockReader tryToCreateExternalBlockReader() {
-    List<Class<? extends ReplicaAccessorBuilder>> clses =
-        conf.getReplicaAccessorBuilderClasses();
-    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
-      try {
-        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
-        token.write(bado);
-        byte tokenBytes[] = bado.toByteArray();
-
-        Constructor<? extends ReplicaAccessorBuilder> ctor =
-            cls.getConstructor();
-        ReplicaAccessorBuilder builder = ctor.newInstance();
-        ReplicaAccessor accessor = builder.
-            setAllowShortCircuitReads(allowShortCircuitLocalReads).
-            setBlock(block.getBlockId(), block.getBlockPoolId()).
-            setGenerationStamp(block.getGenerationStamp()).
-            setBlockAccessToken(tokenBytes).
-            setClientName(clientName).
-            setConfiguration(configuration).
-            setFileName(fileName).
-            setVerifyChecksum(verifyChecksum).
-            setVisibleLength(length).
-            build();
-        if (accessor == null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": No ReplicaAccessor created by " +
-                cls.getName());
-          }
-        } else {
-          return new ExternalBlockReader(accessor, length, startOffset);
-        }
-      } catch (Throwable t) {
-        LOG.warn("Failed to construct new object of type " +
-            cls.getName(), t);
-      }
-    }
-    return null;
-  }
-
-
-  /**
-   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
-   * This block reader implements the path-based style of local reads
-   * first introduced in HDFS-2246.
-   */
-  private BlockReader getLegacyBlockReaderLocal() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
-    }
-    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
-            "the address " + inetSocketAddress + " is not local");
-      }
-      return null;
-    }
-    if (clientContext.getDisableLegacyBlockReaderLocal()) {
-        PerformanceAdvisory.LOG.debug("{}: can't construct " +
-            "BlockReaderLocalLegacy because " +
-            "disableLegacyBlockReaderLocal is set.", this);
-      return null;
-    }
-    IOException ioe;
-    try {
-      return BlockReaderLocalLegacy.newBlockReader(conf,
-          userGroupInformation, configuration, fileName, block, token,
-          datanode, startOffset, length, storageType);
-    } catch (RemoteException remoteException) {
-      ioe = remoteException.unwrapRemoteException(
-                InvalidToken.class, AccessControlException.class);
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if ((!(ioe instanceof AccessControlException)) &&
-        isSecurityException(ioe)) {
-      // Handle security exceptions.
-      // We do not handle AccessControlException here, since
-      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
-      // that the user is not in dfs.block.local-path-access.user, a condition
-      // which requires us to disable legacy SCR.
-      throw ioe;
-    }
-    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
-        "Disabling legacy local reads.", ioe);
-    clientContext.setDisableLegacyBlockReaderLocal();
-    return null;
-  }
-
-  private BlockReader getBlockReaderLocal() throws InvalidToken {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to construct a BlockReaderLocal " +
-          "for short-circuit reads.");
-    }
-    if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory()
-          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
-    }
-    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
-      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
-              "giving up on BlockReaderLocal.", this, pathInfo);
-      return null;
-    }
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
-    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
-    InvalidToken exc = info.getInvalidTokenException();
-    if (exc != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": got InvalidToken exception while trying to " +
-            "construct BlockReaderLocal via " + pathInfo.getPath());
-      }
-      throw exc;
-    }
-    if (info.getReplica() == null) {
-      PerformanceAdvisory.LOG.debug("{}: failed to get " +
-          "ShortCircuitReplica. Cannot construct " +
-          "BlockReaderLocal via {}", this, pathInfo.getPath());
-      return null;
-    }
-    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
-        setFilename(fileName).
-        setBlock(block).
-        setStartOffset(startOffset).
-        setShortCircuitReplica(info.getReplica()).
-        setVerifyChecksum(verifyChecksum).
-        setCachingStrategy(cachingStrategy).
-        setStorageType(storageType).
-        build();
-  }
-
-  /**
-   * Fetch a pair of short-circuit block descriptors from a local DataNode.
-   *
-   * @return    Null if we could not communicate with the datanode,
-   *            a new ShortCircuitReplicaInfo object otherwise.
-   *            ShortCircuitReplicaInfo objects may contain either an InvalidToken
-   *            exception, or a ShortCircuitReplica object ready to use.
-   */
-  @Override
-  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-    if (createShortCircuitReplicaInfoCallback != null) {
-      ShortCircuitReplicaInfo info =
-        createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
-      if (info != null) return info;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
-    }
-    BlockReaderPeer curPeer;
-    while (true) {
-      curPeer = nextDomainPeer();
-      if (curPeer == null) break;
-      if (curPeer.fromCache) remainingCacheTries--;
-      DomainPeer peer = (DomainPeer)curPeer.peer;
-      Slot slot = null;
-      ShortCircuitCache cache = clientContext.getShortCircuitCache();
-      try {
-        MutableBoolean usedPeer = new MutableBoolean(false);
-        slot = cache.allocShmSlot(datanode, peer, usedPeer,
-            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
-            clientName);
-        if (usedPeer.booleanValue()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": allocShmSlot used up our previous socket " +
-              peer.getDomainSocket() + ".  Allocating a new one...");
-          }
-          curPeer = nextDomainPeer();
-          if (curPeer == null) break;
-          peer = (DomainPeer)curPeer.peer;
-        }
-        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
-        clientContext.getPeerCache().put(datanode, peer);
-        return info;
-      } catch (IOException e) {
-        if (slot != null) {
-          cache.freeSlot(slot);
-        }
-        if (curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached socket.
-          // These are considered less serious, because the socket may be stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + ": closing stale domain peer " + peer, e);
-          }
-          IOUtils.cleanup(LOG, peer);
-        } else {
-          // Handle an I/O error we got when using a newly created socket.
-          // We temporarily disable the domain socket path for a few minutes in
-          // this case, to prevent wasting more time on it.
-          LOG.warn(this + ": I/O error requesting file descriptors.  " + 
-              "Disabling domain socket " + peer.getDomainSocket(), e);
-          IOUtils.cleanup(LOG, peer);
-          clientContext.getDomainSocketFactory()
-              .disableDomainSocketPath(pathInfo.getPath());
-          return null;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Request file descriptors from a DomainPeer.
-   *
-   * @param peer   The peer to use for communication.
-   * @param slot   If non-null, the shared memory slot to associate with the 
-   *               new ShortCircuitReplica.
-   * 
-   * @return  A ShortCircuitReplicaInfo holding a replica or an InvalidToken;
-   *          null if the request was refused or the replica was unusable.
-   * @throws  IOException If we encountered an I/O exception while communicating
-   *          with the datanode.
-   */
-  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
-          Slot slot) throws IOException {
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    SlotId slotId = slot == null ? null : slot.getSlotId();
-    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
-        failureInjector.getSupportsReceiptVerification());
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    DomainSocket sock = peer.getDomainSocket();
-    failureInjector.injectRequestFileDescriptorsFailure();
-    switch (resp.getStatus()) {
-    case SUCCESS:
-      byte buf[] = new byte[1];
-      FileInputStream fis[] = new FileInputStream[2];
-      sock.recvFileInputStreams(fis, buf, 0, buf.length);
-      ShortCircuitReplica replica = null;
-      try {
-        ExtendedBlockId key =
-            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
-          LOG.trace("Sending receipt verification byte for slot " + slot);
-          sock.getOutputStream().write(0);
-        }
-        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
-            Time.monotonicNow(), slot);
-        return new ShortCircuitReplicaInfo(replica);
-      } catch (IOException e) {
-        // This indicates an error reading from disk, or a format error.  Since
-        // it's not a socket communication problem, we return null rather than
-        // throwing an exception.
-        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
-        return null;
-      } finally {
-        if (replica == null) {
-          IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
-        }
-      }
-    case ERROR_UNSUPPORTED:
-      if (!resp.hasShortCircuitAccessVersion()) {
-        LOG.warn("short-circuit read access is disabled for " +
-            "DataNode " + datanode + ".  reason: " + resp.getMessage());
-        clientContext.getDomainSocketFactory()
-            .disableShortCircuitForPath(pathInfo.getPath());
-      } else {
-        LOG.warn("short-circuit read access for the file " +
-            fileName + " is disabled for DataNode " + datanode +
-            ".  reason: " + resp.getMessage());
-      }
-      return null;
-    case ERROR_ACCESS_TOKEN:
-      String msg = "access control error while " +
-          "attempting to set up short-circuit access to " +
-          fileName + resp.getMessage();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(this + ":" + msg);
-      }
-      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
-    default:
-      LOG.warn(this + ": unknown response code " + resp.getStatus() +
-          " while attempting to set up short-circuit access. " +
-          resp.getMessage());
-      clientContext.getDomainSocketFactory()
-          .disableShortCircuitForPath(pathInfo.getPath());
-      return null;
-    }
-  }
-
-  /**
-   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
-   *
-   * @return The new BlockReader, or null if we failed to create the block
-   * reader.
-   *
-   * @throws InvalidToken    If the block token was invalid.
-   * Potentially other security-related exceptions.
-   */
-  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
-    if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory()
-          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
-    }
-    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
-      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
-          "remote block reader because the UNIX domain socket at {}" +
-           " is not usable.", this, pathInfo);
-      return null;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create a remote block reader from the " +
-          "UNIX domain socket at " + pathInfo.getPath());
-    }
-
-    while (true) {
-      BlockReaderPeer curPeer = nextDomainPeer();
-      if (curPeer == null) break;
-      if (curPeer.fromCache) remainingCacheTries--;
-      DomainPeer peer = (DomainPeer)curPeer.peer;
-      BlockReader blockReader = null;
-      try {
-        blockReader = getRemoteBlockReader(peer);
-        return blockReader;
-      } catch (IOException ioe) {
-        IOUtils.cleanup(LOG, peer);
-        if (isSecurityException(ioe)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": got security exception while constructing " +
-                "a remote block reader from the unix domain socket at " +
-                pathInfo.getPath(), ioe);
-          }
-          throw ioe;
-        }
-        if (curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached peer.  These are
-          // considered less serious, because the underlying socket may be stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
-          }
-        } else {
-          // Handle an I/O error we got when using a newly created domain peer.
-          // We temporarily disable the domain socket path for a few minutes in
-          // this case, to prevent wasting more time on it.
-          LOG.warn("I/O error constructing remote block reader.  Disabling " +
-              "domain socket " + peer.getDomainSocket(), ioe);
-          clientContext.getDomainSocketFactory()
-              .disableDomainSocketPath(pathInfo.getPath());
-          return null;
-        }
-      } finally {
-        if (blockReader == null) {
-          IOUtils.cleanup(LOG, peer);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get a RemoteBlockReader that communicates over a TCP socket.
-   *
-   * @return The new BlockReader.  We will not return null, but instead throw
-   *         an exception if this fails.
-   *
-   * @throws InvalidToken
-   *             If the block token was invalid.
-   *         InvalidEncryptionKeyException
-   *             If the encryption key was invalid.
-   *         Other IOException
-   *             If there was another problem.
-   */
-  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": trying to create a remote block reader from a " +
-          "TCP socket");
-    }
-    BlockReader blockReader = null;
-    while (true) {
-      BlockReaderPeer curPeer = null;
-      Peer peer = null;
-      try {
-        curPeer = nextTcpPeer();
-        if (curPeer.fromCache) remainingCacheTries--;
-        peer = curPeer.peer;
-        blockReader = getRemoteBlockReader(peer);
-        return blockReader;
-      } catch (IOException ioe) {
-        if (isSecurityException(ioe)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": got security exception while constructing " +
-                "a remote block reader from " + peer, ioe);
-          }
-          throw ioe;
-        }
-        if ((curPeer != null) && curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached peer.  These are
-          // considered less serious, because the underlying socket may be
-          // stale.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Closed potentially stale remote peer " + peer, ioe);
-          }
-        } else {
-          // Handle an I/O error we got when using a newly created peer.
-          LOG.warn("I/O error constructing remote block reader.", ioe);
-          throw ioe;
-        }
-      } finally {
-        if (blockReader == null) {
-          IOUtils.cleanup(LOG, peer);
-        }
-      }
-    }
-  }
-
-  public static class BlockReaderPeer {
-    final Peer peer;
-    final boolean fromCache;
-    
-    BlockReaderPeer(Peer peer, boolean fromCache) {
-      this.peer = peer;
-      this.fromCache = fromCache;
-    }
-  }
-
-  /**
-   * Get the next DomainPeer-- either from the cache or by creating it.
-   *
-   * @return the next DomainPeer, or null if we could not construct one.
-   */
-  private BlockReaderPeer nextDomainPeer() {
-    if (remainingCacheTries > 0) {
-      Peer peer = clientContext.getPeerCache().get(datanode, true);
-      if (peer != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
-        }
-        return new BlockReaderPeer(peer, true);
-      }
-    }
-    DomainSocket sock = clientContext.getDomainSocketFactory().
-        createSocket(pathInfo, conf.getSocketTimeout());
-    if (sock == null) return null;
-    return new BlockReaderPeer(new DomainPeer(sock), false);
-  }
-
-  /**
-   * Get the next TCP-based peer-- either from the cache or by creating it.
-   *
-   * @return the next Peer; failures are thrown rather than returned as null.
-   *
-   * @throws IOException  If there was an error while constructing the peer
-   *                      (such as an InvalidEncryptionKeyException)
-   */
-  private BlockReaderPeer nextTcpPeer() throws IOException {
-    if (remainingCacheTries > 0) {
-      Peer peer = clientContext.getPeerCache().get(datanode, false);
-      if (peer != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
-        }
-        return new BlockReaderPeer(peer, true);
-      }
-    }
-    try {
-      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
-        datanode);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
-      }
-      return new BlockReaderPeer(peer, false);
-    } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
-                  "connected to " + datanode);
-      }
-      throw e;
-    }
-  }
-
-  /**
-   * Determine if an exception is security-related.
-   *
-   * We need to handle these exceptions differently than other IOExceptions.
-   * They don't indicate a communication problem.  Instead, they mean that there
-   * is some action the client needs to take, such as refetching block tokens,
-   * renewing encryption keys, etc.
-   *
-   * @param ioe    The exception
-   * @return       True only if the exception is security-related.
-   */
-  private static boolean isSecurityException(IOException ioe) {
-    return (ioe instanceof InvalidToken) ||
-            (ioe instanceof InvalidEncryptionKeyException) ||
-            (ioe instanceof InvalidBlockTokenException) ||
-            (ioe instanceof AccessControlException);
-  }
-
-  @SuppressWarnings("deprecation")
-  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
-    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
-      return RemoteBlockReader.newBlockReader(fileName,
-          block, token, startOffset, length, conf.getIoBufferSize(),
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy);
-    } else {
-      return RemoteBlockReader2.newBlockReader(
-          fileName, block, token, startOffset, length,
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
-  }
-
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
deleted file mode 100644
index cac5366..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStorageLocation;
-import org.apache.hadoop.fs.HdfsVolumeId;
-import org.apache.hadoop.fs.VolumeId;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.token.Token;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-class BlockStorageLocationUtil {
-  
-  static final Log LOG = LogFactory
-      .getLog(BlockStorageLocationUtil.class);
-  
-  /**
-   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
-   * of datanodes and blocks. The blocks must all correspond to the same
-   * block pool.
-   * 
-   * @param datanodeBlocks
-   *          Map of datanodes to block replicas at each datanode
-   * @return callables Used to query each datanode for location information on
-   *         the block replicas at the datanode
-   */
-  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
-      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
-      int timeout, boolean connectToDnViaHostname, Span parent) {
-    
-    if (datanodeBlocks.isEmpty()) {
-      return Lists.newArrayList();
-    }
-    
-    // Construct the callables, one per datanode
-    List<VolumeBlockLocationCallable> callables = 
-        new ArrayList<VolumeBlockLocationCallable>();
-    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
-        .entrySet()) {
-      // Construct RPC parameters
-      DatanodeInfo datanode = entry.getKey();
-      List<LocatedBlock> locatedBlocks = entry.getValue();
-      if (locatedBlocks.isEmpty()) {
-        continue;
-      }
-      
-      // Ensure that the blocks all are from the same block pool.
-      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
-      for (LocatedBlock lb : locatedBlocks) {
-        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
-          throw new IllegalArgumentException(
-              "All blocks to be queried must be in the same block pool: " +
-              locatedBlocks.get(0).getBlock() + " and " + lb +
-              " are from different pools.");
-        }
-      }
-      
-      long[] blockIds = new long[locatedBlocks.size()];
-      int i = 0;
-      List<Token<BlockTokenIdentifier>> dnTokens = 
-          new ArrayList<Token<BlockTokenIdentifier>>(
-          locatedBlocks.size());
-      for (LocatedBlock b : locatedBlocks) {
-        blockIds[i++] = b.getBlock().getBlockId();
-        dnTokens.add(b.getBlockToken());
-      }
-      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
-          conf, datanode, poolId, blockIds, dnTokens, timeout, 
-          connectToDnViaHostname, parent);
-      callables.add(callable);
-    }
-    return callables;
-  }
-  
-  /**
-   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
-   * making one RPC to each datanode. These RPCs are made in parallel using a
-   * threadpool.
-   * 
-   * @param datanodeBlocks
-   *          Map of datanodes to the blocks present on the DN
-   * @return metadatas Map of datanodes to block metadata of the DN
-   * @throws InvalidBlockTokenException
-   *           if client does not have read access on a requested block
-   */
-  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
-      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
-      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
-        throws InvalidBlockTokenException {
-
-    List<VolumeBlockLocationCallable> callables = 
-        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, 
-            connectToDnViaHostname, Trace.currentSpan());
-    
-    // Use a thread pool to execute the Callables in parallel
-    List<Future<HdfsBlocksMetadata>> futures = 
-        new ArrayList<Future<HdfsBlocksMetadata>>();
-    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
-    try {
-      futures = executor.invokeAll(callables, timeoutMs,
-          TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      // Swallow the exception here, because we can return partial results
-    }
-    executor.shutdown();
-    
-    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
-        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
-    // Fill in metadatas with results from DN RPCs, where possible
-    for (int i = 0; i < futures.size(); i++) {
-      VolumeBlockLocationCallable callable = callables.get(i);
-      DatanodeInfo datanode = callable.getDatanodeInfo();
-      Future<HdfsBlocksMetadata> future = futures.get(i);
-      try {
-        HdfsBlocksMetadata metadata = future.get();
-        metadatas.put(callable.getDatanodeInfo(), metadata);
-      } catch (CancellationException e) {
-        LOG.info("Cancelled while waiting for datanode "
-            + datanode.getIpcAddr(false) + ": " + e.toString());
-      } catch (ExecutionException e) {
-        Throwable t = e.getCause();
-        if (t instanceof InvalidBlockTokenException) {
-          LOG.warn("Invalid access token when trying to retrieve "
-              + "information from datanode " + datanode.getIpcAddr(false));
-          throw (InvalidBlockTokenException) t;
-        }
-        else if (t instanceof UnsupportedOperationException) {
-          LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
-              + " required #getHdfsBlocksMetadata() API");
-          throw (UnsupportedOperationException) t;
-        } else {
-          LOG.info("Failed to query block locations on datanode " +
-              datanode.getIpcAddr(false) + ": " + t);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Could not fetch information from datanode", t);
-        }
-      } catch (InterruptedException e) {
-        // Shouldn't happen, because invokeAll waits for all Futures to be ready
-        LOG.info("Interrupted while fetching HdfsBlocksMetadata");
-      }
-    }
-    
-    return metadatas;
-  }
-  
-  /**
-   * Group the per-replica {@link VolumeId} info returned from
-   * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be
-   * associated
-   * with the corresponding {@link LocatedBlock}.
-   * 
-   * @param blocks
-   *          Original LocatedBlock array
-   * @param metadatas
-   *          VolumeId information for the replicas on each datanode
-   * @return blockVolumeIds per-replica VolumeId information associated with the
-   *         parent LocatedBlock
-   */
-  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
-      List<LocatedBlock> blocks,
-      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
-    
-    // Initialize mapping of ExtendedBlock to LocatedBlock. 
-    // Used to associate results from DN RPCs to the parent LocatedBlock
-    Map<Long, LocatedBlock> blockIdToLocBlock = 
-        new HashMap<Long, LocatedBlock>();
-    for (LocatedBlock b : blocks) {
-      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
-    }
-    
-    // Initialize the mapping of blocks -> list of VolumeIds, one per replica
-    // This is filled out with real values from the DN RPCs
-    Map<LocatedBlock, List<VolumeId>> blockVolumeIds = 
-        new HashMap<LocatedBlock, List<VolumeId>>();
-    for (LocatedBlock b : blocks) {
-      ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
-      for (int i = 0; i < b.getLocations().length; i++) {
-        l.add(null);
-      }
-      blockVolumeIds.put(b, l);
-    }
-    
-    // Iterate through the list of metadatas (one per datanode). 
-    // For each metadata, if it's valid, insert its volume location information 
-    // into the Map returned to the caller 
-    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
-      DatanodeInfo datanode = entry.getKey();
-      HdfsBlocksMetadata metadata = entry.getValue();
-      // Check if metadata is valid
-      if (metadata == null) {
-        continue;
-      }
-      long[] metaBlockIds = metadata.getBlockIds();
-      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
-      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
-      // Add VolumeId for each replica in the HdfsBlocksMetadata
-      for (int j = 0; j < metaBlockIds.length; j++) {
-        int volumeIndex = metaVolumeIndexes.get(j);
-        long blockId = metaBlockIds[j];
-        // Skip if block wasn't found, or not a valid index into metaVolumeIds
-        // Also skip if the DN responded with a block we didn't ask for
-        if (volumeIndex == Integer.MAX_VALUE
-            || volumeIndex >= metaVolumeIds.size()
-            || !blockIdToLocBlock.containsKey(blockId)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("No data for block " + blockId);
-          }
-          continue;
-        }
-        // Get the VolumeId by indexing into the list of VolumeIds
-        // provided by the datanode
-        byte[] volumeId = metaVolumeIds.get(volumeIndex);
-        HdfsVolumeId id = new HdfsVolumeId(volumeId);
-        // Find out which index we are in the LocatedBlock's replicas
-        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
-        DatanodeInfo[] dnInfos = locBlock.getLocations();
-        int index = -1;
-        for (int k = 0; k < dnInfos.length; k++) {
-          if (dnInfos[k].equals(datanode)) {
-            index = k;
-            break;
-          }
-        }
-        if (index < 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Datanode responded with a block volume id we did" +
-                " not request, omitting.");
-          }
-          continue;
-        }
-        // Place VolumeId at the same index as the DN's index in the list of
-        // replicas
-        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
-        volumeIds.set(index, id);
-      }
-    }
-    return blockVolumeIds;
-  }
-
-  /**
-   * Helper method to combine a list of {@link LocatedBlock} with associated
-   * {@link VolumeId} information to form a list of {@link BlockStorageLocation}
-   * .
-   */
-  static BlockStorageLocation[] convertToVolumeBlockLocations(
-      List<LocatedBlock> blocks, 
-      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
-    // Construct the final return value of VolumeBlockLocation[]
-    BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
-    List<BlockStorageLocation> volumeBlockLocs = 
-        new ArrayList<BlockStorageLocation>(locations.length);
-    for (int i = 0; i < locations.length; i++) {
-      LocatedBlock locBlock = blocks.get(i);
-      List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
-      BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i], 
-          volumeIds.toArray(new VolumeId[0]));
-      volumeBlockLocs.add(bsLoc);
-    }
-    return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
-  }
-  
-  /**
-   * Callable that sets up an RPC proxy to a datanode and queries it for
-   * volume location information for a list of ExtendedBlocks. 
-   */
-  private static class VolumeBlockLocationCallable implements 
-    Callable<HdfsBlocksMetadata> {
-    
-    private final Configuration configuration;
-    private final int timeout;
-    private final DatanodeInfo datanode;
-    private final String poolId;
-    private final long[] blockIds;
-    private final List<Token<BlockTokenIdentifier>> dnTokens;
-    private final boolean connectToDnViaHostname;
-    private final Span parentSpan;
-    
-    VolumeBlockLocationCallable(Configuration configuration,
-        DatanodeInfo datanode, String poolId, long []blockIds,
-        List<Token<BlockTokenIdentifier>> dnTokens, int timeout, 
-        boolean connectToDnViaHostname, Span parentSpan) {
-      this.configuration = configuration;
-      this.timeout = timeout;
-      this.datanode = datanode;
-      this.poolId = poolId;
-      this.blockIds = blockIds;
-      this.dnTokens = dnTokens;
-      this.connectToDnViaHostname = connectToDnViaHostname;
-      this.parentSpan = parentSpan;
-    }
-    
-    public DatanodeInfo getDatanodeInfo() {
-      return datanode;
-    }
-
-    @Override
-    public HdfsBlocksMetadata call() throws Exception {
-      HdfsBlocksMetadata metadata = null;
-      // Create the RPC proxy and make the RPC
-      ClientDatanodeProtocol cdp = null;
-      TraceScope scope =
-          Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
-      try {
-        cdp = DFSUtilClient.createClientDatanodeProtocolProxy(
-            datanode, configuration,
-            timeout, connectToDnViaHostname);
-        metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
-      } catch (IOException e) {
-        // Bubble this up to the caller, handle with the Future
-        throw e;
-      } finally {
-        scope.close();
-        if (cdp != null) {
-          RPC.stopProxy(cdp);
-        }
-      }
-      return metadata;
-    }
-  }
-}