Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/26 20:17:15 UTC
[06/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and
related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
deleted file mode 100644
index c9add53..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ /dev/null
@@ -1,892 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.PerformanceAdvisory;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/**
- * Utility class to create BlockReader implementations.
- */
-@InterfaceAudience.Private
-public class BlockReaderFactory implements ShortCircuitReplicaCreator {
- static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
-
- public static class FailureInjector {
- public void injectRequestFileDescriptorsFailure() throws IOException {
- // do nothing
- }
- public boolean getSupportsReceiptVerification() {
- return true;
- }
- }
-
- @VisibleForTesting
- static ShortCircuitReplicaCreator
- createShortCircuitReplicaInfoCallback = null;
-
- private final DfsClientConf conf;
-
- /**
- * Injects failures into specific operations during unit tests.
- */
- private static FailureInjector failureInjector = new FailureInjector();
-
- /**
- * The file name, for logging and debugging purposes.
- */
- private String fileName;
-
- /**
- * The block ID and block pool ID to use.
- */
- private ExtendedBlock block;
-
- /**
- * The block token to use for security purposes.
- */
- private Token<BlockTokenIdentifier> token;
-
- /**
- * The offset within the block to start reading at.
- */
- private long startOffset;
-
- /**
- * If false, we won't try to verify the block checksum.
- */
- private boolean verifyChecksum;
-
- /**
- * The name of this client.
- */
- private String clientName;
-
- /**
- * The DataNode we're talking to.
- */
- private DatanodeInfo datanode;
-
- /**
- * StorageType of replica on DataNode.
- */
- private StorageType storageType;
-
- /**
- * If false, we won't try short-circuit local reads.
- */
- private boolean allowShortCircuitLocalReads;
-
- /**
- * The ClientContext to use for things like the PeerCache.
- */
- private ClientContext clientContext;
-
- /**
- * Number of bytes to read. -1 indicates no limit.
- */
- private long length = -1;
-
- /**
- * Caching strategy to use when reading the block.
- */
- private CachingStrategy cachingStrategy;
-
- /**
- * Socket address to use to connect to peer.
- */
- private InetSocketAddress inetSocketAddress;
-
- /**
- * Remote peer factory to use to create a peer, if needed.
- */
- private RemotePeerFactory remotePeerFactory;
-
- /**
- * UserGroupInformation to use for legacy block reader local objects, if needed.
- */
- private UserGroupInformation userGroupInformation;
-
- /**
- * Configuration to use for legacy block reader local objects, if needed.
- */
- private Configuration configuration;
-
- /**
- * Information about the domain socket path we should use to connect to the
- * local peer-- or null if we haven't examined the local domain socket.
- */
- private DomainSocketFactory.PathInfo pathInfo;
-
- /**
- * The remaining number of times that we'll try to pull a socket out of the
- * cache.
- */
- private int remainingCacheTries;
-
- public BlockReaderFactory(DfsClientConf conf) {
- this.conf = conf;
- this.remainingCacheTries = conf.getNumCachedConnRetry();
- }
-
- public BlockReaderFactory setFileName(String fileName) {
- this.fileName = fileName;
- return this;
- }
-
- public BlockReaderFactory setBlock(ExtendedBlock block) {
- this.block = block;
- return this;
- }
-
- public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
- this.token = token;
- return this;
- }
-
- public BlockReaderFactory setStartOffset(long startOffset) {
- this.startOffset = startOffset;
- return this;
- }
-
- public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
- this.verifyChecksum = verifyChecksum;
- return this;
- }
-
- public BlockReaderFactory setClientName(String clientName) {
- this.clientName = clientName;
- return this;
- }
-
- public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
- this.datanode = datanode;
- return this;
- }
-
- public BlockReaderFactory setStorageType(StorageType storageType) {
- this.storageType = storageType;
- return this;
- }
-
- public BlockReaderFactory setAllowShortCircuitLocalReads(
- boolean allowShortCircuitLocalReads) {
- this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
- return this;
- }
-
- public BlockReaderFactory setClientCacheContext(
- ClientContext clientContext) {
- this.clientContext = clientContext;
- return this;
- }
-
- public BlockReaderFactory setLength(long length) {
- this.length = length;
- return this;
- }
-
- public BlockReaderFactory setCachingStrategy(
- CachingStrategy cachingStrategy) {
- this.cachingStrategy = cachingStrategy;
- return this;
- }
-
- public BlockReaderFactory setInetSocketAddress (
- InetSocketAddress inetSocketAddress) {
- this.inetSocketAddress = inetSocketAddress;
- return this;
- }
-
- public BlockReaderFactory setUserGroupInformation(
- UserGroupInformation userGroupInformation) {
- this.userGroupInformation = userGroupInformation;
- return this;
- }
-
- public BlockReaderFactory setRemotePeerFactory(
- RemotePeerFactory remotePeerFactory) {
- this.remotePeerFactory = remotePeerFactory;
- return this;
- }
-
- public BlockReaderFactory setConfiguration(
- Configuration configuration) {
- this.configuration = configuration;
- return this;
- }
-
- @VisibleForTesting
- public static void setFailureInjectorForTesting(FailureInjector injector) {
- failureInjector = injector;
- }
-
- /**
- * Build a BlockReader with the given options.
- *
- * This function will do the best it can to create a block reader that meets
- * all of our requirements. We prefer short-circuit block readers
- * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
- * former avoid the overhead of socket communication. If short-circuit is
- * unavailable, our next fallback is data transfer over UNIX domain sockets,
- * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't
- * work, we will try to create a remote block reader that operates over TCP
- * sockets.
- *
- * There are a few caches that are important here.
- *
- * The ShortCircuitCache stores file descriptor objects which have been passed
- * from the DataNode.
- *
- * The DomainSocketFactory stores information about UNIX domain socket paths
- * that we have not been able to use in the past, so we don't waste time
- * retrying them over and over. (Like all the caches, it does have a timeout,
- * though.)
- *
- * The PeerCache stores peers that we have used in the past. If we can reuse
- * one of these peers, we avoid the overhead of re-opening a socket. However,
- * if the socket has been timed out on the remote end, our attempt to reuse
- * the socket may end with an IOException. For that reason, we limit our
- * attempts at socket reuse to dfs.client.cached.conn.retry times. After
- * that, we create new sockets. This avoids the problem where a thread tries
- * to talk to a peer that it hasn't talked to in a while, and has to clean out
- * every entry in a socket cache full of stale entries.
- *
- * @return The new BlockReader. We will not return null.
- *
- * @throws InvalidToken
- * If the block token was invalid.
- * InvalidEncryptionKeyException
- * If the encryption key was invalid.
- * Other IOException
- * If there was another problem.
- */
- public BlockReader build() throws IOException {
- Preconditions.checkNotNull(configuration);
- BlockReader reader = tryToCreateExternalBlockReader();
- if (reader != null) {
- return reader;
- }
- final ShortCircuitConf scConf = conf.getShortCircuitConf();
- if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
- if (clientContext.getUseLegacyBlockReaderLocal()) {
- reader = getLegacyBlockReaderLocal();
- if (reader != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": returning new legacy block reader local.");
- }
- return reader;
- }
- } else {
- reader = getBlockReaderLocal();
- if (reader != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": returning new block reader local.");
- }
- return reader;
- }
- }
- }
- if (scConf.isDomainSocketDataTraffic()) {
- reader = getRemoteBlockReaderFromDomain();
- if (reader != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": returning new remote block reader using " +
- "UNIX domain socket on " + pathInfo.getPath());
- }
- return reader;
- }
- }
- Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
- "TCP reads were disabled for testing, but we failed to " +
- "do a non-TCP read.");
- return getRemoteBlockReaderFromTcp();
- }
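
For orientation, a minimal sketch of how this builder chain is driven (not
part of the patch; names such as src, blockToken and chosenNode are
illustrative stand-ins for state that the real caller, DFSInputStream,
supplies from the LocatedBlock being read):

    BlockReader reader = new BlockReaderFactory(conf)
        .setFileName(src)                        // path, for logging
        .setBlock(block)                         // ExtendedBlock from the NameNode
        .setBlockToken(blockToken)               // block access token
        .setStartOffset(offsetIntoBlock)
        .setLength(bytesToRead)                  // -1 means no limit
        .setVerifyChecksum(true)
        .setClientName(clientName)
        .setDatanodeInfo(chosenNode)
        .setStorageType(storageType)
        .setAllowShortCircuitLocalReads(true)
        .setClientCacheContext(clientContext)    // supplies PeerCache etc.
        .setCachingStrategy(cachingStrategy)
        .setInetSocketAddress(targetAddr)
        .setRemotePeerFactory(peerFactory)
        .setUserGroupInformation(ugi)
        .setConfiguration(configuration)
        .build();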
-
- private BlockReader tryToCreateExternalBlockReader() {
- List<Class<? extends ReplicaAccessorBuilder>> clses =
- conf.getReplicaAccessorBuilderClasses();
- for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
- try {
- ByteArrayDataOutput bado = ByteStreams.newDataOutput();
- token.write(bado);
- byte tokenBytes[] = bado.toByteArray();
-
- Constructor<? extends ReplicaAccessorBuilder> ctor =
- cls.getConstructor();
- ReplicaAccessorBuilder builder = ctor.newInstance();
- ReplicaAccessor accessor = builder.
- setAllowShortCircuitReads(allowShortCircuitLocalReads).
- setBlock(block.getBlockId(), block.getBlockPoolId()).
- setGenerationStamp(block.getGenerationStamp()).
- setBlockAccessToken(tokenBytes).
- setClientName(clientName).
- setConfiguration(configuration).
- setFileName(fileName).
- setVerifyChecksum(verifyChecksum).
- setVisibleLength(length).
- build();
- if (accessor == null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": No ReplicaAccessor created by " +
- cls.getName());
- }
- } else {
- return new ExternalBlockReader(accessor, length, startOffset);
- }
- } catch (Throwable t) {
- LOG.warn("Failed to construct new object of type " +
- cls.getName(), t);
- }
- }
- return null;
- }
-
-
- /**
- * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
- * This block reader implements the path-based style of local reads
- * first introduced in HDFS-2246.
- */
- private BlockReader getLegacyBlockReaderLocal() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
- }
- if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
- "the address " + inetSocketAddress + " is not local");
- }
- return null;
- }
- if (clientContext.getDisableLegacyBlockReaderLocal()) {
- PerformanceAdvisory.LOG.debug("{}: can't construct " +
- "BlockReaderLocalLegacy because " +
- "disableLegacyBlockReaderLocal is set.", this);
- return null;
- }
- IOException ioe;
- try {
- return BlockReaderLocalLegacy.newBlockReader(conf,
- userGroupInformation, configuration, fileName, block, token,
- datanode, startOffset, length, storageType);
- } catch (RemoteException remoteException) {
- ioe = remoteException.unwrapRemoteException(
- InvalidToken.class, AccessControlException.class);
- } catch (IOException e) {
- ioe = e;
- }
- if ((!(ioe instanceof AccessControlException)) &&
- isSecurityException(ioe)) {
- // Handle security exceptions.
- // We do not handle AccessControlException here, since
- // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
- // that the user is not in dfs.block.local-path-access.user, a condition
- // which requires us to disable legacy SCR.
- throw ioe;
- }
- LOG.warn(this + ": error creating legacy BlockReaderLocal. " +
- "Disabling legacy local reads.", ioe);
- clientContext.setDisableLegacyBlockReaderLocal();
- return null;
- }
-
- private BlockReader getBlockReaderLocal() throws InvalidToken {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": trying to construct a BlockReaderLocal " +
- "for short-circuit reads.");
- }
- if (pathInfo == null) {
- pathInfo = clientContext.getDomainSocketFactory()
- .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
- }
- if (!pathInfo.getPathState().getUsableForShortCircuit()) {
- PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
- "giving up on BlockReaderLocal.", this, pathInfo);
- return null;
- }
- ShortCircuitCache cache = clientContext.getShortCircuitCache();
- ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
- ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
- InvalidToken exc = info.getInvalidTokenException();
- if (exc != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": got InvalidToken exception while trying to " +
- "construct BlockReaderLocal via " + pathInfo.getPath());
- }
- throw exc;
- }
- if (info.getReplica() == null) {
- PerformanceAdvisory.LOG.debug("{}: failed to get " +
- "ShortCircuitReplica. Cannot construct " +
- "BlockReaderLocal via {}", this, pathInfo.getPath());
- return null;
- }
- return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
- setFilename(fileName).
- setBlock(block).
- setStartOffset(startOffset).
- setShortCircuitReplica(info.getReplica()).
- setVerifyChecksum(verifyChecksum).
- setCachingStrategy(cachingStrategy).
- setStorageType(storageType).
- build();
- }
-
- /**
- * Fetch a pair of short-circuit block descriptors from a local DataNode.
- *
- * @return Null if we could not communicate with the datanode,
- * a new ShortCircuitReplicaInfo object otherwise.
- * ShortCircuitReplicaInfo objects may contain either an InvalidToken
- * exception, or a ShortCircuitReplica object ready to use.
- */
- @Override
- public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
- if (createShortCircuitReplicaInfoCallback != null) {
- ShortCircuitReplicaInfo info =
- createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
- if (info != null) return info;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
- }
- BlockReaderPeer curPeer;
- while (true) {
- curPeer = nextDomainPeer();
- if (curPeer == null) break;
- if (curPeer.fromCache) remainingCacheTries--;
- DomainPeer peer = (DomainPeer)curPeer.peer;
- Slot slot = null;
- ShortCircuitCache cache = clientContext.getShortCircuitCache();
- try {
- MutableBoolean usedPeer = new MutableBoolean(false);
- slot = cache.allocShmSlot(datanode, peer, usedPeer,
- new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
- clientName);
- if (usedPeer.booleanValue()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": allocShmSlot used up our previous socket " +
- peer.getDomainSocket() + ". Allocating a new one...");
- }
- curPeer = nextDomainPeer();
- if (curPeer == null) break;
- peer = (DomainPeer)curPeer.peer;
- }
- ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
- clientContext.getPeerCache().put(datanode, peer);
- return info;
- } catch (IOException e) {
- if (slot != null) {
- cache.freeSlot(slot);
- }
- if (curPeer.fromCache) {
- // Handle an I/O error we got when using a cached socket.
- // These are considered less serious, because the socket may be stale.
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + ": closing stale domain peer " + peer, e);
- }
- IOUtils.cleanup(LOG, peer);
- } else {
- // Handle an I/O error we got when using a newly created socket.
- // We temporarily disable the domain socket path for a few minutes in
- // this case, to prevent wasting more time on it.
- LOG.warn(this + ": I/O error requesting file descriptors. " +
- "Disabling domain socket " + peer.getDomainSocket(), e);
- IOUtils.cleanup(LOG, peer);
- clientContext.getDomainSocketFactory()
- .disableDomainSocketPath(pathInfo.getPath());
- return null;
- }
- }
- }
- return null;
- }
-
- /**
- * Request file descriptors from a DomainPeer.
- *
- * @param peer The peer to use for communication.
- * @param slot If non-null, the shared memory slot to associate with the
- * new ShortCircuitReplica.
- *
- * @return A ShortCircuitReplica object if we could communicate with the
- * datanode; null, otherwise.
- * @throws IOException If we encountered an I/O exception while communicating
- * with the datanode.
- */
- private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
- Slot slot) throws IOException {
- ShortCircuitCache cache = clientContext.getShortCircuitCache();
- final DataOutputStream out =
- new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
- SlotId slotId = slot == null ? null : slot.getSlotId();
- new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
- failureInjector.getSupportsReceiptVerification());
- DataInputStream in = new DataInputStream(peer.getInputStream());
- BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
- PBHelperClient.vintPrefixed(in));
- DomainSocket sock = peer.getDomainSocket();
- failureInjector.injectRequestFileDescriptorsFailure();
- switch (resp.getStatus()) {
- case SUCCESS:
- byte buf[] = new byte[1];
- FileInputStream fis[] = new FileInputStream[2];
- sock.recvFileInputStreams(fis, buf, 0, buf.length);
- ShortCircuitReplica replica = null;
- try {
- ExtendedBlockId key =
- new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
- if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
- LOG.trace("Sending receipt verification byte for slot " + slot);
- sock.getOutputStream().write(0);
- }
- replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
- Time.monotonicNow(), slot);
- return new ShortCircuitReplicaInfo(replica);
- } catch (IOException e) {
- // This indicates an error reading from disk, or a format error. Since
- // it's not a socket communication problem, we return null rather than
- // throwing an exception.
- LOG.warn(this + ": error creating ShortCircuitReplica.", e);
- return null;
- } finally {
- if (replica == null) {
- IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
- }
- }
- case ERROR_UNSUPPORTED:
- if (!resp.hasShortCircuitAccessVersion()) {
- LOG.warn("short-circuit read access is disabled for " +
- "DataNode " + datanode + ". reason: " + resp.getMessage());
- clientContext.getDomainSocketFactory()
- .disableShortCircuitForPath(pathInfo.getPath());
- } else {
- LOG.warn("short-circuit read access for the file " +
- fileName + " is disabled for DataNode " + datanode +
- ". reason: " + resp.getMessage());
- }
- return null;
- case ERROR_ACCESS_TOKEN:
- String msg = "access control error while " +
- "attempting to set up short-circuit access to " +
- fileName + resp.getMessage();
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + ":" + msg);
- }
- return new ShortCircuitReplicaInfo(new InvalidToken(msg));
- default:
- LOG.warn(this + ": unknown response code " + resp.getStatus() +
- " while attempting to set up short-circuit access. " +
- resp.getMessage());
- clientContext.getDomainSocketFactory()
- .disableShortCircuitForPath(pathInfo.getPath());
- return null;
- }
- }
-
- /**
- * Get a RemoteBlockReader that communicates over a UNIX domain socket.
- *
- * @return The new BlockReader, or null if we failed to create the block
- * reader.
- *
- * @throws InvalidToken If the block token was invalid.
- * Potentially other security-related exceptions.
- */
- private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
- if (pathInfo == null) {
- pathInfo = clientContext.getDomainSocketFactory()
- .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
- }
- if (!pathInfo.getPathState().getUsableForDataTransfer()) {
- PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
- "remote block reader because the UNIX domain socket at {}" +
- " is not usable.", this, pathInfo);
- return null;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": trying to create a remote block reader from the " +
- "UNIX domain socket at " + pathInfo.getPath());
- }
-
- while (true) {
- BlockReaderPeer curPeer = nextDomainPeer();
- if (curPeer == null) break;
- if (curPeer.fromCache) remainingCacheTries--;
- DomainPeer peer = (DomainPeer)curPeer.peer;
- BlockReader blockReader = null;
- try {
- blockReader = getRemoteBlockReader(peer);
- return blockReader;
- } catch (IOException ioe) {
- IOUtils.cleanup(LOG, peer);
- if (isSecurityException(ioe)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": got security exception while constructing " +
- "a remote block reader from the unix domain socket at " +
- pathInfo.getPath(), ioe);
- }
- throw ioe;
- }
- if (curPeer.fromCache) {
- // Handle an I/O error we got when using a cached peer. These are
- // considered less serious, because the underlying socket may be stale.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed potentially stale domain peer " + peer, ioe);
- }
- } else {
- // Handle an I/O error we got when using a newly created domain peer.
- // We temporarily disable the domain socket path for a few minutes in
- // this case, to prevent wasting more time on it.
- LOG.warn("I/O error constructing remote block reader. Disabling " +
- "domain socket " + peer.getDomainSocket(), ioe);
- clientContext.getDomainSocketFactory()
- .disableDomainSocketPath(pathInfo.getPath());
- return null;
- }
- } finally {
- if (blockReader == null) {
- IOUtils.cleanup(LOG, peer);
- }
- }
- }
- return null;
- }
-
- /**
- * Get a RemoteBlockReader that communicates over a TCP socket.
- *
- * @return The new BlockReader. We will not return null, but instead throw
- * an exception if this fails.
- *
- * @throws InvalidToken
- * If the block token was invalid.
- * InvalidEncryptionKeyException
- * If the encryption key was invalid.
- * Other IOException
- * If there was another problem.
- */
- private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": trying to create a remote block reader from a " +
- "TCP socket");
- }
- BlockReader blockReader = null;
- while (true) {
- BlockReaderPeer curPeer = null;
- Peer peer = null;
- try {
- curPeer = nextTcpPeer();
- if (curPeer.fromCache) remainingCacheTries--;
- peer = curPeer.peer;
- blockReader = getRemoteBlockReader(peer);
- return blockReader;
- } catch (IOException ioe) {
- if (isSecurityException(ioe)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + ": got security exception while constructing " +
- "a remote block reader from " + peer, ioe);
- }
- throw ioe;
- }
- if ((curPeer != null) && curPeer.fromCache) {
- // Handle an I/O error we got when using a cached peer. These are
- // considered less serious, because the underlying socket may be
- // stale.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed potentially stale remote peer " + peer, ioe);
- }
- } else {
- // Handle an I/O error we got when using a newly created peer.
- LOG.warn("I/O error constructing remote block reader.", ioe);
- throw ioe;
- }
- } finally {
- if (blockReader == null) {
- IOUtils.cleanup(LOG, peer);
- }
- }
- }
- }
-
- public static class BlockReaderPeer {
- final Peer peer;
- final boolean fromCache;
-
- BlockReaderPeer(Peer peer, boolean fromCache) {
- this.peer = peer;
- this.fromCache = fromCache;
- }
- }
-
- /**
- * Get the next DomainPeer-- either from the cache or by creating it.
- *
- * @return the next DomainPeer, or null if we could not construct one.
- */
- private BlockReaderPeer nextDomainPeer() {
- if (remainingCacheTries > 0) {
- Peer peer = clientContext.getPeerCache().get(datanode, true);
- if (peer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("nextDomainPeer: reusing existing peer " + peer);
- }
- return new BlockReaderPeer(peer, true);
- }
- }
- DomainSocket sock = clientContext.getDomainSocketFactory().
- createSocket(pathInfo, conf.getSocketTimeout());
- if (sock == null) return null;
- return new BlockReaderPeer(new DomainPeer(sock), false);
- }
-
- /**
- * Get the next TCP-based peer-- either from the cache or by creating it.
- *
- * @return the next Peer, or null if we could not construct one.
- *
- * @throws IOException If there was an error while constructing the peer
- * (such as an InvalidEncryptionKeyException)
- */
- private BlockReaderPeer nextTcpPeer() throws IOException {
- if (remainingCacheTries > 0) {
- Peer peer = clientContext.getPeerCache().get(datanode, false);
- if (peer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("nextTcpPeer: reusing existing peer " + peer);
- }
- return new BlockReaderPeer(peer, true);
- }
- }
- try {
- Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
- datanode);
- if (LOG.isTraceEnabled()) {
- LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
- }
- return new BlockReaderPeer(peer, false);
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
- "connected to " + datanode);
- }
- throw e;
- }
- }
-
- /**
- * Determine if an exception is security-related.
- *
- * We need to handle these exceptions differently than other IOExceptions.
- * They don't indicate a communication problem. Instead, they mean that there
- * is some action the client needs to take, such as refetching block tokens,
- * renewing encryption keys, etc.
- *
- * @param ioe The exception
- * @return True only if the exception is security-related.
- */
- private static boolean isSecurityException(IOException ioe) {
- return (ioe instanceof InvalidToken) ||
- (ioe instanceof InvalidEncryptionKeyException) ||
- (ioe instanceof InvalidBlockTokenException) ||
- (ioe instanceof AccessControlException);
- }
-
- @SuppressWarnings("deprecation")
- private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
- if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
- return RemoteBlockReader.newBlockReader(fileName,
- block, token, startOffset, length, conf.getIoBufferSize(),
- verifyChecksum, clientName, peer, datanode,
- clientContext.getPeerCache(), cachingStrategy);
- } else {
- return RemoteBlockReader2.newBlockReader(
- fileName, block, token, startOffset, length,
- verifyChecksum, clientName, peer, datanode,
- clientContext.getPeerCache(), cachingStrategy);
- }
- }
-
- @Override
- public String toString() {
- return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
- }
-
- /**
- * File name to print when accessing a block directly (from servlets)
- * @param s Address of the block location
- * @param poolId Block pool ID of the block
- * @param blockId Block ID of the block
- * @return string that has a file name for debug purposes
- */
- public static String getFileName(final InetSocketAddress s,
- final String poolId, final long blockId) {
- return s.toString() + ":" + poolId + ":" + blockId;
- }
-}
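
The nextDomainPeer/nextTcpPeer pair above implements the bounded cache-reuse
policy that the build() javadoc describes. Distilled into generic form (here
Connection, cache and connect() are hypothetical names standing in for Peer,
PeerCache and RemotePeerFactory#newConnectedPeer):

    // Reuse a cached connection at most remainingCacheTries times
    // (initialized from dfs.client.cached.conn.retry); after that, or on a
    // cache miss, always open a fresh connection.
    Connection next() throws IOException {
      if (remainingCacheTries > 0) {
        Connection cached = cache.get(target);  // may be stale
        if (cached != null) {
          remainingCacheTries--;                // only cached reuse is counted
          return cached;
        }
      }
      return connect(target);                   // fresh socket
    }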
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
deleted file mode 100644
index cac5366..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStorageLocation;
-import org.apache.hadoop.fs.HdfsVolumeId;
-import org.apache.hadoop.fs.VolumeId;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.token.Token;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-class BlockStorageLocationUtil {
-
- static final Log LOG = LogFactory
- .getLog(BlockStorageLocationUtil.class);
-
- /**
- * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
- * of datanodes and blocks. The blocks must all correspond to the same
- * block pool.
- *
- * @param datanodeBlocks
- * Map of datanodes to block replicas at each datanode
- * @return callables Used to query each datanode for location information on
- * the block replicas at the datanode
- */
- private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
- Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
- int timeout, boolean connectToDnViaHostname, Span parent) {
-
- if (datanodeBlocks.isEmpty()) {
- return Lists.newArrayList();
- }
-
- // Construct the callables, one per datanode
- List<VolumeBlockLocationCallable> callables =
- new ArrayList<VolumeBlockLocationCallable>();
- for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
- .entrySet()) {
- // Construct RPC parameters
- DatanodeInfo datanode = entry.getKey();
- List<LocatedBlock> locatedBlocks = entry.getValue();
- if (locatedBlocks.isEmpty()) {
- continue;
- }
-
- // Ensure that the blocks all are from the same block pool.
- String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
- for (LocatedBlock lb : locatedBlocks) {
- if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
- throw new IllegalArgumentException(
- "All blocks to be queried must be in the same block pool: " +
- locatedBlocks.get(0).getBlock() + " and " + lb +
- " are from different pools.");
- }
- }
-
- long[] blockIds = new long[locatedBlocks.size()];
- int i = 0;
- List<Token<BlockTokenIdentifier>> dnTokens =
- new ArrayList<Token<BlockTokenIdentifier>>(
- locatedBlocks.size());
- for (LocatedBlock b : locatedBlocks) {
- blockIds[i++] = b.getBlock().getBlockId();
- dnTokens.add(b.getBlockToken());
- }
- VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
- conf, datanode, poolId, blockIds, dnTokens, timeout,
- connectToDnViaHostname, parent);
- callables.add(callable);
- }
- return callables;
- }
-
- /**
- * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
- * making one RPC to each datanode. These RPCs are made in parallel using a
- * threadpool.
- *
- * @param datanodeBlocks
- * Map of datanodes to the blocks present on the DN
- * @return metadatas Map of datanodes to block metadata of the DN
- * @throws InvalidBlockTokenException
- * if client does not have read access on a requested block
- */
- static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
- Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
- int poolsize, int timeoutMs, boolean connectToDnViaHostname)
- throws InvalidBlockTokenException {
-
- List<VolumeBlockLocationCallable> callables =
- createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
- connectToDnViaHostname, Trace.currentSpan());
-
- // Use a thread pool to execute the Callables in parallel
- List<Future<HdfsBlocksMetadata>> futures =
- new ArrayList<Future<HdfsBlocksMetadata>>();
- ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
- try {
- futures = executor.invokeAll(callables, timeoutMs,
- TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // Swallow the exception here, because we can return partial results
- }
- executor.shutdown();
-
- Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
- Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
- // Fill in metadatas with results from DN RPCs, where possible
- for (int i = 0; i < futures.size(); i++) {
- VolumeBlockLocationCallable callable = callables.get(i);
- DatanodeInfo datanode = callable.getDatanodeInfo();
- Future<HdfsBlocksMetadata> future = futures.get(i);
- try {
- HdfsBlocksMetadata metadata = future.get();
- metadatas.put(callable.getDatanodeInfo(), metadata);
- } catch (CancellationException e) {
- LOG.info("Cancelled while waiting for datanode "
- + datanode.getIpcAddr(false) + ": " + e.toString());
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof InvalidBlockTokenException) {
- LOG.warn("Invalid access token when trying to retrieve "
- + "information from datanode " + datanode.getIpcAddr(false));
- throw (InvalidBlockTokenException) t;
- }
- else if (t instanceof UnsupportedOperationException) {
- LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
- + " required #getHdfsBlocksMetadata() API");
- throw (UnsupportedOperationException) t;
- } else {
- LOG.info("Failed to query block locations on datanode " +
- datanode.getIpcAddr(false) + ": " + t);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Could not fetch information from datanode", t);
- }
- } catch (InterruptedException e) {
- // Shouldn't happen, because invokeAll waits for all Futures to be ready
- LOG.info("Interrupted while fetching HdfsBlocksMetadata");
- }
- }
-
- return metadatas;
- }
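
The idiom above, invokeAll with a deadline followed by inspecting each Future
individually, is what lets this method return partial results when some
datanodes are slow: tasks that miss the deadline are cancelled and surface as
CancellationException, while the rest complete normally. A self-contained
illustration of the same idiom, with plain strings in place of datanode RPCs:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;

    public class InvokeAllSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = Arrays.asList(
            () -> "fast result",
            () -> { Thread.sleep(5000); return "slow result"; });
        // Wait at most one second; stragglers are cancelled, not waited for.
        List<Future<String>> futures =
            executor.invokeAll(tasks, 1, TimeUnit.SECONDS);
        executor.shutdown();
        for (Future<String> f : futures) {
          try {
            System.out.println(f.get());              // "fast result"
          } catch (CancellationException e) {
            System.out.println("timed out, skipped"); // the slow task
          } catch (ExecutionException e) {
            System.out.println("failed: " + e.getCause());
          }
        }
      }
    }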
-
- /**
- * Group the per-replica {@link VolumeId} info returned from
- * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be
- * associated
- * with the corresponding {@link LocatedBlock}.
- *
- * @param blocks
- * Original LocatedBlock array
- * @param metadatas
- * VolumeId information for the replicas on each datanode
- * @return blockVolumeIds per-replica VolumeId information associated with the
- * parent LocatedBlock
- */
- static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
- List<LocatedBlock> blocks,
- Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
-
- // Initialize mapping of ExtendedBlock to LocatedBlock.
- // Used to associate results from DN RPCs to the parent LocatedBlock
- Map<Long, LocatedBlock> blockIdToLocBlock =
- new HashMap<Long, LocatedBlock>();
- for (LocatedBlock b : blocks) {
- blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
- }
-
- // Initialize the mapping of blocks -> list of VolumeIds, one per replica
- // This is filled out with real values from the DN RPCs
- Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
- new HashMap<LocatedBlock, List<VolumeId>>();
- for (LocatedBlock b : blocks) {
- ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
- for (int i = 0; i < b.getLocations().length; i++) {
- l.add(null);
- }
- blockVolumeIds.put(b, l);
- }
-
- // Iterate through the list of metadatas (one per datanode).
- // For each metadata, if it's valid, insert its volume location information
- // into the Map returned to the caller
- for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
- DatanodeInfo datanode = entry.getKey();
- HdfsBlocksMetadata metadata = entry.getValue();
- // Check if metadata is valid
- if (metadata == null) {
- continue;
- }
- long[] metaBlockIds = metadata.getBlockIds();
- List<byte[]> metaVolumeIds = metadata.getVolumeIds();
- List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
- // Add VolumeId for each replica in the HdfsBlocksMetadata
- for (int j = 0; j < metaBlockIds.length; j++) {
- int volumeIndex = metaVolumeIndexes.get(j);
- long blockId = metaBlockIds[j];
- // Skip if block wasn't found, or not a valid index into metaVolumeIds
- // Also skip if the DN responded with a block we didn't ask for
- if (volumeIndex == Integer.MAX_VALUE
- || volumeIndex >= metaVolumeIds.size()
- || !blockIdToLocBlock.containsKey(blockId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No data for block " + blockId);
- }
- continue;
- }
- // Get the VolumeId by indexing into the list of VolumeIds
- // provided by the datanode
- byte[] volumeId = metaVolumeIds.get(volumeIndex);
- HdfsVolumeId id = new HdfsVolumeId(volumeId);
- // Find out which index we are in the LocatedBlock's replicas
- LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
- DatanodeInfo[] dnInfos = locBlock.getLocations();
- int index = -1;
- for (int k = 0; k < dnInfos.length; k++) {
- if (dnInfos[k].equals(datanode)) {
- index = k;
- break;
- }
- }
- if (index < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Datanode responded with a block volume id we did" +
- " not request, omitting.");
- }
- continue;
- }
- // Place VolumeId at the same index as the DN's index in the list of
- // replicas
- List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
- volumeIds.set(index, id);
- }
- }
- return blockVolumeIds;
- }
-
- /**
- * Helper method to combine a list of {@link LocatedBlock} with associated
- * {@link VolumeId} information to form a list of
- * {@link BlockStorageLocation}.
- */
- static BlockStorageLocation[] convertToVolumeBlockLocations(
- List<LocatedBlock> blocks,
- Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
- // Construct the final return value of VolumeBlockLocation[]
- BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
- List<BlockStorageLocation> volumeBlockLocs =
- new ArrayList<BlockStorageLocation>(locations.length);
- for (int i = 0; i < locations.length; i++) {
- LocatedBlock locBlock = blocks.get(i);
- List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
- BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
- volumeIds.toArray(new VolumeId[0]));
- volumeBlockLocs.add(bsLoc);
- }
- return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
- }
-
- /**
- * Callable that sets up an RPC proxy to a datanode and queries it for
- * volume location information for a list of ExtendedBlocks.
- */
- private static class VolumeBlockLocationCallable implements
- Callable<HdfsBlocksMetadata> {
-
- private final Configuration configuration;
- private final int timeout;
- private final DatanodeInfo datanode;
- private final String poolId;
- private final long[] blockIds;
- private final List<Token<BlockTokenIdentifier>> dnTokens;
- private final boolean connectToDnViaHostname;
- private final Span parentSpan;
-
- VolumeBlockLocationCallable(Configuration configuration,
- DatanodeInfo datanode, String poolId, long []blockIds,
- List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
- boolean connectToDnViaHostname, Span parentSpan) {
- this.configuration = configuration;
- this.timeout = timeout;
- this.datanode = datanode;
- this.poolId = poolId;
- this.blockIds = blockIds;
- this.dnTokens = dnTokens;
- this.connectToDnViaHostname = connectToDnViaHostname;
- this.parentSpan = parentSpan;
- }
-
- public DatanodeInfo getDatanodeInfo() {
- return datanode;
- }
-
- @Override
- public HdfsBlocksMetadata call() throws Exception {
- HdfsBlocksMetadata metadata = null;
- // Create the RPC proxy and make the RPC
- ClientDatanodeProtocol cdp = null;
- TraceScope scope =
- Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
- try {
- cdp = DFSUtilClient.createClientDatanodeProtocolProxy(
- datanode, configuration,
- timeout, connectToDnViaHostname);
- metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
- } catch (IOException e) {
- // Bubble this up to the caller, handle with the Future
- throw e;
- } finally {
- scope.close();
- if (cdp != null) {
- RPC.stopProxy(cdp);
- }
- }
- return metadata;
- }
- }
-}
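
End to end, the helpers in this class compose as follows (a schematic sketch
only; the real driver lives in DFSClient, and groupByDatanode is a
hypothetical stand-in for the grouping the caller performs):

    // blocks: List<LocatedBlock> for the file region of interest
    Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
        groupByDatanode(blocks);                 // one entry per replica host
    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
        BlockStorageLocationUtil.queryDatanodesForHdfsBlocksMetadata(
            conf, datanodeBlocks, poolSize, timeoutMs, connectToDnViaHostname);
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
        BlockStorageLocationUtil.associateVolumeIdsWithBlocks(blocks, metadatas);
    BlockStorageLocation[] result =
        BlockStorageLocationUtil.convertToVolumeBlockLocations(
            blocks, blockVolumeIds);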