Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:23:12 UTC
[38/52] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0000000,92d117c..be346a4
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@@ -1,0 -1,3135 +1,3240 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
+ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
+
+ import java.io.BufferedOutputStream;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.InetAddress;
+ import java.net.InetSocketAddress;
+ import java.net.Socket;
+ import java.net.SocketAddress;
+ import java.net.URI;
+ import java.net.UnknownHostException;
+ import java.security.GeneralSecurityException;
+ import java.util.ArrayList;
+ import java.util.EnumSet;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Random;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadLocalRandom;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ import javax.net.SocketFactory;
+
+ import org.apache.hadoop.HadoopIllegalArgumentException;
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.crypto.CipherSuite;
+ import org.apache.hadoop.crypto.CryptoCodec;
+ import org.apache.hadoop.crypto.CryptoInputStream;
+ import org.apache.hadoop.crypto.CryptoOutputStream;
+ import org.apache.hadoop.crypto.CryptoProtocolVersion;
+ import org.apache.hadoop.crypto.key.KeyProvider;
+ import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+ import org.apache.hadoop.fs.BlockLocation;
+ import org.apache.hadoop.fs.CacheFlag;
+ import org.apache.hadoop.fs.ContentSummary;
+ import org.apache.hadoop.fs.CreateFlag;
+ import org.apache.hadoop.fs.FileAlreadyExistsException;
+ import org.apache.hadoop.fs.FileEncryptionInfo;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.FsServerDefaults;
+ import org.apache.hadoop.fs.FsStatus;
+ import org.apache.hadoop.fs.FsTracer;
+ import org.apache.hadoop.fs.HdfsBlockLocation;
+ import org.apache.hadoop.fs.InvalidPathException;
+ import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
+ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+ import org.apache.hadoop.fs.Options;
+ import org.apache.hadoop.fs.Options.ChecksumOpt;
+ import org.apache.hadoop.fs.ParentNotDirectoryException;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.RemoteIterator;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.fs.UnresolvedLinkException;
+ import org.apache.hadoop.fs.XAttr;
+ import org.apache.hadoop.fs.XAttrSetFlag;
+ import org.apache.hadoop.fs.permission.AclEntry;
+ import org.apache.hadoop.fs.permission.AclStatus;
+ import org.apache.hadoop.fs.permission.FsAction;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+ import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
+ import org.apache.hadoop.hdfs.net.Peer;
+ import org.apache.hadoop.hdfs.protocol.AclException;
+ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+ import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+ import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
+ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+ import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
+ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+ import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+ import org.apache.hadoop.hdfs.util.IOUtilsClient;
+ import org.apache.hadoop.io.DataOutputBuffer;
+ import org.apache.hadoop.io.EnumSetWritable;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.MD5Hash;
+ import org.apache.hadoop.io.Text;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
+ import org.apache.hadoop.ipc.RPC;
+ import org.apache.hadoop.ipc.RemoteException;
+ import org.apache.hadoop.net.DNS;
+ import org.apache.hadoop.net.NetUtils;
+ import org.apache.hadoop.security.AccessControlException;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.security.token.TokenRenewer;
+ import org.apache.hadoop.util.Daemon;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DataChecksum.Type;
+ import org.apache.hadoop.util.Progressable;
+ import org.apache.hadoop.util.Time;
+ import org.apache.htrace.core.TraceScope;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Joiner;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Lists;
+ import com.google.common.net.InetAddresses;
+ import org.apache.htrace.core.Tracer;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /********************************************************
+ * DFSClient can connect to a Hadoop Filesystem and
+ * perform basic file tasks. It uses the ClientProtocol
+ * to communicate with a NameNode daemon, and connects
+ * directly to DataNodes to read/write block data.
+ *
+ * Hadoop DFS users should obtain an instance of
+ * DistributedFileSystem, which uses DFSClient to handle
+ * filesystem tasks.
+ *
+ ********************************************************/
+ @InterfaceAudience.Private
+ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+ DataEncryptionKeyFactory {
+ public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
+ public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
+
+ private final Configuration conf;
+ private final Tracer tracer;
+ private final DfsClientConf dfsClientConf;
+ final ClientProtocol namenode;
+ /* The service used for delegation tokens */
+ private Text dtService;
+
+ final UserGroupInformation ugi;
+ volatile boolean clientRunning = true;
+ volatile long lastLeaseRenewal;
+ private volatile FsServerDefaults serverDefaults;
+ private volatile long serverDefaultsLastUpdate;
+ final String clientName;
+ final SocketFactory socketFactory;
+ final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+ final FileSystem.Statistics stats;
+ private final String authority;
+ private final Random r = new Random();
+ private SocketAddress[] localInterfaceAddrs;
+ private DataEncryptionKey encryptionKey;
+ final SaslDataTransferClient saslClient;
+ private final CachingStrategy defaultReadCachingStrategy;
+ private final CachingStrategy defaultWriteCachingStrategy;
+ private final ClientContext clientContext;
+
+ private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
+ new DFSHedgedReadMetrics();
+ private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
++ private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
+ private final int smallBufferSize;
+
+ public DfsClientConf getConf() {
+ return dfsClientConf;
+ }
+
+ Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * A map from inode IDs to {@link DFSOutputStream} objects
+ * that are currently being written by this client.
+ * Note that a file can only be written by a single client.
+ */
+ private final Map<Long, DFSOutputStream> filesBeingWritten
+ = new HashMap<Long, DFSOutputStream>();
+
+ /**
+ * Same as this(DFSUtilClient.getNNAddress(conf), conf);
+ * @see #DFSClient(InetSocketAddress, Configuration)
+ * @deprecated Deprecated at 0.21
+ */
+ @Deprecated
+ public DFSClient(Configuration conf) throws IOException {
+ this(DFSUtilClient.getNNAddress(conf), conf);
+ }
+
+ public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
+ this(DFSUtilClient.getNNUri(address), conf);
+ }
+
+ /**
+ * Same as this(nameNodeUri, conf, null);
+ * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
+ */
+ public DFSClient(URI nameNodeUri, Configuration conf
+ ) throws IOException {
+ this(nameNodeUri, conf, null);
+ }
+
+ /**
+ * Same as this(nameNodeUri, null, conf, stats);
+ * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
+ */
+ public DFSClient(URI nameNodeUri, Configuration conf,
+ FileSystem.Statistics stats)
+ throws IOException {
+ this(nameNodeUri, null, conf, stats);
+ }
+
+ /**
+ * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
+ * If HA is enabled and a positive value is set for
+ * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY}
+ * in the configuration, the DFSClient will use
+ * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler.
+ * Otherwise one of nameNodeUri or rpcNamenode must be null.
+ */
+ @VisibleForTesting
+ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
+ Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
+ // Copy only the required DFSClient configuration
+ this.tracer = FsTracer.get(conf);
+ this.dfsClientConf = new DfsClientConf(conf);
+ this.conf = conf;
+ this.stats = stats;
+ this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+ this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
+ this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+
+ this.ugi = UserGroupInformation.getCurrentUser();
+
+ this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
+ this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
+ ThreadLocalRandom.current().nextInt() + "_" +
+ Thread.currentThread().getId();
+ int numResponseToDrop = conf.getInt(
+ HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+ HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+ ProxyAndInfo<ClientProtocol> proxyInfo = null;
+ AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+
+ if (numResponseToDrop > 0) {
+ // This case is used for testing.
+ LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ + " is set to " + numResponseToDrop
+ + ", this hacked client will proactively drop responses");
+ proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
+ nameNodeUri, ClientProtocol.class, numResponseToDrop,
+ nnFallbackToSimpleAuth);
+ }
+
+ if (proxyInfo != null) {
+ this.dtService = proxyInfo.getDelegationTokenService();
+ this.namenode = proxyInfo.getProxy();
+ } else if (rpcNamenode != null) {
+ // This case is used for testing.
+ Preconditions.checkArgument(nameNodeUri == null);
+ this.namenode = rpcNamenode;
+ dtService = null;
+ } else {
+ Preconditions.checkArgument(nameNodeUri != null,
+ "null URI");
+ proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
+ nameNodeUri, nnFallbackToSimpleAuth);
+ this.dtService = proxyInfo.getDelegationTokenService();
+ this.namenode = proxyInfo.getProxy();
+ }
+
+ String localInterfaces[] =
+ conf.getTrimmedStrings(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
+ localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
+ if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
+ LOG.debug("Using local interfaces [" +
+ Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
+ Joiner.on(',').join(localInterfaceAddrs) + "]");
+ }
+
+ Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
+ null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
+ Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
+ null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
+ Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
+ null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
+ this.defaultReadCachingStrategy =
+ new CachingStrategy(readDropBehind, readahead);
+ this.defaultWriteCachingStrategy =
+ new CachingStrategy(writeDropBehind, readahead);
+ this.clientContext = ClientContext.get(
+ conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
+ dfsClientConf);
+
+ if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
- this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
++ this.initThreadsNumForHedgedReads(dfsClientConf.
++ getHedgedReadThreadpoolSize());
+ }
++
++ this.initThreadsNumForStripedReads(dfsClientConf.
++ getStripedReadThreadpoolSize());
+ this.saslClient = new SaslDataTransferClient(
+ conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+ TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
+ }
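
As the class Javadoc above notes, applications normally do not call this constructor directly; they go through DistributedFileSystem, which builds the DFSClient from the filesystem URI and Configuration. A minimal caller-side sketch of that entry point (the fs.defaultFS address and path are illustrative placeholders, not values from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DfsClientUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder NameNode address; point this at a real cluster.
        conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020");
        // On an hdfs:// URI this returns a DistributedFileSystem, which
        // instantiates a DFSClient internally via the constructor above.
        FileSystem fs = FileSystem.get(conf);
        try (FSDataInputStream in = fs.open(new Path("/user/example/data.txt"))) {
          System.out.println("first byte: " + in.read());
        } finally {
          fs.close();
        }
      }
    }
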
+
+ /**
+ * Return the socket addresses to use with each configured
+ * local interface. Local interfaces may be specified by IP
+ * address, IP address range using CIDR notation, interface
+ * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
+ * The socket addresses consist of the IPs for the interfaces
+ * and the ephemeral port (port 0). If an IP, IP range, or
+ * interface name matches an interface with sub-interfaces
+ * only the IP of the interface is used. Sub-interfaces can
+ * be used by specifying them explicitly (by IP or name).
+ *
+ * @return SocketAddresses for the configured local interfaces,
+ * or an empty array if none are configured
+ * @throws UnknownHostException if a given interface name is invalid
+ */
+ private static SocketAddress[] getLocalInterfaceAddrs(
+ String interfaceNames[]) throws UnknownHostException {
+ List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
+ for (String interfaceName : interfaceNames) {
+ if (InetAddresses.isInetAddress(interfaceName)) {
+ localAddrs.add(new InetSocketAddress(interfaceName, 0));
+ } else if (NetUtils.isValidSubnet(interfaceName)) {
+ for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
+ localAddrs.add(new InetSocketAddress(addr, 0));
+ }
+ } else {
+ for (String ip : DNS.getIPs(interfaceName, false)) {
+ localAddrs.add(new InetSocketAddress(ip, 0));
+ }
+ }
+ }
+ return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
+ }
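
To make the interface-selection rules in the Javadoc above concrete, a small hedged sketch of how the setting is usually supplied; the interface name and CIDR range are examples only, while the key is the same HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES constant read in the constructor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    // Sketch: bind client sockets to eth0 or to any address in 10.0.0.0/24.
    // getLocalInterfaceAddrs() resolves each entry to an IP with the ephemeral port 0.
    Configuration conf = new Configuration();
    conf.set(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, "eth0,10.0.0.0/24");
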
+
+ /**
+ * Select one of the configured local interfaces at random. We use a random
+ * interface because other policies like round-robin are less effective
+ * given that we cache connections to datanodes.
+ *
+ * @return one of the local interface addresses at random, or null if no
+ * local interfaces are configured
+ */
+ SocketAddress getRandomLocalInterfaceAddr() {
+ if (localInterfaceAddrs.length == 0) {
+ return null;
+ }
+ final int idx = r.nextInt(localInterfaceAddrs.length);
+ final SocketAddress addr = localInterfaceAddrs[idx];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using local interface " + addr);
+ }
+ return addr;
+ }
+
+ /**
+ * Return the timeout that clients should use when writing to datanodes.
+ * @param numNodes the number of nodes in the pipeline.
+ */
+ int getDatanodeWriteTimeout(int numNodes) {
+ final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
+ return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
+ }
+
+ int getDatanodeReadTimeout(int numNodes) {
+ final int t = dfsClientConf.getSocketTimeout();
+ return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
+ }
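
A short worked example of the two timeout formulas above; the base socket timeouts are assumed values for illustration, not defaults quoted from this patch:

    // write timeout: t + WRITE_TIMEOUT_EXTENSION * numNodes   (when t > 0)
    // read  timeout: READ_TIMEOUT_EXTENSION * numNodes + t    (when t > 0)
    int writeBase = 480_000;      // assumed datanode socket write timeout, ms
    int readBase  = 60_000;       // assumed client socket read timeout, ms
    int perNode   = 5_000;        // assumed per-node extension, ms
    int numNodes  = 3;            // a three-datanode pipeline
    int writeTimeout = writeBase + perNode * numNodes;  // 495_000 ms
    int readTimeout  = perNode * numNodes + readBase;   //  75_000 ms
    // Both methods simply return 0 when the base timeout is not positive.
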
+
+ @VisibleForTesting
+ public String getClientName() {
+ return clientName;
+ }
+
+ void checkOpen() throws IOException {
+ if (!clientRunning) {
+ IOException result = new IOException("Filesystem closed");
+ throw result;
+ }
+ }
+
+ /** Return the lease renewer instance. The renewer thread won't start
+ * until the first output stream is created. The same instance will
+ * be returned until all output streams are closed.
+ */
+ public LeaseRenewer getLeaseRenewer() throws IOException {
+ return LeaseRenewer.getInstance(authority, ugi, this);
+ }
+
+ /** Get a lease and start automatic renewal */
+ private void beginFileLease(final long inodeId, final DFSOutputStream out)
+ throws IOException {
+ getLeaseRenewer().put(inodeId, out, this);
+ }
+
+ /** Stop renewal of lease for the file. */
+ void endFileLease(final long inodeId) throws IOException {
+ getLeaseRenewer().closeFile(inodeId, this);
+ }
+
+
+ /** Put a file. Only called from LeaseRenewer, where proper locking is
+ * enforced to consistently update its local dfsclients array and
+ * client's filesBeingWritten map.
+ */
+ public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
+ synchronized(filesBeingWritten) {
+ filesBeingWritten.put(inodeId, out);
+ // Update the last lease renewal time only when there were no
+ // writes. Once there is one write stream open, the lease renewer
+ // thread keeps it updated well within anyone's expiration time.
+ if (lastLeaseRenewal == 0) {
+ updateLastLeaseRenewal();
+ }
+ }
+ }
+
+ /** Remove a file. Only called from LeaseRenewer. */
+ public void removeFileBeingWritten(final long inodeId) {
+ synchronized(filesBeingWritten) {
+ filesBeingWritten.remove(inodeId);
+ if (filesBeingWritten.isEmpty()) {
+ lastLeaseRenewal = 0;
+ }
+ }
+ }
+
+ /** Is file-being-written map empty? */
+ public boolean isFilesBeingWrittenEmpty() {
+ synchronized(filesBeingWritten) {
+ return filesBeingWritten.isEmpty();
+ }
+ }
+
+ /** @return true if the client is running */
+ public boolean isClientRunning() {
+ return clientRunning;
+ }
+
+ long getLastLeaseRenewal() {
+ return lastLeaseRenewal;
+ }
+
+ void updateLastLeaseRenewal() {
+ synchronized(filesBeingWritten) {
+ if (filesBeingWritten.isEmpty()) {
+ return;
+ }
+ lastLeaseRenewal = Time.monotonicNow();
+ }
+ }
+
+ /**
+ * Renew leases.
+ * @return true if lease was renewed. May return false if this
+ * client has been closed or has no files open.
+ **/
+ public boolean renewLease() throws IOException {
+ if (clientRunning && !isFilesBeingWrittenEmpty()) {
+ try {
+ namenode.renewLease(clientName);
+ updateLastLeaseRenewal();
+ return true;
+ } catch (IOException e) {
+ // Abort if the lease has already expired.
+ final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
+ if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
+ LOG.warn("Failed to renew lease for " + clientName + " for "
+ + (elapsed/1000) + " seconds (>= hard-limit ="
+ + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) "
+ + "Closing all files being written ...", e);
+ closeAllFilesBeingWritten(true);
+ } else {
+ // Let the lease renewer handle it and retry.
+ throw e;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Close connections to the Namenode.
+ */
+ void closeConnectionToNamenode() {
+ RPC.stopProxy(namenode);
+ }
+
+ /** Close/abort all files being written. */
+ public void closeAllFilesBeingWritten(final boolean abort) {
+ for(;;) {
+ final long inodeId;
+ final DFSOutputStream out;
+ synchronized(filesBeingWritten) {
+ if (filesBeingWritten.isEmpty()) {
+ return;
+ }
+ inodeId = filesBeingWritten.keySet().iterator().next();
+ out = filesBeingWritten.remove(inodeId);
+ }
+ if (out != null) {
+ try {
+ if (abort) {
+ out.abort();
+ } else {
+ out.close();
+ }
+ } catch(IOException ie) {
+ LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
+ + out.getSrc() + " with inode: " + inodeId, ie);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the file system, abandoning all of the leases and files being
+ * created, and close the connection to the namenode.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if(clientRunning) {
+ closeAllFilesBeingWritten(false);
+ clientRunning = false;
+ getLeaseRenewer().closeClient(this);
+ // close connections to the namenode
+ closeConnectionToNamenode();
+ }
+ }
+
+ /**
+ * Close all open streams, abandoning all of the leases and files being
+ * created.
+ * @param abort whether streams should be aborted rather than gracefully closed
+ */
+ public void closeOutputStreams(boolean abort) {
+ if (clientRunning) {
+ closeAllFilesBeingWritten(abort);
+ }
+ }
+
+ /**
+ * @see ClientProtocol#getPreferredBlockSize(String)
+ */
+ public long getBlockSize(String f) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getBlockSize", f);
+ try {
+ return namenode.getPreferredBlockSize(f);
+ } catch (IOException ie) {
+ LOG.warn("Problem getting block size", ie);
+ throw ie;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Get server default values for a number of configuration params.
+ * @see ClientProtocol#getServerDefaults()
+ */
+ public FsServerDefaults getServerDefaults() throws IOException {
+ checkOpen();
+ long now = Time.monotonicNow();
+ if ((serverDefaults == null) ||
+ (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
+ serverDefaults = namenode.getServerDefaults();
+ serverDefaultsLastUpdate = now;
+ }
+ assert serverDefaults != null;
+ return serverDefaults;
+ }
+
+ /**
+ * Get a canonical token service name for this client's tokens. Null should
+ * be returned if the client is not using tokens.
+ * @return the token service for the client
+ */
+ @InterfaceAudience.LimitedPrivate( { "HDFS" })
+ public String getCanonicalServiceName() {
+ return (dtService != null) ? dtService.toString() : null;
+ }
+
+ /**
+ * @see ClientProtocol#getDelegationToken(Text)
+ */
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException {
+ assert dtService != null;
+ TraceScope scope = tracer.newScope("getDelegationToken");
+ try {
+ Token<DelegationTokenIdentifier> token =
+ namenode.getDelegationToken(renewer);
+ if (token != null) {
+ token.setService(this.dtService);
+ LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+ } else {
+ LOG.info("Cannot get delegation token from " + renewer);
+ }
+ return token;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Renew a delegation token
+ * @param token the token to renew
+ * @return the new expiration time
+ * @throws InvalidToken
+ * @throws IOException
+ * @deprecated Use Token.renew instead.
+ */
+ @Deprecated
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
+ try {
+ return token.renew(conf);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("caught interrupted", ie);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ /**
+ * Cancel a delegation token
+ * @param token the token to cancel
+ * @throws InvalidToken
+ * @throws IOException
+ * @deprecated Use Token.cancel instead.
+ */
+ @Deprecated
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws InvalidToken, IOException {
+ LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
+ try {
+ token.cancel(conf);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("caught interrupted", ie);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ @InterfaceAudience.Private
+ public static class Renewer extends TokenRenewer {
+
+ static {
+ // Ensure that HDFS Configuration files are loaded before trying to use
+ // the renewer.
+ HdfsConfigurationLoader.init();
+ }
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token, Configuration conf) throws IOException {
+ Token<DelegationTokenIdentifier> delToken =
+ (Token<DelegationTokenIdentifier>) token;
+ ClientProtocol nn = getNNProxy(delToken, conf);
+ try {
+ return nn.renewDelegationToken(delToken);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token, Configuration conf) throws IOException {
+ Token<DelegationTokenIdentifier> delToken =
+ (Token<DelegationTokenIdentifier>) token;
+ LOG.info("Cancelling " +
+ DelegationTokenIdentifier.stringifyToken(delToken));
+ ClientProtocol nn = getNNProxy(delToken, conf);
+ try {
+ nn.cancelDelegationToken(delToken);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ private static ClientProtocol getNNProxy(
+ Token<DelegationTokenIdentifier> token, Configuration conf)
+ throws IOException {
+ URI uri = HAUtilClient.getServiceUriFromToken(
+ HdfsConstants.HDFS_URI_SCHEME, token);
+ if (HAUtilClient.isTokenForLogicalUri(token) &&
+ !HAUtilClient.isLogicalUri(conf, uri)) {
+ // If the token is for a logical nameservice, but the configuration
+ // we have disagrees about that, we can't actually renew it.
+ // This can be the case in MR, for example, if the RM doesn't
+ // have all of the HA clusters configured in its configuration.
+ throw new IOException("Unable to map logical nameservice URI '" +
+ uri + "' to a NameNode. Local configuration does not have " +
+ "a failover proxy provider configured.");
+ }
+
+ ProxyAndInfo<ClientProtocol> info =
+ NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
+ assert info.getDelegationTokenService().equals(token.getService()) :
+ "Returned service '" + info.getDelegationTokenService().toString() +
+ "' doesn't match expected service '" +
+ token.getService().toString() + "'";
+
+ return info.getProxy();
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ }
+
+ /**
+ * Report corrupt blocks that were discovered by the client.
+ * @see ClientProtocol#reportBadBlocks(LocatedBlock[])
+ */
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+ checkOpen();
+ namenode.reportBadBlocks(blocks);
+ }
+
+ public LocatedBlocks getLocatedBlocks(String src, long start)
+ throws IOException {
+ return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
+ }
+
+ /*
+ * This is just a wrapper around callGetBlockLocations, but non-static so that
+ * we can stub it out for tests.
+ */
+ @VisibleForTesting
+ public LocatedBlocks getLocatedBlocks(String src, long start, long length)
+ throws IOException {
+ TraceScope scope = newPathTraceScope("getBlockLocations", src);
+ try {
+ return callGetBlockLocations(namenode, src, start, length);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * @see ClientProtocol#getBlockLocations(String, long, long)
+ */
+ static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
+ String src, long start, long length)
+ throws IOException {
+ try {
+ return namenode.getBlockLocations(src, start, length);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ UnresolvedPathException.class);
+ }
+ }
+
+ /**
+ * Recover a file's lease
+ * @param src a file's path
+ * @return true if the file is already closed
+ * @throws IOException
+ */
+ boolean recoverLease(String src) throws IOException {
+ checkOpen();
+
+ TraceScope scope = newPathTraceScope("recoverLease", src);
+ try {
+ return namenode.recoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Get block location info about a file
+ *
+ * getBlockLocations() returns a list of hostnames that store
+ * data for a specific file region. It returns a set of hostnames
+ * for every block within the indicated region.
+ *
+ * This function is very useful when writing code that considers
+ * data-placement when performing operations. For example, the
+ * MapReduce system tries to schedule tasks on the same machines
+ * as the data-block the task processes.
+ */
+ public BlockLocation[] getBlockLocations(String src, long start,
+ long length) throws IOException, UnresolvedLinkException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getBlockLocations", src);
+ try {
+ LocatedBlocks blocks = getLocatedBlocks(src, start, length);
+ BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
+ HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+ for (int i = 0; i < locations.length; i++) {
+ hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+ }
+ return hdfsLocations;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Decrypts an EDEK by consulting the KeyProvider.
+ */
+ private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
+ feInfo) throws IOException {
+ TraceScope scope = tracer.newScope("decryptEDEK");
+ try {
+ KeyProvider provider = getKeyProvider();
+ if (provider == null) {
+ throw new IOException("No KeyProvider is configured, cannot access" +
+ " an encrypted file");
+ }
+ EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
+ feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
+ feInfo.getEncryptedDataEncryptionKey());
+ try {
+ KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
+ .createKeyProviderCryptoExtension(provider);
+ return cryptoProvider.decryptEncryptedKey(ekv);
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Obtain the crypto protocol version from the provided FileEncryptionInfo,
+ * checking to see if this version is supported by the client.
+ *
+ * @param feInfo FileEncryptionInfo
+ * @return CryptoProtocolVersion from the feInfo
+ * @throws IOException if the protocol version is unsupported.
+ */
+ private static CryptoProtocolVersion getCryptoProtocolVersion
+ (FileEncryptionInfo feInfo) throws IOException {
+ final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
+ if (!CryptoProtocolVersion.supports(version)) {
+ throw new IOException("Client does not support specified " +
+ "CryptoProtocolVersion " + version.getDescription() + " version " +
+ "number" + version.getVersion());
+ }
+ return version;
+ }
+
+ /**
+ * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
+ * and the available CryptoCodecs configured in the Configuration.
+ *
+ * @param conf Configuration
+ * @param feInfo FileEncryptionInfo
+ * @return CryptoCodec
+ * @throws IOException if no suitable CryptoCodec for the CipherSuite is
+ * available.
+ */
+ private static CryptoCodec getCryptoCodec(Configuration conf,
+ FileEncryptionInfo feInfo) throws IOException {
+ final CipherSuite suite = feInfo.getCipherSuite();
+ if (suite.equals(CipherSuite.UNKNOWN)) {
+ throw new IOException("NameNode specified unknown CipherSuite with ID "
+ + suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
+ }
+ final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+ if (codec == null) {
+ throw new UnknownCipherSuiteException(
+ "No configuration found for the cipher suite "
+ + suite.getConfigSuffix() + " prefixed with "
+ + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
+ + ". Please see the example configuration "
+ + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
+ + "at core-default.xml for details.");
+ }
+ return codec;
+ }
+
+ /**
+ * Wraps the stream in a CryptoInputStream if the underlying file is
+ * encrypted.
+ */
+ public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
+ throws IOException {
+ final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
+ if (feInfo != null) {
+ // File is encrypted, wrap the stream in a crypto stream.
+ // Currently only one version, so no special logic based on the version #
+ getCryptoProtocolVersion(feInfo);
+ final CryptoCodec codec = getCryptoCodec(conf, feInfo);
+ final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
+ final CryptoInputStream cryptoIn =
+ new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
+ feInfo.getIV());
+ return new HdfsDataInputStream(cryptoIn);
+ } else {
+ // No FileEncryptionInfo so no encryption.
+ return new HdfsDataInputStream(dfsis);
+ }
+ }
+
+ /**
+ * Wraps the stream in a CryptoOutputStream if the underlying file is
+ * encrypted.
+ */
+ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
+ FileSystem.Statistics statistics) throws IOException {
+ return createWrappedOutputStream(dfsos, statistics, 0);
+ }
+
+ /**
+ * Wraps the stream in a CryptoOutputStream if the underlying file is
+ * encrypted.
+ */
+ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
+ FileSystem.Statistics statistics, long startPos) throws IOException {
+ final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
+ if (feInfo != null) {
+ // File is encrypted, wrap the stream in a crypto stream.
+ // Currently only one version, so no special logic based on the version #
+ getCryptoProtocolVersion(feInfo);
+ final CryptoCodec codec = getCryptoCodec(conf, feInfo);
+ KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
+ final CryptoOutputStream cryptoOut =
+ new CryptoOutputStream(dfsos, codec,
+ decrypted.getMaterial(), feInfo.getIV(), startPos);
+ return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
+ } else {
+ // No FileEncryptionInfo present so no encryption.
+ return new HdfsDataOutputStream(dfsos, statistics, startPos);
+ }
+ }
+
+ public DFSInputStream open(String src)
+ throws IOException, UnresolvedLinkException {
+ return open(src, dfsClientConf.getIoBufferSize(), true, null);
+ }
+
+ /**
+ * Create an input stream that obtains a nodelist from the
+ * namenode, and then reads from all the right places. Creates an
+ * inner subclass of InputStream that does the right out-of-band
+ * work.
+ * @deprecated Use {@link #open(String, int, boolean)} instead.
+ */
+ @Deprecated
+ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
+ FileSystem.Statistics stats)
+ throws IOException, UnresolvedLinkException {
+ return open(src, buffersize, verifyChecksum);
+ }
+
+
+ /**
+ * Create an input stream that obtains a nodelist from the
+ * namenode, and then reads from all the right places. Creates
+ * inner subclass of InputStream that does the right out-of-band
+ * work.
+ */
+ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
+ throws IOException, UnresolvedLinkException {
+ checkOpen();
+ // Get block info from namenode
+ TraceScope scope = newPathTraceScope("newDFSInputStream", src);
+ try {
- return new DFSInputStream(this, src, verifyChecksum, null);
++ LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
++ if (locatedBlocks != null) {
++ ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
++ if (ecPolicy != null) {
++ return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy,
++ locatedBlocks);
++ }
++ return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
++ } else {
++ throw new IOException("Cannot open filename " + src);
++ }
+ } finally {
+ scope.close();
+ }
+ }
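
With this merge, open() inspects the LocatedBlocks for an erasure coding policy and returns a DFSStripedInputStream when one is set, falling back to the ordinary DFSInputStream otherwise. A hedged sketch of how a caller typically combines open() with createWrappedInputStream() (which layers in decryption when the file sits in an encryption zone); the helper class and path are illustrative:

    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DFSInputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    final class OpenSketch {
      // Assumes 'client' is an already-constructed DFSClient and 'src' exists.
      static int readFirstBytes(DFSClient client, String src) throws java.io.IOException {
        DFSInputStream dfsIn = client.open(src);   // striped or contiguous, per EC policy
        try (HdfsDataInputStream in = client.createWrappedInputStream(dfsIn)) {
          byte[] buf = new byte[4096];
          return in.read(buf);                     // block data comes straight from DataNodes
        }
      }
    }
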
+
+ /**
+ * Get the namenode associated with this DFSClient object
+ * @return the namenode associated with this DFSClient object
+ */
+ public ClientProtocol getNamenode() {
+ return namenode;
+ }
+
+ /**
+ * Call {@link #create(String, boolean, short, long, Progressable)} with
+ * default <code>replication</code> and <code>blockSize</code> and null <code>
+ * progress</code>.
+ */
+ public OutputStream create(String src, boolean overwrite)
+ throws IOException {
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+ dfsClientConf.getDefaultBlockSize(), null);
+ }
+
+ /**
+ * Call {@link #create(String, boolean, short, long, Progressable)} with
+ * default <code>replication</code> and <code>blockSize</code>.
+ */
+ public OutputStream create(String src,
+ boolean overwrite,
+ Progressable progress) throws IOException {
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+ dfsClientConf.getDefaultBlockSize(), progress);
+ }
+
+ /**
+ * Call {@link #create(String, boolean, short, long, Progressable)} with
+ * null <code>progress</code>.
+ */
+ public OutputStream create(String src,
+ boolean overwrite,
+ short replication,
+ long blockSize) throws IOException {
+ return create(src, overwrite, replication, blockSize, null);
+ }
+
+ /**
+ * Call {@link #create(String, boolean, short, long, Progressable, int)}
+ * with default bufferSize.
+ */
+ public OutputStream create(String src, boolean overwrite, short replication,
+ long blockSize, Progressable progress) throws IOException {
+ return create(src, overwrite, replication, blockSize, progress,
+ dfsClientConf.getIoBufferSize());
+ }
+
+ /**
+ * Call {@link #create(String, FsPermission, EnumSet, short, long,
+ * Progressable, int, ChecksumOpt)} with default <code>permission</code>
+ * {@link FsPermission#getFileDefault()}.
+ *
+ * @param src File name
+ * @param overwrite overwrite an existing file if true
+ * @param replication replication factor for the file
+ * @param blockSize maximum block size
+ * @param progress interface for reporting client progress
+ * @param buffersize underlying buffersize
+ *
+ * @return output stream
+ */
+ public OutputStream create(String src,
+ boolean overwrite,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize)
+ throws IOException {
+ return create(src, FsPermission.getFileDefault(),
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ buffersize, null);
+ }
+
+ /**
+ * Call {@link #create(String, FsPermission, EnumSet, boolean, short,
+ * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+ * set to true.
+ */
+ public DFSOutputStream create(String src,
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize,
+ ChecksumOpt checksumOpt)
+ throws IOException {
+ return create(src, permission, flag, true,
+ replication, blockSize, progress, buffersize, checksumOpt, null);
+ }
+
+ /**
+ * Create a new dfs file with the specified block replication
+ * with write-progress reporting and return an output stream for writing
+ * into the file.
+ *
+ * @param src File name
+ * @param permission The permission of the directory being created.
+ * If null, use default permission {@link FsPermission#getFileDefault()}
+ * @param flag indicates create a new file or create/overwrite an
+ * existing file or append to an existing file
+ * @param createParent create missing parent directory if true
+ * @param replication block replication
+ * @param blockSize maximum block size
+ * @param progress interface for reporting client progress
+ * @param buffersize underlying buffer size
+ * @param checksumOpt checksum options
+ *
+ * @return output stream
+ *
+ * @see ClientProtocol#create for detailed description of exceptions thrown
+ */
+ public DFSOutputStream create(String src,
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ boolean createParent,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize,
+ ChecksumOpt checksumOpt) throws IOException {
- return create(src, permission, flag, createParent, replication, blockSize,
++ return create(src, permission, flag, createParent, replication, blockSize,
+ progress, buffersize, checksumOpt, null);
+ }
+
+ private FsPermission applyUMask(FsPermission permission) {
+ if (permission == null) {
+ permission = FsPermission.getFileDefault();
+ }
+ return permission.applyUMask(dfsClientConf.getUMask());
+ }
+
+ /**
+ * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+ * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
+ * a hint to where the namenode should place the file blocks.
+ * The favored nodes hint is not persisted in HDFS, so it may be honored only
+ * at creation time. HDFS may later move the blocks away from the favored
+ * nodes during balancing or replication. A value of null means no favored
+ * nodes for this create.
+ */
+ public DFSOutputStream create(String src,
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ boolean createParent,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize,
+ ChecksumOpt checksumOpt,
+ InetSocketAddress[] favoredNodes) throws IOException {
+ checkOpen();
+ final FsPermission masked = applyUMask(permission);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(src + ": masked=" + masked);
+ }
+ final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
+ src, masked, flag, createParent, replication, blockSize, progress,
+ buffersize, dfsClientConf.createChecksum(checksumOpt),
+ getFavoredNodesStr(favoredNodes));
+ beginFileLease(result.getFileId(), result);
+ return result;
+ }
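
A hedged illustration of passing the favored-nodes hint described above; the host names, port, and sizes are placeholders, and the argument order follows the create(...) signature immediately above:

    // Sketch (types resolved by the imports at the top of this file): ask the
    // NameNode, best effort and not persisted, to place blocks on two specific
    // DataNodes while creating a replicated file.
    static DFSOutputStream createWithFavoredNodes(DFSClient client)
        throws IOException {
      InetSocketAddress[] favored = {
          new InetSocketAddress("datanode1.example.com", 50010),   // placeholder
          new InetSocketAddress("datanode2.example.com", 50010)    // placeholder
      };
      return client.create("/user/example/output.dat",
          FsPermission.getFileDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
          true,                 // createParent
          (short) 3,            // replication
          128L * 1024 * 1024,   // blockSize (128 MB)
          null,                 // progress
          4096,                 // buffersize
          null,                 // checksumOpt: use the client's default checksum
          favored);
    }
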
+
+ private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
+ String[] favoredNodeStrs = null;
+ if (favoredNodes != null) {
+ favoredNodeStrs = new String[favoredNodes.length];
+ for (int i = 0; i < favoredNodes.length; i++) {
+ favoredNodeStrs[i] =
+ favoredNodes[i].getHostName() + ":"
+ + favoredNodes[i].getPort();
+ }
+ }
+ return favoredNodeStrs;
+ }
+
+ /**
+ * Append to an existing file if {@link CreateFlag#APPEND} is present
+ */
+ private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+ int buffersize, Progressable progress) throws IOException {
+ if (flag.contains(CreateFlag.APPEND)) {
+ HdfsFileStatus stat = getFileInfo(src);
+ if (stat == null) { // No file to append to
+ // New file needs to be created if create option is present
+ if (!flag.contains(CreateFlag.CREATE)) {
+ throw new FileNotFoundException("failed to append to non-existent file "
+ + src + " on client " + clientName);
+ }
+ return null;
+ }
+ return callAppend(src, buffersize, flag, progress, null);
+ }
+ return null;
+ }
+
+ /**
+ * Same as {@link #create(String, FsPermission, EnumSet, short, long,
+ * Progressable, int, ChecksumOpt)} except that the permission
+ * is absolute (i.e. has already been masked with umask).
+ */
+ public DFSOutputStream primitiveCreate(String src,
+ FsPermission absPermission,
+ EnumSet<CreateFlag> flag,
+ boolean createParent,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ int buffersize,
+ ChecksumOpt checksumOpt)
+ throws IOException, UnresolvedLinkException {
+ checkOpen();
+ CreateFlag.validate(flag);
+ DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
+ if (result == null) {
+ DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
+ result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
+ flag, createParent, replication, blockSize, progress, buffersize,
+ checksum, null);
+ }
+ beginFileLease(result.getFileId(), result);
+ return result;
+ }
+
+ /**
+ * Creates a symbolic link.
+ *
+ * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean)
+ */
+ public void createSymlink(String target, String link, boolean createParent)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("createSymlink", target);
+ try {
+ final FsPermission dirPerm = applyUMask(null);
+ namenode.createSymlink(target, link, dirPerm, createParent);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Resolve the *first* symlink, if any, in the path.
+ *
+ * @see ClientProtocol#getLinkTarget(String)
+ */
+ public String getLinkTarget(String path) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getLinkTarget", path);
+ try {
+ return namenode.getLinkTarget(path);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /** Method to get stream returned by append call */
+ private DFSOutputStream callAppend(String src, int buffersize,
+ EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
+ throws IOException {
+ CreateFlag.validateForAppend(flag);
+ try {
+ LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
+ new EnumSetWritable<>(flag, CreateFlag.class));
+ return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
+ progress, blkWithStatus.getLastBlock(),
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
+ favoredNodes);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ UnsupportedOperationException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ }
+ }
+
+ /**
+ * Append to an existing HDFS file.
+ *
+ * @param src file name
+ * @param buffersize buffer size
+ * @param flag indicates whether to append data to a new block instead of
+ * the last block
+ * @param progress for reporting write-progress; null is acceptable.
+ * @param statistics file system statistics; null is acceptable.
+ * @return an output stream for writing into the file
+ *
+ * @see ClientProtocol#append(String, String, EnumSetWritable)
+ */
+ public HdfsDataOutputStream append(final String src, final int buffersize,
+ EnumSet<CreateFlag> flag, final Progressable progress,
+ final FileSystem.Statistics statistics) throws IOException {
+ final DFSOutputStream out = append(src, buffersize, flag, null, progress);
+ return createWrappedOutputStream(out, statistics, out.getInitialLen());
+ }
+
+ /**
+ * Append to an existing HDFS file.
+ *
+ * @param src file name
+ * @param buffersize buffer size
+ * @param flag indicates whether to append data to a new block instead of the
+ * last block
+ * @param progress for reporting write-progress; null is acceptable.
+ * @param statistics file system statistics; null is acceptable.
+ * @param favoredNodes FavoredNodes for new blocks
+ * @return an output stream for writing into the file
+ * @see ClientProtocol#append(String, String, EnumSetWritable)
+ */
+ public HdfsDataOutputStream append(final String src, final int buffersize,
+ EnumSet<CreateFlag> flag, final Progressable progress,
+ final FileSystem.Statistics statistics,
+ final InetSocketAddress[] favoredNodes) throws IOException {
+ final DFSOutputStream out = append(src, buffersize, flag,
+ getFavoredNodesStr(favoredNodes), progress);
+ return createWrappedOutputStream(out, statistics, out.getInitialLen());
+ }
+
+ private DFSOutputStream append(String src, int buffersize,
+ EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
+ throws IOException {
+ checkOpen();
+ final DFSOutputStream result = callAppend(src, buffersize, flag, progress,
+ favoredNodes);
+ beginFileLease(result.getFileId(), result);
+ return result;
+ }
+
+ /**
+ * Set replication for an existing file.
+ * @param src file name
+ * @param replication replication to set the file to
+ *
+ * @see ClientProtocol#setReplication(String, short)
+ */
+ public boolean setReplication(String src, short replication)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("setReplication", src);
+ try {
+ return namenode.setReplication(src, replication);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Set storage policy for an existing file/directory
+ * @param src file/directory name
+ * @param policyName name of the storage policy
+ */
+ public void setStoragePolicy(String src, String policyName)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("setStoragePolicy", src);
+ try {
+ namenode.setStoragePolicy(src, policyName);
+ } catch (RemoteException e) {
+ throw e.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ NSQuotaExceededException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * @param path file/directory name
+ * @return the storage policy for the specified path
+ */
+ public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getStoragePolicy", path);
+ try {
+ return namenode.getStoragePolicy(path);
+ } catch (RemoteException e) {
+ throw e.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * @return All the existing storage policies
+ */
+ public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("getStoragePolicies");
+ try {
+ return namenode.getStoragePolicies();
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Rename file or directory.
+ * @see ClientProtocol#rename(String, String)
+ * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
+ */
+ @Deprecated
+ public boolean rename(String src, String dst) throws IOException {
+ checkOpen();
+ TraceScope scope = newSrcDstTraceScope("rename", src, dst);
+ try {
+ return namenode.rename(src, dst);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ NSQuotaExceededException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Move blocks from src to trg and delete src
+ * See {@link ClientProtocol#concat}.
+ */
+ public void concat(String trg, String [] srcs) throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("concat");
+ try {
+ namenode.concat(trg, srcs);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+ /**
+ * Rename file or directory.
+ * @see ClientProtocol#rename2(String, String, Options.Rename...)
+ */
+ public void rename(String src, String dst, Options.Rename... options)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newSrcDstTraceScope("rename2", src, dst);
+ try {
+ namenode.rename2(src, dst, options);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ SafeModeException.class,
+ NSQuotaExceededException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Truncate a file to an indicated size
+ * See {@link ClientProtocol#truncate}.
+ */
+ public boolean truncate(String src, long newLength) throws IOException {
+ checkOpen();
+ if (newLength < 0) {
+ throw new HadoopIllegalArgumentException(
+ "Cannot truncate to a negative file size: " + newLength + ".");
+ }
+ TraceScope scope = newPathTraceScope("truncate", src);
+ try {
+ return namenode.truncate(src, newLength, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
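+
+ // Caller-side sketch (not part of this change), assuming an initialized
+ // DFSClient "client": truncate a log file to 1 MB. A false return value
+ // means block recovery was scheduled and the new length only takes effect
+ // once that recovery completes.
+ // boolean done = client.truncate("/data/app.log", 1024L * 1024);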
+
+ /**
+ * Delete file or directory.
+ * See {@link ClientProtocol#delete(String, boolean)}.
+ */
+ @Deprecated
+ public boolean delete(String src) throws IOException {
+ checkOpen();
+ return delete(src, true);
+ }
+
+ /**
+ * delete file or directory.
+ * delete contents of the directory if non empty and recursive
+ * set to true
+ *
+ * @see ClientProtocol#delete(String, boolean)
+ */
+ public boolean delete(String src, boolean recursive) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("delete", src);
+ try {
+ return namenode.delete(src, recursive);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /** Check whether a path exists. Implemented using getFileInfo(src).
+ */
+ public boolean exists(String src) throws IOException {
+ checkOpen();
+ return getFileInfo(src) != null;
+ }
+
+ /**
+ * Get a partial listing of the indicated directory.
+ * No block locations need to be fetched.
+ */
+ public DirectoryListing listPaths(String src, byte[] startAfter)
+ throws IOException {
+ return listPaths(src, startAfter, false);
+ }
+
+ /**
+ * Get a partial listing of the indicated directory
+ *
+ * It is recommended to use HdfsFileStatus.EMPTY_NAME as startAfter
+ * if the application wants to fetch a listing starting from
+ * the first entry in the directory.
+ *
+ * @see ClientProtocol#getListing(String, byte[], boolean)
+ */
+ public DirectoryListing listPaths(String src, byte[] startAfter,
+ boolean needLocation) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("listPaths", src);
+ try {
+ return namenode.getListing(src, startAfter, needLocation);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
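+
+ // Caller-side sketch (not part of this change), assuming an initialized
+ // DFSClient "client" and an existing directory "/data": page through the
+ // listing, starting from HdfsFileStatus.EMPTY_NAME and feeding the last
+ // returned name back in as the cursor for the next call.
+ //
+ // byte[] cursor = HdfsFileStatus.EMPTY_NAME;
+ // DirectoryListing page;
+ // do {
+ //   page = client.listPaths("/data", cursor, false);
+ //   for (HdfsFileStatus stat : page.getPartialListing()) {
+ //     System.out.println(stat.getLocalName());
+ //   }
+ //   cursor = page.getLastName();
+ // } while (page.hasMore());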
+
+ /**
+ * Get the file info for a specific file or directory.
+ * @param src The string representation of the path to the file
+ * @return object containing information regarding the file
+ * or null if the file is not found
+ *
+ * @see ClientProtocol#getFileInfo(String) for description of exceptions
+ */
+ public HdfsFileStatus getFileInfo(String src) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getFileInfo", src);
+ try {
+ return namenode.getFileInfo(src);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Get the close status of a file.
+ * @return true if the file is already closed
+ */
+ public boolean isFileClosed(String src) throws IOException{
+ checkOpen();
+ TraceScope scope = newPathTraceScope("isFileClosed", src);
+ try {
+ return namenode.isFileClosed(src);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Get the file info for a specific file or directory. If src
+ * refers to a symlink then the FileStatus of the link is returned.
+ * @param src path to a file or directory.
+ *
+ * For description of exceptions thrown
+ * @see ClientProtocol#getFileLinkInfo(String)
+ */
+ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("getFileLinkInfo", src);
+ try {
+ return namenode.getFileLinkInfo(src);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ @InterfaceAudience.Private
+ public void clearDataEncryptionKey() {
+ LOG.debug("Clearing encryption key");
+ synchronized (this) {
+ encryptionKey = null;
+ }
+ }
+
+ /**
+ * @return true if data sent between this client and DNs should be encrypted,
+ * false otherwise.
+ * @throws IOException in the event of error communicating with the NN
+ */
+ boolean shouldEncryptData() throws IOException {
+ FsServerDefaults d = getServerDefaults();
+ return d == null ? false : d.getEncryptDataTransfer();
+ }
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() throws IOException {
+ if (shouldEncryptData()) {
+ synchronized (this) {
+ if (encryptionKey == null ||
+ encryptionKey.expiryDate < Time.now()) {
+ LOG.debug("Getting new encryption token from NN");
+ encryptionKey = namenode.getDataEncryptionKey();
+ }
+ return encryptionKey;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get the checksum of the whole file or of a range of the file. Note that the
+ * range always starts from the beginning of the file.
+ * @param src The file path
+ * @param length the length of the range, i.e., the range is [0, length]
+ * @return The checksum
+ * @see DistributedFileSystem#getFileChecksum(Path)
+ */
+ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+ throws IOException {
+ checkOpen();
+ Preconditions.checkArgument(length >= 0);
+ //get block locations for the file range
+ LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+ length);
+ if (null == blockLocations) {
+ throw new FileNotFoundException("File does not exist: " + src);
+ }
+ if (blockLocations.isUnderConstruction()) {
+ throw new IOException("Fail to get checksum, since file " + src
+ + " is under construction.");
+ }
+ List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
+ final DataOutputBuffer md5out = new DataOutputBuffer();
+ int bytesPerCRC = -1;
+ DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
+ long crcPerBlock = 0;
+ boolean refetchBlocks = false;
+ int lastRetriedIndex = -1;
+
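+ // The file-level checksum (MD5-of-MD5-of-CRCs) is an MD5 digest over the
+ // concatenation of the per-block MD5s returned by the datanodes; the loop
+ // below collects one block-level MD5 per located block into md5out.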
+ // get block checksum for each block
+ long remaining = length;
+ if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
+ remaining = Math.min(length, blockLocations.getFileLength());
+ }
+ for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
+ if (refetchBlocks) { // refetch to get fresh tokens
+ blockLocations = callGetBlockLocations(namenode, src, 0, length);
+ if (null == blockLocations) {
+ throw new FileNotFoundException("File does not exist: " + src);
+ }
+ if (blockLocations.isUnderConstruction()) {
+ throw new IOException("Fail to get checksum, since file " + src
+ + " is under construction.");
+ }
+ locatedblocks = blockLocations.getLocatedBlocks();
+ refetchBlocks = false;
+ }
+ LocatedBlock lb = locatedblocks.get(i);
+ final ExtendedBlock block = lb.getBlock();
+ if (remaining < block.getNumBytes()) {
+ block.setNumBytes(remaining);
+ }
+ remaining -= block.getNumBytes();
+ final DatanodeInfo[] datanodes = lb.getLocations();
+
+ //try each datanode location of the block
+ final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
+ boolean done = false;
+ for(int j = 0; !done && j < datanodes.length; j++) {
+ DataOutputStream out = null;
+ DataInputStream in = null;
+
+ try {
+ //connect to a datanode
+ IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
+ out = new DataOutputStream(new BufferedOutputStream(pair.out,
+ smallBufferSize));
+ in = new DataInputStream(pair.in);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write to " + datanodes[j] + ": "
+ + Op.BLOCK_CHECKSUM + ", block=" + block);
+ }
+ // get block MD5
+ new Sender(out).blockChecksum(block, lb.getBlockToken());
+
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+
+ String logInfo = "for block " + block + " from datanode " + datanodes[j];
+ DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+ OpBlockChecksumResponseProto checksumData =
+ reply.getChecksumResponse();
+
+ //read byte-per-checksum
+ final int bpc = checksumData.getBytesPerCrc();
+ if (i == 0) { //first block
+ bytesPerCRC = bpc;
+ }
+ else if (bpc != bytesPerCRC) {
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ + " but bytesPerCRC=" + bytesPerCRC);
+ }
+
+ //read crc-per-block
+ final long cpb = checksumData.getCrcPerBlock();
+ if (locatedblocks.size() > 1 && i == 0) {
+ crcPerBlock = cpb;
+ }
+
+ //read md5
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
+ md5.write(md5out);
+
+ // read crc-type
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelperClient.convert(checksumData
+ .getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+ "inferring checksum by reading first byte");
+ ct = inferChecksumTypeByReading(lb, datanodes[j]);
+ }
+
+ if (i == 0) { // first block
+ crcType = ct;
+ } else if (crcType != DataChecksum.Type.MIXED
+ && crcType != ct) {
+ // if crc types are mixed in a file
+ crcType = DataChecksum.Type.MIXED;
+ }
+
+ done = true;
+
+ if (LOG.isDebugEnabled()) {
+ if (i == 0) {
+ LOG.debug("set bytesPerCRC=" + bytesPerCRC
+ + ", crcPerBlock=" + crcPerBlock);
+ }
+ LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
+ }
+ } catch (InvalidBlockTokenException ibte) {
+ if (i > lastRetriedIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file " + src + " for block " + block
+ + " from datanode " + datanodes[j]
+ + ". Will retry the block once.");
+ }
+ lastRetriedIndex = i;
+ done = true; // actually it's not done; but we'll retry
+ i--; // repeat at i-th block
+ refetchBlocks = true;
+ break;
+ }
+ } catch (IOException ie) {
+ LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ }
+ }
+
+ if (!done) {
+ throw new IOException("Fail to get block MD5 for " + block);
+ }
+ }
+
+ //compute file MD5
+ final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
+ switch (crcType) {
+ case CRC32:
+ return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ case CRC32C:
+ return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ default:
+ // If there is no block allocated for the file,
+ // return one with the magic entry that matches what previous
+ // hdfs versions return.
+ if (locatedblocks.size() == 0) {
+ return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
+ }
+
+ // we should never get here since the validity was checked
+ // when getCrcType() was called above.
+ return null;
+ }
+ }
+
+ /**
+ * Connect to the given datanode's data transfer port, and return
+ * the resulting IOStreamPair. This includes encryption wrapping, etc.
+ */
+ private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+ LocatedBlock lb) throws IOException {
+ boolean success = false;
+ Socket sock = null;
+ try {
+ sock = socketFactory.createSocket();
+ String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+ sock.setSoTimeout(timeout);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
+ lb.getBlockToken(), dn);
+ success = true;
+ return ret;
+ } finally {
+ if (!success) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Infer the checksum type for a replica by sending an OP_READ_BLOCK
+ * for the first byte of that replica. This is used for compatibility
+ * with older HDFS versions which did not include the checksum type in
+ * OpBlockChecksumResponseProto.
+ *
+ * @param lb the located block
+ * @param dn the connected datanode
+ * @return the inferred checksum type
+ * @throws IOException if an error occurs
+ */
+ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
+ throws IOException {
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
+
+ try {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
+ smallBufferSize));
+ DataInputStream in = new DataInputStream(pair.in);
+
+ new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+ 0, 1, true, CachingStrategy.newDefaultStrategy());
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+ String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
+ DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+ return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+ } finally {
+ IOUtilsClient.cleanup(null, pair.in, pair.out);
+ }
+ }
+
+ /**
+ * Set permissions to a file or directory.
+ * @param src path name.
+ * @param permission permission to set to
+ *
+ * @see ClientProtocol#setPermission(String, FsPermission)
+ */
+ public void setPermission(String src, FsPermission permission)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("setPermission", src);
+ try {
+ namenode.setPermission(src, permission);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Set file or directory owner.
+ * @param src path name.
+ * @param username user id.
+ * @param groupname user group.
+ *
+ * @see ClientProtocol#setOwner(String, String, String)
+ */
+ public void setOwner(String src, String username, String groupname)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("setOwner", src);
+ try {
+ namenode.setOwner(src, username, groupname);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private long[] callGetStats() throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("getStats");
+ try {
+ return namenode.getStats();
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * @see ClientProtocol#getStats()
+ */
+ public FsStatus getDiskStatus() throws IOException {
+ long rawNums[] = callGetStats();
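+ // Indices 0, 1 and 2 of the stats array are the total capacity, used and
+ // remaining bytes, in the order expected by the FsStatus constructor below.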
+ return new FsStatus(rawNums[0], rawNums[1], rawNums[2]);
+ }
+
+ /**
+ * Returns count of blocks with no good replicas left. Normally should be
+ * zero.
+ * @throws IOException
+ */
+ public long getMissingBlocksCount() throws IOException {
+ return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
+ }
+
+ /**
+ * Returns count of blocks with replication factor 1 that have
+ * lost their only replica.
+ * @throws IOException
+ */
+ public long getMissingReplOneBlocksCount() throws IOException {
+ return callGetStats()[ClientProtocol.
+ GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
+ }
+
+ /**
+ * Returns count of blocks with one or more replicas missing.
+ * @throws IOException
+ */
+ public long getUnderReplicatedBlocksCount() throws IOException {
+ return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
+ }
+
+ /**
+ * Returns count of blocks with at least one replica marked corrupt.
+ * @throws IOException
+ */
+ public long getCorruptBlocksCount() throws IOException {
+ return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
+ }
+
+ /**
+ * @return a list in which each entry describes a corrupt file/block
+ * @throws IOException
+ */
+ public CorruptFileBlocks listCorruptFileBlocks(String path,
+ String cookie)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = newPathTraceScope("listCorruptFileBlocks", path);
+ try {
+ return namenode.listCorruptFileBlocks(path, cookie);
+ } finally {
+ scope.close();
+ }
+ }
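+
+ // Caller-side sketch (not part of this change), assuming an initialized
+ // DFSClient "client": page through the corrupt-file report by feeding the
+ // returned cookie back into the next call, until a call returns no files.
+ //
+ // String cookie = null;
+ // CorruptFileBlocks blocks;
+ // do {
+ //   blocks = client.listCorruptFileBlocks("/", cookie);
+ //   for (String file : blocks.getFiles()) {
+ //     System.out.println(file);
+ //   }
+ //   cookie = blocks.getCookie();
+ // } while (blocks.getFiles().length > 0);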
+
+ public DatanodeInfo[] datanodeReport(DatanodeReportType type)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("datanodeReport");
+ try {
+ return namenode.getDatanodeReport(type);
+ } finally {
+ scope.close();
+ }
+ }
+
+ public DatanodeStorageReport[] getDatanodeStorageReport(
+ DatanodeReportType type) throws IOException {
+ checkOpen();
+ TraceScope scope =
+ tracer.newScope("datanodeStorageReport");
+ try {
+ return namenode.getDatanodeStorageReport(type);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Enter, leave or get safe mode.
+ *
+ * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean)
+ */
+ public boolean setSafeMode(SafeModeAction action) throws IOException {
+ checkOpen();
+ return setSafeMode(action, false);
+ }
+
+ /**
+ * Enter, leave or get safe mode.
+ *
+ * @param action
+ * One of SafeModeAction.GET, SafeModeAction.ENTER and
+ * SafeModeAction.LEAVE
+ * @param isChecked
+ * If true, then check only the active namenode's safemode status; else
+ * check the first namenode's status.
+ * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
+ */
+ public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException {
+ TraceScope scope =
+ tracer.newScope("setSafeMode");
+ try {
+ return namenode.setSafeMode(action, isChecked);
+ } finally {
+ scope.close();
+ }
+ }
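+
+ // Caller-side sketch (not part of this change), assuming an initialized
+ // DFSClient "client": query the active namenode's safemode status, then
+ // enter and leave safe mode (HdfsConstants.SafeModeAction constants).
+ // boolean inSafeMode = client.setSafeMode(SafeModeAction.SAFEMODE_GET, true);
+ // client.setSafeMode(SafeModeAction.SAFEMODE_ENTER, true);
+ // client.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, true);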
+
+ /**
+ * Create one snapshot.
+ *
+ * @param snapshotRoot The directory where the snapshot is to be taken
+ * @param snapshotName Name of the snapshot
+ * @return the snapshot path.
+ * @see ClientProtocol#createSnapshot(String, String)
+ */
+ public String createSnapshot(String snapshotRoot, String snapshotName)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("createSnapshot");
+ try {
+ return namenode.createSnapshot(snapshotRoot, snapshotName);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Delete a snapshot of a snapshottable directory.
+ *
+ * @param snapshotRoot The snapshottable directory that the
+ * to-be-deleted snapshot belongs to
+ * @param snapshotName The name of the to-be-deleted snapshot
+ * @throws IOException
+ * @see ClientProtocol#deleteSnapshot(String, String)
+ */
+ public void deleteSnapshot(String snapshotRoot, String snapshotName)
+ throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("deleteSnapshot");
+ try {
+ namenode.deleteSnapshot(snapshotRoot, snapshotName);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Rename a snapshot.
+ * @param snapshotDir The directory path where the snapshot was taken
+ * @param snapshotOldName Old name of the snapshot
+ * @param snapshotNewName New name of the snapshot
+ * @throws IOException
+ * @see ClientProtocol#renameSnapshot(String, String, String)
+ */
+ public void renameSnapshot(String snapshotDir, String snapshotOldName,
+ String snapshotNewName) throws IOException {
+ checkOpen();
+ TraceScope scope = tracer.newScope("renameSnapshot");
+ try {
+ namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
+ }
+ }
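+
+ // Caller-side sketch (not part of this change), assuming an initialized
+ // DFSClient "client" and a directory "/data" already made snapshottable by
+ // an administrator: take, rename and finally delete a snapshot.
+ // String snapshotPath = client.createSnapshot("/data", "s0");
+ // client.renameSnapshot("/data", "s0", "s1");
+ // client.deleteSnapshot("/data", "s1");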
+
+ /**
+ * Get all the current snapshottable directories.
+ * @return All the c
<TRUNCATED>