You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:05 UTC
[05/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
deleted file mode 100644
index 5bd48af..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ /dev/null
@@ -1,3141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.crypto.CryptoCodec;
-import org.apache.hadoop.crypto.CryptoInputStream;
-import org.apache.hadoop.crypto.CryptoOutputStream;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.crypto.key.KeyProvider;
-import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
-import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.HdfsBlockLocation;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.ChecksumOpt;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.XAttr;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.AclException;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
-import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
-import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.tracing.SpanReceiverHost;
-import org.apache.hadoop.tracing.TraceUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DataChecksum.Type;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.SamplerBuilder;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.net.InetAddresses;
-
-/********************************************************
- * DFSClient can connect to a Hadoop Filesystem and
- * perform basic file tasks. It uses the ClientProtocol
- * to communicate with a NameNode daemon, and connects
- * directly to DataNodes to read/write block data.
- *
- * Hadoop DFS users should obtain an instance of
- * DistributedFileSystem, which uses DFSClient to handle
- * filesystem tasks.
- *
- ********************************************************/
-@InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable, RemotePeerFactory,
- DataEncryptionKeyFactory {
- public static final Log LOG = LogFactory.getLog(DFSClient.class);
- public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
-
- private final Configuration conf;
- private final DfsClientConf dfsClientConf;
- final ClientProtocol namenode;
- /* The service used for delegation tokens */
- private Text dtService;
-
- final UserGroupInformation ugi;
- volatile boolean clientRunning = true;
- volatile long lastLeaseRenewal;
- private volatile FsServerDefaults serverDefaults;
- private volatile long serverDefaultsLastUpdate;
- final String clientName;
- final SocketFactory socketFactory;
- final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
- final FileSystem.Statistics stats;
- private final String authority;
- private final Random r = new Random();
- private SocketAddress[] localInterfaceAddrs;
- private DataEncryptionKey encryptionKey;
- final SaslDataTransferClient saslClient;
- private final CachingStrategy defaultReadCachingStrategy;
- private final CachingStrategy defaultWriteCachingStrategy;
- private final ClientContext clientContext;
-
- private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
- new DFSHedgedReadMetrics();
- private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
- private final Sampler<?> traceSampler;
- private final int smallBufferSize;
-
- public DfsClientConf getConf() {
- return dfsClientConf;
- }
-
- Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * A map from file names to {@link DFSOutputStream} objects
- * that are currently being written by this client.
- * Note that a file can only be written by a single client.
- */
- private final Map<Long, DFSOutputStream> filesBeingWritten
- = new HashMap<Long, DFSOutputStream>();
-
- /**
- * Same as this(NameNode.getNNAddress(conf), conf);
- * @see #DFSClient(InetSocketAddress, Configuration)
- * @deprecated Deprecated at 0.21
- */
- @Deprecated
- public DFSClient(Configuration conf) throws IOException {
- this(DFSUtilClient.getNNAddress(conf), conf);
- }
-
- public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
- this(DFSUtilClient.getNNUri(address), conf);
- }
-
- /**
- * Same as this(nameNodeUri, conf, null);
- * @see #DFSClient(URI, Configuration, FileSystem.Statistics)
- */
- public DFSClient(URI nameNodeUri, Configuration conf
- ) throws IOException {
- this(nameNodeUri, conf, null);
- }
-
- /**
- * Same as this(nameNodeUri, null, conf, stats);
- * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics)
- */
- public DFSClient(URI nameNodeUri, Configuration conf,
- FileSystem.Statistics stats)
- throws IOException {
- this(nameNodeUri, null, conf, stats);
- }
-
- /**
- * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
- * If HA is enabled and a positive value is set for
- * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY}
- * in the configuration, the DFSClient will use
- * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler.
- * Otherwise one of nameNodeUri or rpcNamenode must be null.
- */
- @VisibleForTesting
- public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
- Configuration conf, FileSystem.Statistics stats)
- throws IOException {
- SpanReceiverHost.get(conf, HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX);
- traceSampler = new SamplerBuilder(TraceUtils.
- wrapHadoopConf(HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf))
- .build();
- // Copy only the required DFSClient configuration
- this.dfsClientConf = new DfsClientConf(conf);
- this.conf = conf;
- this.stats = stats;
- this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
- this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
- this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
-
- this.ugi = UserGroupInformation.getCurrentUser();
-
- this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
- this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
- ThreadLocalRandom.current().nextInt() + "_" +
- Thread.currentThread().getId();
- int numResponseToDrop = conf.getInt(
- HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
- HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
- ProxyAndInfo<ClientProtocol> proxyInfo = null;
- AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
- if (numResponseToDrop > 0) {
- // This case is used for testing.
- LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
- + " is set to " + numResponseToDrop
- + ", this hacked client will proactively drop responses");
- proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
- nameNodeUri, ClientProtocol.class, numResponseToDrop,
- nnFallbackToSimpleAuth);
- }
-
- if (proxyInfo != null) {
- this.dtService = proxyInfo.getDelegationTokenService();
- this.namenode = proxyInfo.getProxy();
- } else if (rpcNamenode != null) {
- // This case is used for testing.
- Preconditions.checkArgument(nameNodeUri == null);
- this.namenode = rpcNamenode;
- dtService = null;
- } else {
- Preconditions.checkArgument(nameNodeUri != null,
- "null URI");
- proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
- nameNodeUri, nnFallbackToSimpleAuth);
- this.dtService = proxyInfo.getDelegationTokenService();
- this.namenode = proxyInfo.getProxy();
- }
-
- String localInterfaces[] =
- conf.getTrimmedStrings(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
- localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
- if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
- LOG.debug("Using local interfaces [" +
- Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
- Joiner.on(',').join(localInterfaceAddrs) + "]");
- }
-
- Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
- null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
- Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
- null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
- Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
- null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
- this.defaultReadCachingStrategy =
- new CachingStrategy(readDropBehind, readahead);
- this.defaultWriteCachingStrategy =
- new CachingStrategy(writeDropBehind, readahead);
- this.clientContext = ClientContext.get(
- conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
- dfsClientConf);
-
- if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
- this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
- }
- this.saslClient = new SaslDataTransferClient(
- conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
- TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
- }
-
- /**
- * Return the socket addresses to use with each configured
- * local interface. Local interfaces may be specified by IP
- * address, IP address range using CIDR notation, interface
- * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
- * The socket addresses consist of the IPs for the interfaces
- * and the ephemeral port (port 0). If an IP, IP range, or
- * interface name matches an interface with sub-interfaces
- * only the IP of the interface is used. Sub-interfaces can
- * be used by specifying them explicitly (by IP or name).
- *
- * @return SocketAddresses for the configured local interfaces,
- * or an empty array if none are configured
- * @throws UnknownHostException if a given interface name is invalid
- */
- private static SocketAddress[] getLocalInterfaceAddrs(
- String interfaceNames[]) throws UnknownHostException {
- List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
- for (String interfaceName : interfaceNames) {
- if (InetAddresses.isInetAddress(interfaceName)) {
- localAddrs.add(new InetSocketAddress(interfaceName, 0));
- } else if (NetUtils.isValidSubnet(interfaceName)) {
- for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
- localAddrs.add(new InetSocketAddress(addr, 0));
- }
- } else {
- for (String ip : DNS.getIPs(interfaceName, false)) {
- localAddrs.add(new InetSocketAddress(ip, 0));
- }
- }
- }
- return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
- }
-
- /**
- * Select one of the configured local interfaces at random. We use a random
- * interface because other policies like round-robin are less effective
- * given that we cache connections to datanodes.
- *
- * @return one of the local interface addresses at random, or null if no
- * local interfaces are configured
- */
- SocketAddress getRandomLocalInterfaceAddr() {
- if (localInterfaceAddrs.length == 0) {
- return null;
- }
- final int idx = r.nextInt(localInterfaceAddrs.length);
- final SocketAddress addr = localInterfaceAddrs[idx];
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using local interface " + addr);
- }
- return addr;
- }
-
- /**
- * Return the timeout that clients should use when writing to datanodes.
- * @param numNodes the number of nodes in the pipeline.
- */
- int getDatanodeWriteTimeout(int numNodes) {
- final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
- return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
- }
-
- int getDatanodeReadTimeout(int numNodes) {
- final int t = dfsClientConf.getSocketTimeout();
- return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
- }
-
- @VisibleForTesting
- public String getClientName() {
- return clientName;
- }
-
- void checkOpen() throws IOException {
- if (!clientRunning) {
- IOException result = new IOException("Filesystem closed");
- throw result;
- }
- }
-
- /** Return the lease renewer instance. The renewer thread won't start
- * until the first output stream is created. The same instance will
- * be returned until all output streams are closed.
- */
- public LeaseRenewer getLeaseRenewer() throws IOException {
- return LeaseRenewer.getInstance(authority, ugi, this);
- }
-
- /** Get a lease and start automatic renewal */
- private void beginFileLease(final long inodeId, final DFSOutputStream out)
- throws IOException {
- getLeaseRenewer().put(inodeId, out, this);
- }
-
- /** Stop renewal of lease for the file. */
- void endFileLease(final long inodeId) throws IOException {
- getLeaseRenewer().closeFile(inodeId, this);
- }
-
-
- /** Put a file. Only called from LeaseRenewer, where proper locking is
- * enforced to consistently update its local dfsclients array and
- * client's filesBeingWritten map.
- */
- public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
- synchronized(filesBeingWritten) {
- filesBeingWritten.put(inodeId, out);
- // update the last lease renewal time only when there was no
- // writes. once there is one write stream open, the lease renewer
- // thread keeps it updated well with in anyone's expiration time.
- if (lastLeaseRenewal == 0) {
- updateLastLeaseRenewal();
- }
- }
- }
-
- /** Remove a file. Only called from LeaseRenewer. */
- public void removeFileBeingWritten(final long inodeId) {
- synchronized(filesBeingWritten) {
- filesBeingWritten.remove(inodeId);
- if (filesBeingWritten.isEmpty()) {
- lastLeaseRenewal = 0;
- }
- }
- }
-
- /** Is file-being-written map empty? */
- public boolean isFilesBeingWrittenEmpty() {
- synchronized(filesBeingWritten) {
- return filesBeingWritten.isEmpty();
- }
- }
-
- /** @return true if the client is running */
- public boolean isClientRunning() {
- return clientRunning;
- }
-
- long getLastLeaseRenewal() {
- return lastLeaseRenewal;
- }
-
- void updateLastLeaseRenewal() {
- synchronized(filesBeingWritten) {
- if (filesBeingWritten.isEmpty()) {
- return;
- }
- lastLeaseRenewal = Time.monotonicNow();
- }
- }
-
- /**
- * Renew leases.
- * @return true if lease was renewed. May return false if this
- * client has been closed or has no files open.
- **/
- public boolean renewLease() throws IOException {
- if (clientRunning && !isFilesBeingWrittenEmpty()) {
- try {
- namenode.renewLease(clientName);
- updateLastLeaseRenewal();
- return true;
- } catch (IOException e) {
- // Abort if the lease has already expired.
- final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
- if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
- LOG.warn("Failed to renew lease for " + clientName + " for "
- + (elapsed/1000) + " seconds (>= hard-limit ="
- + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) "
- + "Closing all files being written ...", e);
- closeAllFilesBeingWritten(true);
- } else {
- // Let the lease renewer handle it and retry.
- throw e;
- }
- }
- }
- return false;
- }
-
- /**
- * Close connections the Namenode.
- */
- void closeConnectionToNamenode() {
- RPC.stopProxy(namenode);
- }
-
- /** Close/abort all files being written. */
- public void closeAllFilesBeingWritten(final boolean abort) {
- for(;;) {
- final long inodeId;
- final DFSOutputStream out;
- synchronized(filesBeingWritten) {
- if (filesBeingWritten.isEmpty()) {
- return;
- }
- inodeId = filesBeingWritten.keySet().iterator().next();
- out = filesBeingWritten.remove(inodeId);
- }
- if (out != null) {
- try {
- if (abort) {
- out.abort();
- } else {
- out.close();
- }
- } catch(IOException ie) {
- LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
- + out.getSrc() + " with inode: " + inodeId, ie);
- }
- }
- }
- }
-
- /**
- * Close the file system, abandoning all of the leases and files being
- * created and close connections to the namenode.
- */
- @Override
- public synchronized void close() throws IOException {
- if(clientRunning) {
- closeAllFilesBeingWritten(false);
- clientRunning = false;
- getLeaseRenewer().closeClient(this);
- // close connections to the namenode
- closeConnectionToNamenode();
- }
- }
-
- /**
- * Close all open streams, abandoning all of the leases and files being
- * created.
- * @param abort whether streams should be gracefully closed
- */
- public void closeOutputStreams(boolean abort) {
- if (clientRunning) {
- closeAllFilesBeingWritten(abort);
- }
- }
-
- /**
- * @see ClientProtocol#getPreferredBlockSize(String)
- */
- public long getBlockSize(String f) throws IOException {
- checkOpen();
- TraceScope scope = getPathTraceScope("getBlockSize", f);
- try {
- return namenode.getPreferredBlockSize(f);
- } catch (IOException ie) {
- LOG.warn("Problem getting block size", ie);
- throw ie;
- } finally {
- scope.close();
- }
- }
-
- /**
- * Get server default values for a number of configuration params.
- * @see ClientProtocol#getServerDefaults()
- */
- public FsServerDefaults getServerDefaults() throws IOException {
- checkOpen();
- long now = Time.monotonicNow();
- if ((serverDefaults == null) ||
- (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
- serverDefaults = namenode.getServerDefaults();
- serverDefaultsLastUpdate = now;
- }
- assert serverDefaults != null;
- return serverDefaults;
- }
-
- /**
- * Get a canonical token service name for this client's tokens. Null should
- * be returned if the client is not using tokens.
- * @return the token service for the client
- */
- @InterfaceAudience.LimitedPrivate( { "HDFS" })
- public String getCanonicalServiceName() {
- return (dtService != null) ? dtService.toString() : null;
- }
-
- /**
- * @see ClientProtocol#getDelegationToken(Text)
- */
- public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
- throws IOException {
- assert dtService != null;
- TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler);
- try {
- Token<DelegationTokenIdentifier> token =
- namenode.getDelegationToken(renewer);
- if (token != null) {
- token.setService(this.dtService);
- LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
- } else {
- LOG.info("Cannot get delegation token from " + renewer);
- }
- return token;
- } finally {
- scope.close();
- }
- }
-
- /**
- * Renew a delegation token
- * @param token the token to renew
- * @return the new expiration time
- * @throws InvalidToken
- * @throws IOException
- * @deprecated Use Token.renew instead.
- */
- @Deprecated
- public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
- throws InvalidToken, IOException {
- LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
- try {
- return token.renew(conf);
- } catch (InterruptedException ie) {
- throw new RuntimeException("caught interrupted", ie);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
-
- /**
- * Cancel a delegation token
- * @param token the token to cancel
- * @throws InvalidToken
- * @throws IOException
- * @deprecated Use Token.cancel instead.
- */
- @Deprecated
- public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
- throws InvalidToken, IOException {
- LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
- try {
- token.cancel(conf);
- } catch (InterruptedException ie) {
- throw new RuntimeException("caught interrupted", ie);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
-
- @InterfaceAudience.Private
- public static class Renewer extends TokenRenewer {
-
- static {
- //Ensure that HDFS Configuration files are loaded before trying to use
- // the renewer.
- HdfsConfiguration.init();
- }
-
- @Override
- public boolean handleKind(Text kind) {
- return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public long renew(Token<?> token, Configuration conf) throws IOException {
- Token<DelegationTokenIdentifier> delToken =
- (Token<DelegationTokenIdentifier>) token;
- ClientProtocol nn = getNNProxy(delToken, conf);
- try {
- return nn.renewDelegationToken(delToken);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void cancel(Token<?> token, Configuration conf) throws IOException {
- Token<DelegationTokenIdentifier> delToken =
- (Token<DelegationTokenIdentifier>) token;
- LOG.info("Cancelling " +
- DelegationTokenIdentifier.stringifyToken(delToken));
- ClientProtocol nn = getNNProxy(delToken, conf);
- try {
- nn.cancelDelegationToken(delToken);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
-
- private static ClientProtocol getNNProxy(
- Token<DelegationTokenIdentifier> token, Configuration conf)
- throws IOException {
- URI uri = HAUtilClient.getServiceUriFromToken(
- HdfsConstants.HDFS_URI_SCHEME, token);
- if (HAUtilClient.isTokenForLogicalUri(token) &&
- !HAUtilClient.isLogicalUri(conf, uri)) {
- // If the token is for a logical nameservice, but the configuration
- // we have disagrees about that, we can't actually renew it.
- // This can be the case in MR, for example, if the RM doesn't
- // have all of the HA clusters configured in its configuration.
- throw new IOException("Unable to map logical nameservice URI '" +
- uri + "' to a NameNode. Local configuration does not have " +
- "a failover proxy provider configured.");
- }
-
- ProxyAndInfo<ClientProtocol> info =
- NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null);
- assert info.getDelegationTokenService().equals(token.getService()) :
- "Returned service '" + info.getDelegationTokenService().toString() +
- "' doesn't match expected service '" +
- token.getService().toString() + "'";
-
- return info.getProxy();
- }
-
- @Override
- public boolean isManaged(Token<?> token) throws IOException {
- return true;
- }
-
- }
-
- /**
- * Report corrupt blocks that were discovered by the client.
- * @see ClientProtocol#reportBadBlocks(LocatedBlock[])
- */
- public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
- checkOpen();
- namenode.reportBadBlocks(blocks);
- }
-
- public LocatedBlocks getLocatedBlocks(String src, long start)
- throws IOException {
- return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
- }
-
- /*
- * This is just a wrapper around callGetBlockLocations, but non-static so that
- * we can stub it out for tests.
- */
- @VisibleForTesting
- public LocatedBlocks getLocatedBlocks(String src, long start, long length)
- throws IOException {
- TraceScope scope = getPathTraceScope("getBlockLocations", src);
- try {
- return callGetBlockLocations(namenode, src, start, length);
- } finally {
- scope.close();
- }
- }
-
- /**
- * @see ClientProtocol#getBlockLocations(String, long, long)
- */
- static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
- String src, long start, long length)
- throws IOException {
- try {
- return namenode.getBlockLocations(src, start, length);
- } catch(RemoteException re) {
- throw re.unwrapRemoteException(AccessControlException.class,
- FileNotFoundException.class,
- UnresolvedPathException.class);
- }
- }
-
- /**
- * Recover a file's lease
- * @param src a file's path
- * @return true if the file is already closed
- * @throws IOException
- */
- boolean recoverLease(String src) throws IOException {
- checkOpen();
-
- TraceScope scope = getPathTraceScope("recoverLease", src);
- try {
- return namenode.recoverLease(src, clientName);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(FileNotFoundException.class,
- AccessControlException.class,
- UnresolvedPathException.class);
- } finally {
- scope.close();
- }
- }
-
- /**
-  * Get block location info about a file.
-  *
-  * getBlockLocations() returns a list of hostnames that store
-  * data for a specific file region. It returns a set of hostnames
-  * for every block within the indicated region.
-  *
-  * This is useful for code that takes data placement into account, e.g.
-  * MapReduce tries to schedule tasks on the machines that hold the
-  * data block a task processes.
-  *
-  * Each returned element is an HdfsBlockLocation pairing the plain
-  * BlockLocation with the located block at the same index.
-  */
- public BlockLocation[] getBlockLocations(String src, long start,
-     long length) throws IOException, UnresolvedLinkException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("getBlockLocations", src);
-   try {
-     final LocatedBlocks located = getLocatedBlocks(src, start, length);
-     final BlockLocation[] plain =
-         DFSUtilClient.locatedBlocks2Locations(located);
-     final HdfsBlockLocation[] wrapped = new HdfsBlockLocation[plain.length];
-     int idx = 0;
-     for (BlockLocation location : plain) {
-       wrapped[idx] = new HdfsBlockLocation(location, located.get(idx));
-       idx++;
-     }
-     return wrapped;
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
- * Decrypts a EDEK by consulting the KeyProvider.
- */
- private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
- feInfo) throws IOException {
- TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler);
- try {
- KeyProvider provider = getKeyProvider();
- if (provider == null) {
- throw new IOException("No KeyProvider is configured, cannot access" +
- " an encrypted file");
- }
- EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
- feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
- feInfo.getEncryptedDataEncryptionKey());
- try {
- KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
- .createKeyProviderCryptoExtension(provider);
- return cryptoProvider.decryptEncryptedKey(ekv);
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- } finally {
- scope.close();
- }
- }
-
- /**
-  * Obtain the crypto protocol version from the provided FileEncryptionInfo,
-  * checking to see if this version is supported by this client.
-  *
-  * @param feInfo FileEncryptionInfo
-  * @return CryptoProtocolVersion from the feInfo
-  * @throws IOException if the protocol version is unsupported.
-  */
- private static CryptoProtocolVersion getCryptoProtocolVersion
-     (FileEncryptionInfo feInfo) throws IOException {
-   final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
-   if (!CryptoProtocolVersion.supports(version)) {
-     // Keep a space before the numeric version so the message does not
-     // render as "version number2".
-     throw new IOException("Client does not support specified " +
-         "CryptoProtocolVersion " + version.getDescription() + " version " +
-         "number " + version.getVersion());
-   }
-   return version;
- }
-
- /**
-  * Looks up the CryptoCodec matching the CipherSuite recorded in a
-  * FileEncryptionInfo among the codecs configured in the Configuration.
-  *
-  * @param conf Configuration
-  * @param feInfo FileEncryptionInfo
-  * @return CryptoCodec
-  * @throws IOException if the suite is CipherSuite.UNKNOWN, or (as an
-  *         UnknownCipherSuiteException) if no codec is available for it.
-  */
- private static CryptoCodec getCryptoCodec(Configuration conf,
-     FileEncryptionInfo feInfo) throws IOException {
-   final CipherSuite cipherSuite = feInfo.getCipherSuite();
-   if (cipherSuite.equals(CipherSuite.UNKNOWN)) {
-     throw new IOException("NameNode specified unknown CipherSuite with ID "
-         + cipherSuite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
-   }
-   final CryptoCodec found = CryptoCodec.getInstance(conf, cipherSuite);
-   if (found != null) {
-     return found;
-   }
-   throw new UnknownCipherSuiteException(
-       "No configuration found for the cipher suite "
-       + cipherSuite.getConfigSuffix() + " prefixed with "
-       + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
-       + ". Please see the example configuration "
-       + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
-       + "at core-default.xml for details.");
- }
-
- /**
-  * Wraps the stream in a CryptoInputStream if the underlying file is
-  * encrypted, i.e. if the stream carries a FileEncryptionInfo; otherwise
-  * the stream is wrapped directly.
-  */
- public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
-     throws IOException {
-   final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
-   if (feInfo == null) {
-     // No FileEncryptionInfo so no encryption.
-     return new HdfsDataInputStream(dfsis);
-   }
-   // File is encrypted. Only one protocol version exists for now, so the
-   // call below merely validates it; its result is not needed.
-   getCryptoProtocolVersion(feInfo);
-   final CryptoCodec cryptoCodec = getCryptoCodec(conf, feInfo);
-   final KeyVersion dataKey = decryptEncryptedDataEncryptionKey(feInfo);
-   final CryptoInputStream decrypting = new CryptoInputStream(
-       dfsis, cryptoCodec, dataKey.getMaterial(), feInfo.getIV());
-   return new HdfsDataInputStream(decrypting);
- }
-
- /**
-  * Wraps the stream in a CryptoOutputStream if the underlying file is
-  * encrypted, using a start position of 0.
-  */
- public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
-     FileSystem.Statistics statistics) throws IOException {
-   final long startPos = 0L;
-   return createWrappedOutputStream(dfsos, statistics, startPos);
- }
-
- /**
-  * Wraps the stream in a CryptoOutputStream if the underlying file is
-  * encrypted, i.e. if the stream carries a FileEncryptionInfo; otherwise
-  * the stream is wrapped directly. {@code startPos} is handed to the
-  * wrapping streams.
-  */
- public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
-     FileSystem.Statistics statistics, long startPos) throws IOException {
-   final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
-   if (feInfo == null) {
-     // No FileEncryptionInfo present so no encryption.
-     return new HdfsDataOutputStream(dfsos, statistics, startPos);
-   }
-   // File is encrypted. Only one protocol version exists for now, so the
-   // call below merely validates it; its result is not needed.
-   getCryptoProtocolVersion(feInfo);
-   final CryptoCodec cryptoCodec = getCryptoCodec(conf, feInfo);
-   final KeyVersion dataKey = decryptEncryptedDataEncryptionKey(feInfo);
-   final CryptoOutputStream encrypting = new CryptoOutputStream(
-       dfsos, cryptoCodec, dataKey.getMaterial(), feInfo.getIV(), startPos);
-   return new HdfsDataOutputStream(encrypting, statistics, startPos);
- }
-
- /**
-  * Opens {@code src} with the configured I/O buffer size, checksum
-  * verification enabled and no statistics object.
-  */
- public DFSInputStream open(String src)
-     throws IOException, UnresolvedLinkException {
-   final int bufferSize = dfsClientConf.getIoBufferSize();
-   return open(src, bufferSize, true, null);
- }
-
- /**
-  * Create an input stream that obtains a nodelist from the
-  * namenode, and then reads from all the right places. Creates
-  * inner subclass of InputStream that does the right out-of-band
-  * work. The {@code stats} argument is not used; the call is forwarded
-  * to the three-argument overload.
-  * @deprecated Use {@link #open(String, int, boolean)} instead.
-  */
- @Deprecated
- public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
-     FileSystem.Statistics stats)
-     throws IOException, UnresolvedLinkException {
-   final DFSInputStream stream = open(src, buffersize, verifyChecksum);
-   return stream;
- }
-
-
- /**
-  * Create an input stream that obtains a nodelist from the
-  * namenode, and then reads from all the right places. Creates
-  * inner subclass of InputStream that does the right out-of-band
-  * work. The stream is constructed inside a "newDFSInputStream" path
-  * trace scope; {@code buffersize} is not passed on to it.
-  */
- public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
-     throws IOException, UnresolvedLinkException {
-   checkOpen();
-   // Get block info from namenode
-   final TraceScope traceScope = getPathTraceScope("newDFSInputStream", src);
-   try {
-     final DFSInputStream stream =
-         new DFSInputStream(this, src, verifyChecksum, null);
-     return stream;
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Accessor for the namenode proxy held by this DFSClient.
-  * @return the namenode associated with this DFSClient object
-  */
- public ClientProtocol getNamenode() {
-   return this.namenode;
- }
-
- /**
-  * Call {@link #create(String, boolean, short, long, Progressable)} with
-  * default <code>replication</code> and <code>blockSize</code> and null
-  * <code>progress</code>.
-  */
- public OutputStream create(String src, boolean overwrite)
-     throws IOException {
-   final short defaultReplication = dfsClientConf.getDefaultReplication();
-   final long defaultBlockSize = dfsClientConf.getDefaultBlockSize();
-   return create(src, overwrite, defaultReplication, defaultBlockSize, null);
- }
-
- /**
-  * Call {@link #create(String, boolean, short, long, Progressable)} with
-  * default <code>replication</code> and <code>blockSize</code>.
-  */
- public OutputStream create(String src,
-     boolean overwrite,
-     Progressable progress) throws IOException {
-   final short defaultReplication = dfsClientConf.getDefaultReplication();
-   final long defaultBlockSize = dfsClientConf.getDefaultBlockSize();
-   return create(src, overwrite, defaultReplication, defaultBlockSize,
-       progress);
- }
-
- /**
-  * Call {@link #create(String, boolean, short, long, Progressable)} with
-  * null <code>progress</code>.
-  */
- public OutputStream create(String src,
-     boolean overwrite,
-     short replication,
-     long blockSize) throws IOException {
-   final OutputStream stream =
-       create(src, overwrite, replication, blockSize, null);
-   return stream;
- }
-
- /**
-  * Call {@link #create(String, boolean, short, long, Progressable, int)}
-  * with the I/O buffer size from the client configuration.
-  */
- public OutputStream create(String src, boolean overwrite, short replication,
-     long blockSize, Progressable progress) throws IOException {
-   final int defaultBufferSize = dfsClientConf.getIoBufferSize();
-   return create(src, overwrite, replication, blockSize, progress,
-       defaultBufferSize);
- }
-
- /**
-  * Call {@link #create(String, FsPermission, EnumSet, short, long,
-  * Progressable, int, ChecksumOpt)} with default <code>permission</code>
-  * {@link FsPermission#getFileDefault()} and a null checksum option. The
-  * create flags are CREATE, plus OVERWRITE when {@code overwrite} is true.
-  *
-  * @param src File name
-  * @param overwrite overwrite an existing file if true
-  * @param replication replication factor for the file
-  * @param blockSize maximum block size
-  * @param progress interface for reporting client progress
-  * @param buffersize underlying buffersize
-  *
-  * @return output stream
-  */
- public OutputStream create(String src,
-     boolean overwrite,
-     short replication,
-     long blockSize,
-     Progressable progress,
-     int buffersize)
-     throws IOException {
-   final FsPermission defaultPermission = FsPermission.getFileDefault();
-   final EnumSet<CreateFlag> createFlags;
-   if (overwrite) {
-     createFlags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-   } else {
-     createFlags = EnumSet.of(CreateFlag.CREATE);
-   }
-   return create(src, defaultPermission, createFlags, replication, blockSize,
-       progress, buffersize, null);
- }
-
- /**
-  * Call the favored-nodes variant of create with <code>createParent</code>
-  * set to true and no favored nodes.
-  */
- public DFSOutputStream create(String src,
-     FsPermission permission,
-     EnumSet<CreateFlag> flag,
-     short replication,
-     long blockSize,
-     Progressable progress,
-     int buffersize,
-     ChecksumOpt checksumOpt)
-     throws IOException {
-   final boolean createParent = true;
-   return create(src, permission, flag, createParent,
-       replication, blockSize, progress, buffersize, checksumOpt, null);
- }
-
- /**
-  * Create a new dfs file with the specified block replication
-  * with write-progress reporting and return an output stream for writing
-  * into the file. Delegates to the favored-nodes variant with no favored
-  * nodes.
-  *
-  * @param src File name
-  * @param permission The permission of the file being created.
-  * If null, use default permission {@link FsPermission#getFileDefault()}
-  * @param flag indicates create a new file or create/overwrite an
-  * existing file or append to an existing file
-  * @param createParent create missing parent directory if true
-  * @param replication block replication
-  * @param blockSize maximum block size
-  * @param progress interface for reporting client progress
-  * @param buffersize underlying buffer size
-  * @param checksumOpt checksum options
-  *
-  * @return output stream
-  *
-  * @see ClientProtocol#create for detailed description of exceptions thrown
-  */
- public DFSOutputStream create(String src,
-     FsPermission permission,
-     EnumSet<CreateFlag> flag,
-     boolean createParent,
-     short replication,
-     long blockSize,
-     Progressable progress,
-     int buffersize,
-     ChecksumOpt checksumOpt) throws IOException {
-   final DFSOutputStream stream = create(src, permission, flag, createParent,
-       replication, blockSize, progress, buffersize, checksumOpt, null);
-   return stream;
- }
-
- private FsPermission applyUMask(FsPermission permission) {
- if (permission == null) {
- permission = FsPermission.getFileDefault();
- }
- return permission.applyUMask(dfsClientConf.getUMask());
- }
-
- /**
-  * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
-  * Progressable, int, ChecksumOpt)} with the addition of favoredNodes, a
-  * hint to where the namenode should place the file blocks.
-  * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-  * at the creation time only. HDFS could move the blocks during balancing or
-  * replication, to move the blocks from favored nodes. A value of null means
-  * no favored nodes for this create.
-  *
-  * The permission is masked with the configured umask before the stream
-  * is created, and a file lease is begun for the resulting stream.
-  */
- public DFSOutputStream create(String src,
-     FsPermission permission,
-     EnumSet<CreateFlag> flag,
-     boolean createParent,
-     short replication,
-     long blockSize,
-     Progressable progress,
-     int buffersize,
-     ChecksumOpt checksumOpt,
-     InetSocketAddress[] favoredNodes) throws IOException {
-   checkOpen();
-   final FsPermission masked = applyUMask(permission);
-   if (LOG.isDebugEnabled()) {
-     LOG.debug(src + ": masked=" + masked);
-   }
-   final DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
-   final String[] favoredNodeStrs = getFavoredNodesStr(favoredNodes);
-   final DFSOutputStream stream = DFSOutputStream.newStreamForCreate(this,
-       src, masked, flag, createParent, replication, blockSize, progress,
-       buffersize, checksum, favoredNodeStrs);
-   beginFileLease(stream.getFileId(), stream);
-   return stream;
- }
-
- private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
- String[] favoredNodeStrs = null;
- if (favoredNodes != null) {
- favoredNodeStrs = new String[favoredNodes.length];
- for (int i = 0; i < favoredNodes.length; i++) {
- favoredNodeStrs[i] =
- favoredNodes[i].getHostName() + ":"
- + favoredNodes[i].getPort();
- }
- }
- return favoredNodeStrs;
- }
-
- /**
- * Append to an existing file if {@link CreateFlag#APPEND} is present
- */
- private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
- int buffersize, Progressable progress) throws IOException {
- if (flag.contains(CreateFlag.APPEND)) {
- HdfsFileStatus stat = getFileInfo(src);
- if (stat == null) { // No file to append to
- // New file needs to be created if create option is present
- if (!flag.contains(CreateFlag.CREATE)) {
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " on client " + clientName);
- }
- return null;
- }
- return callAppend(src, buffersize, flag, progress, null);
- }
- return null;
- }
-
- /**
-  * Same as {@link #create(String, FsPermission, EnumSet, short, long,
-  * Progressable, int, ChecksumOpt)} except that the permission
-  * is absolute (i.e. has already been masked with umask). An append is
-  * attempted first; a new stream is created only when that yields nothing.
-  * A file lease is begun for the resulting stream.
-  */
- public DFSOutputStream primitiveCreate(String src,
-     FsPermission absPermission,
-     EnumSet<CreateFlag> flag,
-     boolean createParent,
-     short replication,
-     long blockSize,
-     Progressable progress,
-     int buffersize,
-     ChecksumOpt checksumOpt)
-     throws IOException, UnresolvedLinkException {
-   checkOpen();
-   CreateFlag.validate(flag);
-   final DFSOutputStream appended =
-       primitiveAppend(src, flag, buffersize, progress);
-   final DFSOutputStream stream;
-   if (appended != null) {
-     stream = appended;
-   } else {
-     final DataChecksum dataChecksum =
-         dfsClientConf.createChecksum(checksumOpt);
-     stream = DFSOutputStream.newStreamForCreate(this, src, absPermission,
-         flag, createParent, replication, blockSize, progress, buffersize,
-         dataChecksum, null);
-   }
-   beginFileLease(stream.getFileId(), stream);
-   return stream;
- }
-
- /**
-  * Creates a symbolic link. The permission sent to the namenode is the
-  * file default with the configured umask applied.
-  *
-  * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean)
-  */
- public void createSymlink(String target, String link, boolean createParent)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("createSymlink", target);
-   try {
-     final FsPermission maskedDefault = applyUMask(null);
-     namenode.createSymlink(target, link, maskedDefault, createParent);
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileAlreadyExistsException.class,
-         FileNotFoundException.class,
-         ParentNotDirectoryException.class,
-         NSQuotaExceededException.class,
-         DSQuotaExceededException.class,
-         QuotaByStorageTypeExceededException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Resolve the *first* symlink, if any, in the path.
-  *
-  * @param path the path to resolve
-  * @return whatever the namenode reports as the link target
-  * @see ClientProtocol#getLinkTarget(String)
-  */
- public String getLinkTarget(String path) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("getLinkTarget", path);
-   try {
-     final String target = namenode.getLinkTarget(path);
-     return target;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /** Method to get stream returned by append call */
- private DFSOutputStream callAppend(String src, int buffersize,
- EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
- throws IOException {
- CreateFlag.validateForAppend(flag);
- try {
- LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
- new EnumSetWritable<>(flag, CreateFlag.class));
- return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
- progress, blkWithStatus.getLastBlock(),
- blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
- favoredNodes);
- } catch(RemoteException re) {
- throw re.unwrapRemoteException(AccessControlException.class,
- FileNotFoundException.class,
- SafeModeException.class,
- DSQuotaExceededException.class,
- QuotaByStorageTypeExceededException.class,
- UnsupportedOperationException.class,
- UnresolvedPathException.class,
- SnapshotAccessControlException.class);
- }
- }
-
- /**
-  * Append to an existing HDFS file, without favored nodes. The resulting
-  * stream is wrapped via createWrappedOutputStream, starting at the
-  * stream's initial length.
-  *
-  * @param src file name
-  * @param buffersize buffer size
-  * @param flag indicates whether to append data to a new block instead of
-  *             the last block
-  * @param progress for reporting write-progress; null is acceptable.
-  * @param statistics file system statistics; null is acceptable.
-  * @return an output stream for writing into the file
-  *
-  * @see ClientProtocol#append(String, String, EnumSetWritable)
-  */
- public HdfsDataOutputStream append(final String src, final int buffersize,
-     EnumSet<CreateFlag> flag, final Progressable progress,
-     final FileSystem.Statistics statistics) throws IOException {
-   final DFSOutputStream raw = append(src, buffersize, flag, null, progress);
-   final long initialLength = raw.getInitialLen();
-   return createWrappedOutputStream(raw, statistics, initialLength);
- }
-
- /**
-  * Append to an existing HDFS file, passing favored nodes for new blocks.
-  * The resulting stream is wrapped via createWrappedOutputStream, starting
-  * at the stream's initial length.
-  *
-  * @param src file name
-  * @param buffersize buffer size
-  * @param flag indicates whether to append data to a new block instead of the
-  *          last block
-  * @param progress for reporting write-progress; null is acceptable.
-  * @param statistics file system statistics; null is acceptable.
-  * @param favoredNodes FavoredNodes for new blocks
-  * @return an output stream for writing into the file
-  * @see ClientProtocol#append(String, String, EnumSetWritable)
-  */
- public HdfsDataOutputStream append(final String src, final int buffersize,
-     EnumSet<CreateFlag> flag, final Progressable progress,
-     final FileSystem.Statistics statistics,
-     final InetSocketAddress[] favoredNodes) throws IOException {
-   final String[] favoredNodeStrs = getFavoredNodesStr(favoredNodes);
-   final DFSOutputStream raw =
-       append(src, buffersize, flag, favoredNodeStrs, progress);
-   final long initialLength = raw.getInitialLen();
-   return createWrappedOutputStream(raw, statistics, initialLength);
- }
-
- /**
-  * Shared append path: checks the client is open, obtains the append
-  * stream and begins a file lease for it.
-  */
- private DFSOutputStream append(String src, int buffersize,
-     EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
-     throws IOException {
-   checkOpen();
-   final DFSOutputStream stream =
-       callAppend(src, buffersize, flag, progress, favoredNodes);
-   final long fileId = stream.getFileId();
-   beginFileLease(fileId, stream);
-   return stream;
- }
-
- /**
-  * Set replication for an existing file.
-  * @param src file name
-  * @param replication replication to set the file to
-  * @return the value returned by the namenode call
-  *
-  * @see ClientProtocol#setReplication(String, short)
-  */
- public boolean setReplication(String src, short replication)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("setReplication", src);
-   try {
-     final boolean outcome = namenode.setReplication(src, replication);
-     return outcome;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         DSQuotaExceededException.class,
-         QuotaByStorageTypeExceededException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Set storage policy for an existing file/directory.
-  * @param src file/directory name
-  * @param policyName name of the storage policy
-  * @throws IOException if checkOpen() or the namenode call fails; a
-  *         RemoteException is unwrapped for the listed exception types
-  */
- public void setStoragePolicy(String src, String policyName)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("setStoragePolicy", src);
-   try {
-     namenode.setStoragePolicy(src, policyName);
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         NSQuotaExceededException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Get the storage policy for the specified path.
-  * @param path file/directory name
-  * @return the storage policy reported by the namenode for that path
-  */
- public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("getStoragePolicy", path);
-   try {
-     final BlockStoragePolicy policy = namenode.getStoragePolicy(path);
-     return policy;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Fetches the storage policies known to the namenode, inside a
-  * "getStoragePolicies" trace span.
-  * @return All the existing storage policies
-  */
- public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("getStoragePolicies", traceSampler);
-   try {
-     final BlockStoragePolicy[] policies = namenode.getStoragePolicies();
-     return policies;
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Rename file or directory.
-  * @return the value returned by the namenode rename call
-  * @see ClientProtocol#rename(String, String)
-  * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
-  */
- @Deprecated
- public boolean rename(String src, String dst) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getSrcDstTraceScope("rename", src, dst);
-   try {
-     final boolean renamed = namenode.rename(src, dst);
-     return renamed;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         NSQuotaExceededException.class,
-         DSQuotaExceededException.class,
-         QuotaByStorageTypeExceededException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Move blocks from srcs to trg and delete srcs.
-  * See {@link ClientProtocol#concat}.
-  *
-  * @param trg the target file
-  * @param srcs the source files
-  */
- public void concat(String trg, String [] srcs) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = Trace.startSpan("concat", traceSampler);
-   try {
-     namenode.concat(trg, srcs);
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
- /**
-  * Rename file or directory, forwarding the given rename options to the
-  * namenode's rename2 call.
-  * @see ClientProtocol#rename2(String, String, Options.Rename...)
-  */
- public void rename(String src, String dst, Options.Rename... options)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getSrcDstTraceScope("rename2", src, dst);
-   try {
-     namenode.rename2(src, dst, options);
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         DSQuotaExceededException.class,
-         QuotaByStorageTypeExceededException.class,
-         FileAlreadyExistsException.class,
-         FileNotFoundException.class,
-         ParentNotDirectoryException.class,
-         SafeModeException.class,
-         NSQuotaExceededException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Truncate a file to an indicated size.
-  * See {@link ClientProtocol#truncate}.
-  *
-  * @param src the file to truncate
-  * @param newLength the target size; must not be negative
-  * @return the value returned by the namenode truncate call
-  * @throws HadoopIllegalArgumentException if {@code newLength} is negative
-  */
- public boolean truncate(String src, long newLength) throws IOException {
-   checkOpen();
-   final boolean negativeLength = newLength < 0;
-   if (negativeLength) {
-     throw new HadoopIllegalArgumentException(
-         "Cannot truncate to a negative file size: " + newLength + ".");
-   }
-   final TraceScope traceScope = getPathTraceScope("truncate", src);
-   try {
-     final boolean outcome = namenode.truncate(src, newLength, clientName);
-     return outcome;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Delete file or directory; always deletes recursively.
-  * See {@link ClientProtocol#delete(String, boolean)}.
-  * @deprecated this overload delegates to {@link #delete(String, boolean)}
-  *             with recursive set to true.
-  */
- @Deprecated
- public boolean delete(String src) throws IOException {
-   checkOpen();
-   final boolean recursive = true;
-   return delete(src, recursive);
- }
-
- /**
-  * Delete file or directory.
-  * Deletes the contents of the directory if it is non-empty and recursive
-  * is set to true.
-  *
-  * @return the value returned by the namenode delete call
-  * @see ClientProtocol#delete(String, boolean)
-  */
- public boolean delete(String src, boolean recursive) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("delete", src);
-   try {
-     final boolean deleted = namenode.delete(src, recursive);
-     return deleted;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Tells whether {@code src} exists; implemented using getFileInfo(src),
-  * which yields null for a missing path.
-  */
- public boolean exists(String src) throws IOException {
-   checkOpen();
-   final HdfsFileStatus status = getFileInfo(src);
-   return status != null;
- }
-
- /**
-  * Get a partial listing of the indicated directory.
-  * No block locations are requested.
-  */
- public DirectoryListing listPaths(String src, byte[] startAfter)
-     throws IOException {
-   final boolean needLocation = false;
-   return listPaths(src, startAfter, needLocation);
- }
-
- /**
-  * Get a partial listing of the indicated directory.
-  *
-  * It is recommended to use HdfsFileStatus.EMPTY_NAME as startAfter
-  * if the application wants to fetch a listing starting from
-  * the first entry in the directory.
-  *
-  * @see ClientProtocol#getListing(String, byte[], boolean)
-  */
- public DirectoryListing listPaths(String src, byte[] startAfter,
-     boolean needLocation) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("listPaths", src);
-   try {
-     final DirectoryListing listing =
-         namenode.getListing(src, startAfter, needLocation);
-     return listing;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Get the file info for a specific file or directory.
-  * @param src The string representation of the path to the file
-  * @return object containing information regarding the file
-  *         or null if file not found
-  *
-  * @see ClientProtocol#getFileInfo(String) for description of exceptions
-  */
- public HdfsFileStatus getFileInfo(String src) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("getFileInfo", src);
-   try {
-     final HdfsFileStatus status = namenode.getFileInfo(src);
-     return status;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Close status of a file, as reported by the namenode.
-  * @param src path of the file to query
-  * @return true if file is already closed
-  */
- public boolean isFileClosed(String src) throws IOException{
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("isFileClosed", src);
-   try {
-     final boolean closed = namenode.isFileClosed(src);
-     return closed;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         FileNotFoundException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Get the file info for a specific file or directory. If src
-  * refers to a symlink then the FileStatus of the link is returned.
-  * @param src path to a file or directory.
-  * @return the status returned by the namenode call
-  *
-  * @see ClientProtocol#getFileLinkInfo(String) for a description of the
-  *      exceptions thrown
-  */
- public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("getFileLinkInfo", src);
-   try {
-     final HdfsFileStatus status = namenode.getFileLinkInfo(src);
-     return status;
-   } catch (RemoteException remote) {
-     throw remote.unwrapRemoteException(AccessControlException.class,
-         UnresolvedPathException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Drops the cached data encryption key; the field is reset while holding
-  * this object's monitor.
-  */
- @InterfaceAudience.Private
- public void clearDataEncryptionKey() {
-   LOG.debug("Clearing encryption key");
-   synchronized (this) {
-     this.encryptionKey = null;
-   }
- }
-
- /**
- * @return true if data sent between this client and DNs should be encrypted,
- * false otherwise.
- * @throws IOException in the event of error communicating with the NN
- */
- boolean shouldEncryptData() throws IOException {
- FsServerDefaults d = getServerDefaults();
- return d == null ? false : d.getEncryptDataTransfer();
- }
-
- /**
-  * Supplies the data encryption key, or null when data transfer should not
-  * be encrypted. The cached key is replaced with a fresh one from the
-  * namenode when it is absent or its expiry date lies before Time.now();
-  * the check and refresh happen while holding this object's monitor.
-  */
- @Override
- public DataEncryptionKey newDataEncryptionKey() throws IOException {
-   if (!shouldEncryptData()) {
-     return null;
-   }
-   synchronized (this) {
-     if (encryptionKey == null ||
-         encryptionKey.expiryDate < Time.now()) {
-       LOG.debug("Getting new encryption token from NN");
-       encryptionKey = namenode.getDataEncryptionKey();
-     }
-     return encryptionKey;
-   }
- }
-
- /**
- * Get the checksum of the whole file or a range of the file. Note that the
- * range always starts from the beginning of the file.
- * @param src The file path
- * @param length the length of the range, i.e., the range is [0, length]
- * @return The checksum
- * @see DistributedFileSystem#getFileChecksum(Path)
- */
- public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
- throws IOException {
- checkOpen();
- Preconditions.checkArgument(length >= 0);
- //get block locations for the file range
- LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
- length);
- if (null == blockLocations) {
- throw new FileNotFoundException("File does not exist: " + src);
- }
- if (blockLocations.isUnderConstruction()) {
- throw new IOException("Fail to get checksum, since file " + src
- + " is under construction.");
- }
- List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
- final DataOutputBuffer md5out = new DataOutputBuffer();
- int bytesPerCRC = -1;
- DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
- long crcPerBlock = 0;
- boolean refetchBlocks = false;
- int lastRetriedIndex = -1;
-
- // get block checksum for each block
- long remaining = length;
- if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
- remaining = Math.min(length, blockLocations.getFileLength());
- }
- for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
- if (refetchBlocks) { // refetch to get fresh tokens
- blockLocations = callGetBlockLocations(namenode, src, 0, length);
- if (null == blockLocations) {
- throw new FileNotFoundException("File does not exist: " + src);
- }
- if (blockLocations.isUnderConstruction()) {
- throw new IOException("Fail to get checksum, since file " + src
- + " is under construction.");
- }
- locatedblocks = blockLocations.getLocatedBlocks();
- refetchBlocks = false;
- }
- LocatedBlock lb = locatedblocks.get(i);
- final ExtendedBlock block = lb.getBlock();
- if (remaining < block.getNumBytes()) {
- block.setNumBytes(remaining);
- }
- remaining -= block.getNumBytes();
- final DatanodeInfo[] datanodes = lb.getLocations();
-
- //try each datanode location of the block
- final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
- boolean done = false;
- for(int j = 0; !done && j < datanodes.length; j++) {
- DataOutputStream out = null;
- DataInputStream in = null;
-
- try {
- //connect to a datanode
- IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
- out = new DataOutputStream(new BufferedOutputStream(pair.out,
- smallBufferSize));
- in = new DataInputStream(pair.in);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("write to " + datanodes[j] + ": "
- + Op.BLOCK_CHECKSUM + ", block=" + block);
- }
- // get block MD5
- new Sender(out).blockChecksum(block, lb.getBlockToken());
-
- final BlockOpResponseProto reply =
- BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-
- String logInfo = "for block " + block + " from datanode " + datanodes[j];
- DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
-
- OpBlockChecksumResponseProto checksumData =
- reply.getChecksumResponse();
-
- //read byte-per-checksum
- final int bpc = checksumData.getBytesPerCrc();
- if (i == 0) { //first block
- bytesPerCRC = bpc;
- }
- else if (bpc != bytesPerCRC) {
- throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
- + " but bytesPerCRC=" + bytesPerCRC);
- }
-
- //read crc-per-block
- final long cpb = checksumData.getCrcPerBlock();
- if (locatedblocks.size() > 1 && i == 0) {
- crcPerBlock = cpb;
- }
-
- //read md5
- final MD5Hash md5 = new MD5Hash(
- checksumData.getMd5().toByteArray());
- md5.write(md5out);
-
- // read crc-type
- final DataChecksum.Type ct;
- if (checksumData.hasCrcType()) {
- ct = PBHelperClient.convert(checksumData
- .getCrcType());
- } else {
- LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
- "inferring checksum by reading first byte");
- ct = inferChecksumTypeByReading(lb, datanodes[j]);
- }
-
- if (i == 0) { // first block
- crcType = ct;
- } else if (crcType != DataChecksum.Type.MIXED
- && crcType != ct) {
- // if crc types are mixed in a file
- crcType = DataChecksum.Type.MIXED;
- }
-
- done = true;
-
- if (LOG.isDebugEnabled()) {
- if (i == 0) {
- LOG.debug("set bytesPerCRC=" + bytesPerCRC
- + ", crcPerBlock=" + crcPerBlock);
- }
- LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
- }
- } catch (InvalidBlockTokenException ibte) {
- if (i > lastRetriedIndex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
- + "for file " + src + " for block " + block
- + " from datanode " + datanodes[j]
- + ". Will retry the block once.");
- }
- lastRetriedIndex = i;
- done = true; // actually it's not done; but we'll retry
- i--; // repeat at i-th block
- refetchBlocks = true;
- break;
- }
- } catch (IOException ie) {
- LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
- } finally {
- IOUtils.closeStream(in);
- IOUtils.closeStream(out);
- }
- }
-
- if (!done) {
- throw new IOException("Fail to get block MD5 for " + block);
- }
- }
-
- //compute file MD5
- final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
- switch (crcType) {
- case CRC32:
- return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
- crcPerBlock, fileMD5);
- case CRC32C:
- return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
- crcPerBlock, fileMD5);
- default:
- // If there is no block allocated for the file,
- // return one with the magic entry that matches what previous
- // hdfs versions return.
- if (locatedblocks.size() == 0) {
- return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
- }
-
- // we should never get here since the validity was checked
- // when getCrcType() was called above.
- return null;
- }
- }
-
- /**
- * Connect to the given datanode's datantrasfer port, and return
- * the resulting IOStreamPair. This includes encryption wrapping, etc.
- */
- private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
- LocatedBlock lb) throws IOException {
- boolean success = false;
- Socket sock = null;
- try {
- sock = socketFactory.createSocket();
- String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to datanode " + dnAddr);
- }
- NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
- sock.setSoTimeout(timeout);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock);
- InputStream unbufIn = NetUtils.getInputStream(sock);
- IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
- lb.getBlockToken(), dn);
- success = true;
- return ret;
- } finally {
- if (!success) {
- IOUtils.closeSocket(sock);
- }
- }
- }
-
- /**
-  * Infer the checksum type for a replica by sending an OP_READ_BLOCK
-  * for the first byte of that replica and reading the checksum type out
-  * of the reply. This is used for compatibility with older HDFS versions
-  * which did not include the checksum type in OpBlockChecksumResponseProto.
-  *
-  * A new connection to the datanode is opened for this request and its
-  * streams are cleaned up before the method returns.
-  *
-  * @param lb the located block
-  * @param dn the datanode to query
-  * @return the inferred checksum type
-  * @throws IOException if an error occurs
-  */
- private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
-     throws IOException {
-   final IOStreamPair streams =
-       connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
-
-   try {
-     final DataOutputStream requestOut = new DataOutputStream(
-         new BufferedOutputStream(streams.out, smallBufferSize));
-     final DataInputStream replyIn = new DataInputStream(streams.in);
-
-     new Sender(requestOut).readBlock(lb.getBlock(), lb.getBlockToken(),
-         clientName, 0, 1, true, CachingStrategy.newDefaultStrategy());
-     final BlockOpResponseProto response =
-         BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(replyIn));
-     final String logInfo =
-         "trying to read " + lb.getBlock() + " from datanode " + dn;
-     // Raises if the datanode did not report success for the read op.
-     DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
-
-     return PBHelperClient.convert(
-         response.getReadOpChecksumInfo().getChecksum().getType());
-   } finally {
-     IOUtils.cleanup(null, streams.in, streams.out);
-   }
- }
-
- /**
-  * Set permissions to a file or directory.
-  *
-  * @param src path name.
-  * @param permission permission to set to
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         unwrapped to one of the listed exception classes when it
-  *         carries one of them
-  *
-  * @see ClientProtocol#setPermission(String, FsPermission)
-  */
- public void setPermission(String src, FsPermission permission)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("setPermission", src);
-   try {
-     namenode.setPermission(src, permission);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException(
-         AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Set file or directory owner.
-  *
-  * @param src path name.
-  * @param username user id.
-  * @param groupname user group.
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         unwrapped to one of the listed exception classes when it
-  *         carries one of them
-  *
-  * @see ClientProtocol#setOwner(String, String, String)
-  */
- public void setOwner(String src, String username, String groupname)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope = getPathTraceScope("setOwner", src);
-   try {
-     namenode.setOwner(src, username, groupname);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException(
-         AccessControlException.class,
-         FileNotFoundException.class,
-         SafeModeException.class,
-         UnresolvedPathException.class,
-         SnapshotAccessControlException.class);
-   } finally {
-     traceScope.close();
-   }
- }
-
- private long[] callGetStats() throws IOException {
- checkOpen();
- TraceScope scope = Trace.startSpan("getStats", traceSampler);
- try {
- return namenode.getStats();
- } finally {
- scope.close();
- }
- }
-
- /**
-  * Build an {@code FsStatus} from the first three entries of the
-  * namenode statistics array.
-  *
-  * @return the status built from entries 0, 1 and 2 of the stats array
-  * @throws IOException if fetching the statistics fails
-  * @see ClientProtocol#getStats()
-  */
- public FsStatus getDiskStatus() throws IOException {
-   final long[] stats = callGetStats();
-   return new FsStatus(stats[0], stats[1], stats[2]);
- }
-
- /**
-  * Returns count of blocks with no good replicas left. Normally should be
-  * zero. The value is read from the namenode statistics array at
-  * {@code ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX}.
-  *
-  * @return the missing-blocks entry of the statistics array
-  * @throws IOException if fetching the statistics fails
-  */
- public long getMissingBlocksCount() throws IOException {
-   final long[] stats = callGetStats();
-   return stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
- }
-
- /**
-  * Returns count of blocks with replication factor 1 that have
-  * lost the only replica. The value is read from the namenode statistics
-  * array at {@code ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX}.
-  *
-  * @return the missing-replication-one-blocks entry of the statistics array
-  * @throws IOException if fetching the statistics fails
-  */
- public long getMissingReplOneBlocksCount() throws IOException {
-   final long[] stats = callGetStats();
-   return stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
- }
-
- /**
-  * Returns count of blocks with one or more replicas missing. The value is
-  * read from the namenode statistics array at
-  * {@code ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX}.
-  *
-  * @return the under-replicated entry of the statistics array
-  * @throws IOException if fetching the statistics fails
-  */
- public long getUnderReplicatedBlocksCount() throws IOException {
-   final long[] stats = callGetStats();
-   return stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
- }
-
- /**
-  * Returns count of blocks with at least one replica marked corrupt. The
-  * value is read from the namenode statistics array at
-  * {@code ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX}.
-  *
-  * @return the corrupt-blocks entry of the statistics array
-  * @throws IOException if fetching the statistics fails
-  */
- public long getCorruptBlocksCount() throws IOException {
-   final long[] stats = callGetStats();
-   return stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
- }
-
- /**
-  * Ask the namenode for corrupt file blocks, inside a
-  * "listCorruptFileBlocks" path trace scope.
-  *
-  * @param path forwarded unchanged to the namenode
-  * @param cookie forwarded unchanged to the namenode
-  * @return a list in which each entry describes a corrupt file/block
-  * @throws IOException if the namenode call fails
-  */
- public CorruptFileBlocks listCorruptFileBlocks(String path,
-     String cookie)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       getPathTraceScope("listCorruptFileBlocks", path);
-   final CorruptFileBlocks corrupt;
-   try {
-     corrupt = namenode.listCorruptFileBlocks(path, cookie);
-   } finally {
-     traceScope.close();
-   }
-   return corrupt;
- }
-
- /**
-  * Fetch a datanode report from the namenode inside a "datanodeReport"
-  * trace span.
-  *
-  * @param type the report type, forwarded unchanged to the namenode
-  * @return the array returned by {@code namenode.getDatanodeReport(type)}
-  * @throws IOException if the namenode call fails
-  */
- public DatanodeInfo[] datanodeReport(DatanodeReportType type)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("datanodeReport", traceSampler);
-   final DatanodeInfo[] report;
-   try {
-     report = namenode.getDatanodeReport(type);
-   } finally {
-     traceScope.close();
-   }
-   return report;
- }
-
- /**
-  * Fetch a datanode storage report from the namenode inside a
-  * "datanodeStorageReport" trace span.
-  *
-  * @param type the report type, forwarded unchanged to the namenode
-  * @return the array returned by
-  *         {@code namenode.getDatanodeStorageReport(type)}
-  * @throws IOException if the namenode call fails
-  */
- public DatanodeStorageReport[] getDatanodeStorageReport(
-     DatanodeReportType type) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("datanodeStorageReport", traceSampler);
-   final DatanodeStorageReport[] report;
-   try {
-     report = namenode.getDatanodeStorageReport(type);
-   } finally {
-     traceScope.close();
-   }
-   return report;
- }
-
- /**
-  * Enter, leave or get safe mode. Delegates to
-  * {@link #setSafeMode(SafeModeAction, boolean)} with {@code isChecked}
-  * set to {@code false}.
-  *
-  * @param action the safe mode action to perform
-  * @return the value returned by the two-argument overload
-  * @throws IOException if the delegated call fails
-  * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean)
-  */
- public boolean setSafeMode(SafeModeAction action) throws IOException {
-   checkOpen();
-   final boolean isChecked = false;
-   return setSafeMode(action, isChecked);
- }
-
- /**
-  * Enter, leave or get safe mode.
-  *
-  * @param action
-  *          One of SafeModeAction.GET, SafeModeAction.ENTER and
-  *          SafeModeAction.LEAVE
-  * @param isChecked
-  *          If true, then check only active namenode's safemode status, else
-  *          check first namenode's status.
-  * @return the value returned by {@code namenode.setSafeMode}
-  * @throws IOException if {@code checkOpen()} rejects the call or the
-  *           namenode call fails
-  * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
-  */
- public boolean setSafeMode(SafeModeAction action, boolean isChecked)
-     throws IOException {
-   // This overload is public and can be called directly, so it must apply
-   // the same checkOpen() guard as every other namenode RPC wrapper in this
-   // class instead of relying on the one-argument overload having done so.
-   checkOpen();
-   TraceScope scope =
-       Trace.startSpan("setSafeMode", traceSampler);
-   try {
-     return namenode.setSafeMode(action, isChecked);
-   } finally {
-     scope.close();
-   }
- }
-
- /**
-  * Create one snapshot.
-  *
-  * @param snapshotRoot The directory where the snapshot is to be taken
-  * @param snapshotName Name of the snapshot
-  * @return the snapshot path.
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#createSnapshot(String, String)
-  */
- public String createSnapshot(String snapshotRoot, String snapshotName)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("createSnapshot", traceSampler);
-   final String snapshotPath;
-   try {
-     snapshotPath = namenode.createSnapshot(snapshotRoot, snapshotName);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
-   return snapshotPath;
- }
-
- /**
-  * Delete a snapshot of a snapshottable directory.
-  *
-  * @param snapshotRoot The snapshottable directory that the
-  *                     to-be-deleted snapshot belongs to
-  * @param snapshotName The name of the to-be-deleted snapshot
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#deleteSnapshot(String, String)
-  */
- public void deleteSnapshot(String snapshotRoot, String snapshotName)
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("deleteSnapshot", traceSampler);
-   try {
-     namenode.deleteSnapshot(snapshotRoot, snapshotName);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Rename a snapshot.
-  *
-  * @param snapshotDir The directory path where the snapshot was taken
-  * @param snapshotOldName Old name of the snapshot
-  * @param snapshotNewName New name of the snapshot
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#renameSnapshot(String, String, String)
-  */
- public void renameSnapshot(String snapshotDir, String snapshotOldName,
-     String snapshotNewName) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("renameSnapshot", traceSampler);
-   try {
-     namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Get all the current snapshottable directories.
-  *
-  * @return All the current snapshottable directories
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#getSnapshottableDirListing()
-  */
- public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
-     throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("getSnapshottableDirListing", traceSampler);
-   final SnapshottableDirectoryStatus[] listing;
-   try {
-     listing = namenode.getSnapshottableDirListing();
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
-   return listing;
- }
-
- /**
-  * Allow snapshot on a directory.
-  *
-  * @param snapshotRoot the directory, forwarded unchanged to the namenode
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#allowSnapshot(String snapshotRoot)
-  */
- public void allowSnapshot(String snapshotRoot) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("allowSnapshot", traceSampler);
-   try {
-     namenode.allowSnapshot(snapshotRoot);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Disallow snapshot on a directory.
-  *
-  * @param snapshotRoot the directory, forwarded unchanged to the namenode
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#disallowSnapshot(String snapshotRoot)
-  */
- public void disallowSnapshot(String snapshotRoot) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("disallowSnapshot", traceSampler);
-   try {
-     namenode.disallowSnapshot(snapshotRoot);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
- }
-
- /**
-  * Get the difference between two snapshots, or between a snapshot and the
-  * current tree of a directory.
-  *
-  * @param snapshotDir forwarded unchanged to the namenode
-  * @param fromSnapshot forwarded unchanged to the namenode
-  * @param toSnapshot forwarded unchanged to the namenode
-  * @return the report returned by the namenode
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  * @see ClientProtocol#getSnapshotDiffReport(String, String, String)
-  */
- public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
-     String fromSnapshot, String toSnapshot) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("getSnapshotDiffReport", traceSampler);
-   final SnapshotDiffReport diffReport;
-   try {
-     diffReport = namenode.getSnapshotDiffReport(snapshotDir,
-         fromSnapshot, toSnapshot);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
-   return diffReport;
- }
-
- /**
-  * Forward an add-cache-directive request to the namenode inside an
-  * "addCacheDirective" trace span.
-  *
-  * @param info the directive description, forwarded unchanged
-  * @param flags the cache flags, forwarded unchanged
-  * @return the value returned by {@code namenode.addCacheDirective}
-  * @throws IOException if the namenode call fails; a RemoteException is
-  *         rethrown after {@code unwrapRemoteException()}
-  */
- public long addCacheDirective(
-     CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
-   checkOpen();
-   final TraceScope traceScope =
-       Trace.startSpan("addCacheDirective", traceSampler);
-   final long directiveResult;
-   try {
-     directiveResult = namenode.addCacheDirective(info, flags);
-   } catch (RemoteException remoteEx) {
-     throw remoteEx.unwrapRemoteException();
-   } finally {
-     traceScope.close();
-   }
-   return directiveResult;
- }
-
- public void modifyCacheDirective(
- CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
- checkOpen();
- TraceScope scope = Trace.startSpan("modifyCacheDirect
<TRUNCATED>