Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:35:31 UTC
[03/50] [abbrv] hadoop git commit: HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates short-circuit related conf to ShortCircuitConf.
HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates short-circuit related conf to ShortCircuitConf.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffe48015
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffe48015
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffe48015
Branch: refs/heads/YARN-2928
Commit: ffe48015c341d5db908e8efed96876bd5ad53c1a
Parents: f5d1172
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Apr 10 14:48:45 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Fri Apr 17 15:29:39 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../apache/hadoop/hdfs/BlockReaderFactory.java | 31 +-
.../apache/hadoop/hdfs/BlockReaderLocal.java | 10 +-
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 27 +-
.../org/apache/hadoop/hdfs/ClientContext.java | 75 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 446 +----------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 21 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 22 +-
.../org/apache/hadoop/hdfs/DataStreamer.java | 34 +-
.../hadoop/hdfs/DistributedFileSystem.java | 4 +-
.../org/apache/hadoop/hdfs/LeaseRenewer.java | 15 +-
.../org/apache/hadoop/hdfs/NameNodeProxies.java | 10 +-
.../hadoop/hdfs/client/impl/DfsClientConf.java | 738 +++++++++++++++++++
.../hdfs/shortcircuit/DomainSocketFactory.java | 8 +-
.../hdfs/shortcircuit/ShortCircuitCache.java | 12 +
.../hadoop/fs/TestEnhancedByteBufferAccess.java | 6 +-
.../hadoop/hdfs/TestBlockReaderLocal.java | 3 +-
.../hadoop/hdfs/TestDFSClientRetries.java | 2 +-
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 3 +-
.../apache/hadoop/hdfs/TestLeaseRenewer.java | 13 +-
.../hadoop/hdfs/TestParallelReadUtil.java | 3 +-
.../blockmanagement/TestBlockTokenWithDFS.java | 3 +-
.../datanode/TestDataNodeVolumeFailure.java | 4 +-
.../shortcircuit/TestShortCircuitCache.java | 2 +-
24 files changed, 930 insertions(+), 565 deletions(-)
----------------------------------------------------------------------
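The shape of the change: the DFSClient.Conf inner class becomes the standalone org.apache.hadoop.hdfs.client.impl.DfsClientConf, short-circuit settings move into a nested DfsClientConf.ShortCircuitConf, and direct field reads become getter calls at every call site. A minimal sketch of the call-site migration, using only constructors and getters visible in the hunks below; the wrapper class and method names are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;

    public class ConfMigrationSketch {
      static void printReadSettings(Configuration configuration) {
        // Before this commit: new DFSClient.Conf(configuration), then direct
        // field access such as conf.shortCircuitLocalReads and conf.socketTimeout.
        final DfsClientConf conf = new DfsClientConf(configuration);
        final ShortCircuitConf scConf = conf.getShortCircuitConf();
        if (scConf.isShortCircuitLocalReads()) {
          System.out.println("short-circuit local reads on, socket timeout = "
              + conf.getSocketTimeout() + " ms");
        }
      }
    }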
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 333a1b1..c2f0363 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -415,6 +415,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8102. Separate webhdfs retry configuration keys from DFSConfigKeys.
(wheat9)
+ HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates
+ short-circuit related conf to ShortCircuitConf. (szetszwo)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 8f33899..5175a87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,7 +83,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null;
- private final DFSClient.Conf conf;
+ private final DfsClientConf conf;
/**
* Injects failures into specific operations during unit tests.
@@ -180,10 +182,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/
private int remainingCacheTries;
- public BlockReaderFactory(DFSClient.Conf conf) {
+ public BlockReaderFactory(DfsClientConf conf) {
this.conf = conf;
- this.failureInjector = conf.brfFailureInjector;
- this.remainingCacheTries = conf.nCachedConnRetry;
+ this.failureInjector = conf.getShortCircuitConf().brfFailureInjector;
+ this.remainingCacheTries = conf.getNumCachedConnRetry();
}
public BlockReaderFactory setFileName(String fileName) {
@@ -317,7 +319,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
BlockReader reader = null;
Preconditions.checkNotNull(configuration);
- if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
+ if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal();
if (reader != null) {
@@ -336,7 +339,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
}
}
}
- if (conf.domainSocketDataTraffic) {
+ if (scConf.isDomainSocketDataTraffic()) {
reader = getRemoteBlockReaderFromDomain();
if (reader != null) {
if (LOG.isTraceEnabled()) {
@@ -406,8 +409,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
"for short-circuit reads.");
}
if (pathInfo == null) {
- pathInfo = clientContext.getDomainSocketFactory().
- getPathInfo(inetSocketAddress, conf);
+ pathInfo = clientContext.getDomainSocketFactory()
+ .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
}
if (!pathInfo.getPathState().getUsableForShortCircuit()) {
PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
@@ -431,7 +434,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
"BlockReaderLocal via {}", this, pathInfo.getPath());
return null;
}
- return new BlockReaderLocal.Builder(conf).
+ return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
setFilename(fileName).
setBlock(block).
setStartOffset(startOffset).
@@ -604,8 +607,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
if (pathInfo == null) {
- pathInfo = clientContext.getDomainSocketFactory().
- getPathInfo(inetSocketAddress, conf);
+ pathInfo = clientContext.getDomainSocketFactory()
+ .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
}
if (!pathInfo.getPathState().getUsableForDataTransfer()) {
PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
@@ -744,7 +747,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
}
}
DomainSocket sock = clientContext.getDomainSocketFactory().
- createSocket(pathInfo, conf.socketTimeout);
+ createSocket(pathInfo, conf.getSocketTimeout());
if (sock == null) return null;
return new BlockReaderPeer(new DomainPeer(sock), false);
}
@@ -803,9 +806,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
@SuppressWarnings("deprecation")
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
- if (conf.useLegacyBlockReader) {
+ if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
return RemoteBlockReader.newBlockReader(fileName,
- block, token, startOffset, length, conf.ioBufferSize,
+ block, token, startOffset, length, conf.getIoBufferSize(),
verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy);
} else {
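The BlockReaderFactory hunks above change only where the configuration is read; the reader-selection order is untouched: short-circuit local read first, then data over a domain socket, then plain TCP. A condensed, self-contained sketch of that order with stand-in types (this is not Hadoop code):

    public class ReaderSelectionSketch {
      interface Reader {}

      static Reader select(boolean shortCircuitLocalReads, boolean allowShortCircuit,
                           boolean domainSocketDataTraffic) {
        if (shortCircuitLocalReads && allowShortCircuit) {
          Reader r = tryShortCircuit();   // BlockReaderLocal path
          if (r != null) return r;
        }
        if (domainSocketDataTraffic) {
          Reader r = tryDomainSocket();   // remote reader over a DomainPeer
          if (r != null) return r;
        }
        return tryTcp();                  // ordinary remote read
      }

      static Reader tryShortCircuit() { return null; }  // placeholders for the sketch
      static Reader tryDomainSocket() { return null; }
      static Reader tryTcp() { return new Reader() {}; }

      public static void main(String[] args) {
        System.out.println(select(true, true, true));
      }
    }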
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index ab93441..d913f3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -27,14 +27,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -74,10 +74,10 @@ class BlockReaderLocal implements BlockReader {
private ExtendedBlock block;
private StorageType storageType;
- public Builder(Conf conf) {
+ public Builder(ShortCircuitConf conf) {
this.maxReadahead = Integer.MAX_VALUE;
- this.verifyChecksum = !conf.skipShortCircuitChecksums;
- this.bufferSize = conf.shortCircuitBufferSize;
+ this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+ this.bufferSize = conf.getShortCircuitBufferSize();
}
public Builder setVerifyChecksum(boolean verifyChecksum) {
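The Builder hunk shows the pattern this patch applies repeatedly: defaults are seeded from ShortCircuitConf at construction time, and setters only override what a caller asks for. A self-contained sketch of that pattern with stand-in types; BlockReaderLocal itself is package-private, so this illustrates the idiom rather than usable client code:

    public class BuilderSeedSketch {
      // Stand-in for ShortCircuitConf; the getter names are simplified.
      static class ShortCircuitSettings {
        boolean isSkipChecksums() { return false; }
        int getBufferSize() { return 1024 * 1024; }  // assumed example default
      }

      // Stand-in for BlockReaderLocal.Builder.
      static class LocalReaderBuilder {
        private boolean verifyChecksum;
        private int bufferSize;

        LocalReaderBuilder(ShortCircuitSettings conf) {
          this.verifyChecksum = !conf.isSkipChecksums();  // default seeded from conf
          this.bufferSize = conf.getBufferSize();
        }

        LocalReaderBuilder setVerifyChecksum(boolean verifyChecksum) {
          this.verifyChecksum = verifyChecksum;  // explicit caller override
          return this;
        }
      }

      public static void main(String[] args) {
        new LocalReaderBuilder(new ShortCircuitSettings()).setVerifyChecksum(false);
      }
    }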
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 0c9ec45..8df44f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -42,12 +44,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -180,12 +182,13 @@ class BlockReaderLocalLegacy implements BlockReader {
/**
* The only way this object can be instantiated.
*/
- static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
+ static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
UserGroupInformation userGroupInformation,
Configuration configuration, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node,
long startOffset, long length, StorageType storageType)
throws IOException {
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
@@ -195,8 +198,8 @@ class BlockReaderLocalLegacy implements BlockReader {
userGroupInformation = UserGroupInformation.getCurrentUser();
}
pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
- configuration, conf.socketTimeout, token,
- conf.connectToDnViaHostname, storageType);
+ configuration, conf.getSocketTimeout(), token,
+ conf.isConnectToDnViaHostname(), storageType);
}
// check to see if the file exists. It may so happen that the
@@ -208,8 +211,8 @@ class BlockReaderLocalLegacy implements BlockReader {
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null;
- boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
- storageType.isTransient();
+ final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+ || storageType.isTransient();
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
@@ -230,11 +233,11 @@ class BlockReaderLocalLegacy implements BlockReader {
new DataInputStream(checksumIn), blk);
long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum());
- localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+ localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
startOffset, length, pathinfo, checksum, true, dataIn,
firstChunkOffset, checksumIn);
} else {
- localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+ localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
startOffset, length, pathinfo, dataIn);
}
} catch (IOException e) {
@@ -312,7 +315,7 @@ class BlockReaderLocalLegacy implements BlockReader {
return bufferSizeBytes / bytesPerChecksum;
}
- private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
+ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException {
@@ -321,7 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
dataIn, startOffset, null);
}
- private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
+ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@@ -339,8 +342,8 @@ class BlockReaderLocalLegacy implements BlockReader {
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(
- conf.shortCircuitBufferSize, bytesPerChecksum);
+ final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+ conf.getShortCircuitBufferSize(), bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read.
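The final hunk sizes the legacy reader's buffers from the configured short-circuit buffer size: the visible tail of getSlowReadBufferNumChunks divides the buffer size by bytesPerChecksum, and both buffers are then allocated as whole multiples of a chunk. A worked sketch of that arithmetic; the 1 MB buffer, 512-byte checksum chunk, and 4-byte checksum width are assumed example values, not taken from this diff:

    public class BufferSizingSketch {
      public static void main(String[] args) {
        final int shortCircuitBufferSize = 1024 * 1024;  // assumed example value
        final int bytesPerChecksum = 512;                // assumed example value
        final int checksumSize = 4;                      // e.g. a CRC32 checksum

        // Mirrors the visible tail of getSlowReadBufferNumChunks above.
        final int chunksPerChecksumRead = shortCircuitBufferSize / bytesPerChecksum;

        // Matches the slowReadBuff and checksumBuff allocations in the hunk.
        System.out.println("slowReadBuff = "
            + bytesPerChecksum * chunksPerChecksumRead + " bytes");  // 1048576
        System.out.println("checksumBuff = "
            + checksumSize * chunksPerChecksumRead + " bytes");      // 8192
      }
    }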
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index af7c095..6359def 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
/**
* ClientContext contains context information for a client.
@@ -99,59 +99,24 @@ public class ClientContext {
*/
private boolean printedConfWarning = false;
- private ClientContext(String name, Conf conf) {
- this.name = name;
- this.confString = confAsString(conf);
- this.shortCircuitCache = new ShortCircuitCache(
- conf.shortCircuitStreamsCacheSize,
- conf.shortCircuitStreamsCacheExpiryMs,
- conf.shortCircuitMmapCacheSize,
- conf.shortCircuitMmapCacheExpiryMs,
- conf.shortCircuitMmapCacheRetryTimeout,
- conf.shortCircuitCacheStaleThresholdMs,
- conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
- this.peerCache =
- new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
- this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
- this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
- this.domainSocketFactory = new DomainSocketFactory(conf);
-
- this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
- }
+ private ClientContext(String name, DfsClientConf conf) {
+ final ShortCircuitConf scConf = conf.getShortCircuitConf();
- public static String confAsString(Conf conf) {
- StringBuilder builder = new StringBuilder();
- builder.append("shortCircuitStreamsCacheSize = ").
- append(conf.shortCircuitStreamsCacheSize).
- append(", shortCircuitStreamsCacheExpiryMs = ").
- append(conf.shortCircuitStreamsCacheExpiryMs).
- append(", shortCircuitMmapCacheSize = ").
- append(conf.shortCircuitMmapCacheSize).
- append(", shortCircuitMmapCacheExpiryMs = ").
- append(conf.shortCircuitMmapCacheExpiryMs).
- append(", shortCircuitMmapCacheRetryTimeout = ").
- append(conf.shortCircuitMmapCacheRetryTimeout).
- append(", shortCircuitCacheStaleThresholdMs = ").
- append(conf.shortCircuitCacheStaleThresholdMs).
- append(", socketCacheCapacity = ").
- append(conf.socketCacheCapacity).
- append(", socketCacheExpiry = ").
- append(conf.socketCacheExpiry).
- append(", shortCircuitLocalReads = ").
- append(conf.shortCircuitLocalReads).
- append(", useLegacyBlockReaderLocal = ").
- append(conf.useLegacyBlockReaderLocal).
- append(", domainSocketDataTraffic = ").
- append(conf.domainSocketDataTraffic).
- append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
- append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs).
- append(", keyProviderCacheExpiryMs = ").
- append(conf.keyProviderCacheExpiryMs);
-
- return builder.toString();
+ this.name = name;
+ this.confString = scConf.confAsString();
+ this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+ this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
+ scConf.getSocketCacheExpiry());
+ this.keyProviderCache = new KeyProviderCache(
+ scConf.getKeyProviderCacheExpiryMs());
+ this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
+ this.domainSocketFactory = new DomainSocketFactory(scConf);
+
+ this.byteArrayManager = ByteArrayManager.newInstance(
+ conf.getWriteByteArrayManagerConf());
}
- public static ClientContext get(String name, Conf conf) {
+ public static ClientContext get(String name, DfsClientConf conf) {
ClientContext context;
synchronized(ClientContext.class) {
context = CACHES.get(name);
@@ -175,12 +140,12 @@ public class ClientContext {
public static ClientContext getFromConf(Configuration conf) {
return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT,
DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
- new DFSClient.Conf(conf));
+ new DfsClientConf(conf));
}
- private void printConfWarningIfNeeded(Conf conf) {
+ private void printConfWarningIfNeeded(DfsClientConf conf) {
String existing = this.getConfString();
- String requested = confAsString(conf);
+ String requested = conf.getShortCircuitConf().confAsString();
if (!existing.equals(requested)) {
if (!printedConfWarning) {
printedConfWarning = true;
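After this hunk a ClientContext is built from a DfsClientConf: the hand-rolled confAsString body moves into ShortCircuitConf, and the cache wiring collapses into a ShortCircuitCache.fromConf factory. A minimal sketch of obtaining a context; getFromConf and DFSConfigKeys.DFS_CLIENT_CONTEXT both appear in the hunk above, while the context name is an assumed example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.ClientContext;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ContextSketch {
      public static void main(String[] args) {
        final Configuration conf = new Configuration();
        conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "example-context");
        // Internally this is now get(name, new DfsClientConf(conf)).
        final ClientContext context = ClientContext.getFromConf(conf);
        System.out.println(context);
      }
    }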
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d43e7de..f79d160 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -18,48 +18,11 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -109,7 +72,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -136,9 +98,9 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.AclException;
@@ -195,14 +157,12 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
@@ -250,7 +210,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final Configuration conf;
- private final Conf dfsClientConf;
+ private final DfsClientConf dfsClientConf;
final ClientProtocol namenode;
/* The service used for delegation tokens */
private Text dtService;
@@ -278,307 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
private final Sampler<?> traceSampler;
- /**
- * DFSClient configuration
- */
- public static class Conf {
- final int hdfsTimeout; // timeout value for a DFS operation.
-
- final int maxFailoverAttempts;
- final int maxRetryAttempts;
- final int failoverSleepBaseMillis;
- final int failoverSleepMaxMillis;
- final int maxBlockAcquireFailures;
- final int confTime;
- final int ioBufferSize;
- final ChecksumOpt defaultChecksumOpt;
- final int writePacketSize;
- final int writeMaxPackets;
- final ByteArrayManager.Conf writeByteArrayManagerConf;
- final int socketTimeout;
- final int socketCacheCapacity;
- final long socketCacheExpiry;
- final long excludedNodesCacheExpiry;
- /** Wait time window (in msec) if BlockMissingException is caught */
- final int timeWindow;
- final int nCachedConnRetry;
- final int nBlockWriteRetry;
- final int nBlockWriteLocateFollowingRetry;
- final int blockWriteLocateFollowingInitialDelayMs;
- final long defaultBlockSize;
- final long prefetchSize;
- final short defaultReplication;
- final String taskId;
- final FsPermission uMask;
- final boolean connectToDnViaHostname;
- final boolean getHdfsBlocksMetadataEnabled;
- final int getFileBlockStorageLocationsNumThreads;
- final int getFileBlockStorageLocationsTimeoutMs;
- final int retryTimesForGetLastBlockLength;
- final int retryIntervalForGetLastBlockLength;
- final long datanodeRestartTimeout;
- final long dfsclientSlowIoWarningThresholdMs;
-
- final boolean useLegacyBlockReader;
- final boolean useLegacyBlockReaderLocal;
- final String domainSocketPath;
- final boolean skipShortCircuitChecksums;
- final int shortCircuitBufferSize;
- final boolean shortCircuitLocalReads;
- final boolean domainSocketDataTraffic;
- final int shortCircuitStreamsCacheSize;
- final long shortCircuitStreamsCacheExpiryMs;
- final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
-
- final boolean shortCircuitMmapEnabled;
- final int shortCircuitMmapCacheSize;
- final long shortCircuitMmapCacheExpiryMs;
- final long shortCircuitMmapCacheRetryTimeout;
- final long shortCircuitCacheStaleThresholdMs;
-
- final long keyProviderCacheExpiryMs;
- public BlockReaderFactory.FailureInjector brfFailureInjector =
- new BlockReaderFactory.FailureInjector();
-
- public Conf(Configuration conf) {
- // The hdfsTimeout is currently the same as the ipc timeout
- hdfsTimeout = Client.getTimeout(conf);
- maxFailoverAttempts = conf.getInt(
- DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
- DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
- maxRetryAttempts = conf.getInt(
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
- failoverSleepBaseMillis = conf.getInt(
- DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
- DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
- failoverSleepMaxMillis = conf.getInt(
- DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
- DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
-
- maxBlockAcquireFailures = conf.getInt(
- DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
- DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
- confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
- HdfsServerConstants.WRITE_TIMEOUT);
- ioBufferSize = conf.getInt(
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- defaultChecksumOpt = getChecksumOptFromConf(conf);
- socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsServerConstants.READ_TIMEOUT);
- /** dfs.write.packet.size is an internal config variable */
- writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
- DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
- writeMaxPackets = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
-
- final boolean byteArrayManagerEnabled = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
- if (!byteArrayManagerEnabled) {
- writeByteArrayManagerConf = null;
- } else {
- final int countThreshold = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
- final int countLimit = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
- final long countResetTimePeriodMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
- writeByteArrayManagerConf = new ByteArrayManager.Conf(
- countThreshold, countLimit, countResetTimePeriodMs);
- }
-
-
- defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
- DFS_BLOCK_SIZE_DEFAULT);
- defaultReplication = (short) conf.getInt(
- DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
- taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
- socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
- DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
- socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
- DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
- excludedNodesCacheExpiry = conf.getLong(
- DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
- DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
- prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
- 10 * defaultBlockSize);
- timeWindow = conf.getInt(
- HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
- HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
- nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
- DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
- nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
- DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
- nBlockWriteLocateFollowingRetry = conf.getInt(
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
- blockWriteLocateFollowingInitialDelayMs = conf.getInt(
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
- uMask = FsPermission.getUMask(conf);
- connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
- DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
- getHdfsBlocksMetadataEnabled = conf.getBoolean(
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- getFileBlockStorageLocationsNumThreads = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
- getFileBlockStorageLocationsTimeoutMs = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
- retryTimesForGetLastBlockLength = conf.getInt(
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
- retryIntervalForGetLastBlockLength = conf.getInt(
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
-
- useLegacyBlockReader = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
- useLegacyBlockReaderLocal = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
- shortCircuitLocalReads = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
- domainSocketDataTraffic = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
- domainSocketPath = conf.getTrimmed(
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
-
- if (BlockReaderLocal.LOG.isDebugEnabled()) {
- BlockReaderLocal.LOG.debug(
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
- + " = " + useLegacyBlockReaderLocal);
- BlockReaderLocal.LOG.debug(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
- + " = " + shortCircuitLocalReads);
- BlockReaderLocal.LOG.debug(
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
- + " = " + domainSocketDataTraffic);
- BlockReaderLocal.LOG.debug(
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
- + " = " + domainSocketPath);
- }
-
- skipShortCircuitChecksums = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- shortCircuitBufferSize = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
- shortCircuitStreamsCacheSize = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
- shortCircuitStreamsCacheExpiryMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
- shortCircuitMmapEnabled = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
- DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
- shortCircuitMmapCacheSize = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
- shortCircuitMmapCacheExpiryMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
- shortCircuitMmapCacheRetryTimeout = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
- DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
- shortCircuitCacheStaleThresholdMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
- DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
- shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
-
- datanodeRestartTimeout = conf.getLong(
- DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
- DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
- dfsclientSlowIoWarningThresholdMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
-
- keyProviderCacheExpiryMs = conf.getLong(
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
- }
-
- public boolean isUseLegacyBlockReaderLocal() {
- return useLegacyBlockReaderLocal;
- }
-
- public String getDomainSocketPath() {
- return domainSocketPath;
- }
-
- public boolean isShortCircuitLocalReads() {
- return shortCircuitLocalReads;
- }
-
- public boolean isDomainSocketDataTraffic() {
- return domainSocketDataTraffic;
- }
-
- private DataChecksum.Type getChecksumType(Configuration conf) {
- final String checksum = conf.get(
- DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
- try {
- return DataChecksum.Type.valueOf(checksum);
- } catch(IllegalArgumentException iae) {
- LOG.warn("Bad checksum type: " + checksum + ". Using default "
- + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
- return DataChecksum.Type.valueOf(
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
- }
- }
-
- // Construct a checksum option from conf
- private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
- DataChecksum.Type type = getChecksumType(conf);
- int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
- DFS_BYTES_PER_CHECKSUM_DEFAULT);
- return new ChecksumOpt(type, bytesPerChecksum);
- }
-
- // create a DataChecksum with the default option.
- private DataChecksum createChecksum() throws IOException {
- return createChecksum(null);
- }
-
- private DataChecksum createChecksum(ChecksumOpt userOpt) {
- // Fill in any missing field with the default.
- ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
- defaultChecksumOpt, userOpt);
- DataChecksum dataChecksum = DataChecksum.newDataChecksum(
- myOpt.getChecksumType(),
- myOpt.getBytesPerChecksum());
- if (dataChecksum == null) {
- throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
- + userOpt + ", default=" + defaultChecksumOpt
- + ", effective=null");
- }
- return dataChecksum;
- }
-
- @VisibleForTesting
- public int getBlockWriteLocateFollowingInitialDelayMs() {
- return blockWriteLocateFollowingInitialDelayMs;
- }
- }
-
- public Conf getConf() {
+ public DfsClientConf getConf() {
return dfsClientConf;
}
@@ -642,10 +302,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SpanReceiverHost.getInstance(conf);
traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build();
// Copy only the required DFSClient configuration
- this.dfsClientConf = new Conf(conf);
- if (this.dfsClientConf.useLegacyBlockReaderLocal) {
- LOG.debug("Using legacy short-circuit local reads.");
- }
+ this.dfsClientConf = new DfsClientConf(conf);
this.conf = conf;
this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -654,7 +311,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
- this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
+ this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
@@ -779,30 +436,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/**
- * Return the number of times the client should go back to the namenode
- * to retrieve block locations when reading.
- */
- int getMaxBlockAcquireFailures() {
- return dfsClientConf.maxBlockAcquireFailures;
- }
-
- /**
* Return the timeout that clients should use when writing to datanodes.
* @param numNodes the number of nodes in the pipeline.
*/
int getDatanodeWriteTimeout(int numNodes) {
- return (dfsClientConf.confTime > 0) ?
- (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+ final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
+ return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}
int getDatanodeReadTimeout(int numNodes) {
- return dfsClientConf.socketTimeout > 0 ?
- (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
- dfsClientConf.socketTimeout) : 0;
- }
-
- int getHdfsTimeout() {
- return dfsClientConf.hdfsTimeout;
+ final int t = dfsClientConf.getSocketTimeout();
+ return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
}
@VisibleForTesting
@@ -992,14 +636,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/**
- * Get the default block size for this cluster
- * @return the default block size in bytes
- */
- public long getDefaultBlockSize() {
- return dfsClientConf.defaultBlockSize;
- }
-
- /**
* @see ClientProtocol#getPreferredBlockSize(String)
*/
public long getBlockSize(String f) throws IOException {
@@ -1211,13 +847,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
namenode.reportBadBlocks(blocks);
}
- public short getDefaultReplication() {
- return dfsClientConf.defaultReplication;
- }
-
public LocatedBlocks getLocatedBlocks(String src, long start)
throws IOException {
- return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
+ return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
}
/*
@@ -1319,7 +951,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public BlockStorageLocation[] getBlockStorageLocations(
List<BlockLocation> blockLocations) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {
- if (!getConf().getHdfsBlocksMetadataEnabled) {
+ if (!getConf().isHdfsBlocksMetadataEnabled()) {
throw new UnsupportedOperationException("Datanode-side support for " +
"getVolumeBlockLocations() must also be enabled in the client " +
"configuration.");
@@ -1356,9 +988,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
try {
metadatas = BlockStorageLocationUtil.
queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
- getConf().getFileBlockStorageLocationsNumThreads,
- getConf().getFileBlockStorageLocationsTimeoutMs,
- getConf().connectToDnViaHostname);
+ getConf().getFileBlockStorageLocationsNumThreads(),
+ getConf().getFileBlockStorageLocationsTimeoutMs(),
+ getConf().isConnectToDnViaHostname());
if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: "
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
@@ -1512,7 +1144,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public DFSInputStream open(String src)
throws IOException, UnresolvedLinkException {
- return open(src, dfsClientConf.ioBufferSize, true, null);
+ return open(src, dfsClientConf.getIoBufferSize(), true, null);
}
/**
@@ -1563,8 +1195,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public OutputStream create(String src, boolean overwrite)
throws IOException {
- return create(src, overwrite, dfsClientConf.defaultReplication,
- dfsClientConf.defaultBlockSize, null);
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+ dfsClientConf.getDefaultBlockSize(), null);
}
/**
@@ -1574,8 +1206,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public OutputStream create(String src,
boolean overwrite,
Progressable progress) throws IOException {
- return create(src, overwrite, dfsClientConf.defaultReplication,
- dfsClientConf.defaultBlockSize, progress);
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+ dfsClientConf.getDefaultBlockSize(), progress);
}
/**
@@ -1596,7 +1228,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public OutputStream create(String src, boolean overwrite, short replication,
long blockSize, Progressable progress) throws IOException {
return create(src, overwrite, replication, blockSize, progress,
- dfsClientConf.ioBufferSize);
+ dfsClientConf.getIoBufferSize());
}
/**
@@ -1678,6 +1310,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
progress, buffersize, checksumOpt, null);
}
+ private FsPermission applyUMask(FsPermission permission) {
+ if (permission == null) {
+ permission = FsPermission.getFileDefault();
+ }
+ return permission.applyUMask(dfsClientConf.getUMask());
+ }
+
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
@@ -1698,10 +1337,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException {
checkOpen();
- if (permission == null) {
- permission = FsPermission.getFileDefault();
- }
- FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
+ final FsPermission masked = applyUMask(permission);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
@@ -1783,8 +1419,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException {
TraceScope scope = getPathTraceScope("createSymlink", target);
try {
- FsPermission dirPerm =
- FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
+ final FsPermission dirPerm = applyUMask(null);
namenode.createSymlink(target, link, dirPerm, createParent);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@@ -1828,7 +1463,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new EnumSetWritable<>(flag, CreateFlag.class));
return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
progress, blkWithStatus.getLastBlock(),
- blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(),
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
favoredNodes);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@@ -2253,7 +1888,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block
- final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
+ final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null;
@@ -2391,7 +2026,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Socket sock = null;
try {
sock = socketFactory.createSocket();
- String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
+ String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -2424,7 +2059,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException {
- IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2979,10 +2614,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public boolean mkdirs(String src, FsPermission permission,
boolean createParent) throws IOException {
- if (permission == null) {
- permission = FsPermission.getDefault();
- }
- FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
+ final FsPermission masked = applyUMask(permission);
return primitiveMkdir(src, masked, createParent);
}
@@ -3004,8 +2636,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException {
checkOpen();
if (absPermission == null) {
- absPermission =
- FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
+ absPermission = applyUMask(null);
}
if(LOG.isDebugEnabled()) {
@@ -3447,14 +3078,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Peer peer = null;
boolean success = false;
Socket sock = null;
+ final int socketTimeout = dfsClientConf.getSocketTimeout();
try {
sock = socketFactory.createSocket();
- NetUtils.connect(sock, addr,
- getRandomLocalInterfaceAddr(),
- dfsClientConf.socketTimeout);
+ NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId);
- peer.setReadTimeout(dfsClientConf.socketTimeout);
+ peer.setReadTimeout(socketTimeout);
success = true;
return peer;
} finally {
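The rewritten getDatanodeWriteTimeout and getDatanodeReadTimeout keep their arithmetic: a configured base timeout extended per pipeline node, with 0 meaning no timeout at all. A worked sketch of the formula; the 5-second extension constants and the example base timeouts are assumptions about HdfsServerConstants, not part of this diff:

    public class PipelineTimeoutSketch {
      // Assumed values of HdfsServerConstants.WRITE_TIMEOUT_EXTENSION and
      // READ_TIMEOUT_EXTENSION; check the constants in your tree.
      static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000;
      static final int READ_TIMEOUT_EXTENSION = 5 * 1000;

      static int datanodeWriteTimeout(int baseTimeout, int numNodes) {
        // Matches the `t > 0 ? t + EXTENSION * numNodes : 0` guard in the diff.
        return baseTimeout > 0 ? baseTimeout + WRITE_TIMEOUT_EXTENSION * numNodes : 0;
      }

      static int datanodeReadTimeout(int baseTimeout, int numNodes) {
        return baseTimeout > 0 ? READ_TIMEOUT_EXTENSION * numNodes + baseTimeout : 0;
      }

      public static void main(String[] args) {
        // e.g. an 8-minute write timeout across a 3-node pipeline:
        System.out.println(datanodeWriteTimeout(8 * 60 * 1000, 3));  // 495000 ms
        System.out.println(datanodeReadTimeout(60 * 1000, 3));       // 75000 ms
      }
    }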
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 41b9d50..dd0f6fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -52,14 +52,15 @@ import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -265,9 +266,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Grab the open-file info from namenode
*/
void openInfo() throws IOException, UnresolvedLinkException {
+ final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
- int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
+ int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
@@ -277,7 +279,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
- waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
+ waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
} else {
break;
@@ -346,13 +348,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
assert locatedblock != null : "LocatedBlock cannot be null";
int replicaNotFoundCount = locatedblock.getLocations().length;
+ final DfsClientConf conf = dfsClient.getConf();
for(DatanodeInfo datanode : locatedblock.getLocations()) {
ClientDatanodeProtocol cdp = null;
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
- dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout,
- dfsClient.getConf().connectToDnViaHostname, locatedblock);
+ dfsClient.getConfiguration(), conf.getSocketTimeout(),
+ conf.isConnectToDnViaHostname(), locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -938,7 +941,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
- if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
+ if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
@@ -963,7 +966,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
- final int timeWindow = dfsClient.getConf().timeWindow;
+ final int timeWindow = dfsClient.getConf().getTimeWindow();
double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@@ -1012,7 +1015,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
", ignoredNodes = " + ignoredNodes);
}
final String dnAddr =
- chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+ chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -1706,7 +1709,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
ByteBuffer buffer = null;
- if (dfsClient.getConf().shortCircuitMmapEnabled) {
+ if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
buffer = tryReadZeroCopy(maxLength, opts);
}
if (buffer != null) {
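The timeWindow hunk preserves the backoff documented in the surrounding comment: each failed chooseDataNode round waits a linearly growing grace period plus a random component drawn from a window that expands with the failure count (3000 ms base, per the comment). A runnable sketch of that wait computation:

    import java.util.Random;

    public class ChooseDataNodeBackoffSketch {
      public static void main(String[] args) {
        final int timeWindow = 3000;  // base window, per the comment in the hunk
        final Random random = new Random();
        for (int failures = 0; failures < 3; failures++) {
          final double waitTime =
              timeWindow * failures                                 // grace period
              + timeWindow * (failures + 1) * random.nextDouble();  // expanding random window
          System.out.printf("attempt %d: wait %.0f ms%n", failures + 1, waitTime);
        }
      }
    }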
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f6733e3..8cde274 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -33,10 +33,11 @@ import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -211,7 +212,7 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager);
@@ -297,7 +298,7 @@ public class DFSOutputStream extends FSOutputSummer
adjustPacketChunkSize(stat);
streamer.setPipelineInConstruction(lastBlock);
} else {
- computePacketChunkSize(dfsClient.getConf().writePacketSize,
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager);
@@ -334,7 +335,8 @@ public class DFSOutputStream extends FSOutputSummer
// that expected size of a packet, then create
// smaller size packet.
//
- computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
+ computePacketChunkSize(
+ Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
bytesPerChecksum);
}
}
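[A tiny sketch, not part of the patch, of the size choice above; the 64 KB packet size and 10 KB of free space are hypothetical values.]

public class PacketSizeSketch {
  public static void main(String[] args) {
    final int writePacketSize = 64 * 1024;  // assumed getWritePacketSize() value
    final long freeInLastBlock = 10 * 1024; // assumed free space in the last block
    // Appending into a partially-filled block: never build a packet larger
    // than the space left in that block.
    final int psize = (int) Math.min(writePacketSize, freeInLastBlock);
    System.out.println("packet size = " + psize); // 10240 here
  }
}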
@@ -445,7 +447,7 @@ public class DFSOutputStream extends FSOutputSummer
if (!streamer.getAppendChunk()) {
int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
- dfsClient.getConf().writePacketSize);
+ dfsClient.getConf().getWritePacketSize());
computePacketChunkSize(psize, bytesPerChecksum);
}
}
@@ -717,7 +719,7 @@ public class DFSOutputStream extends FSOutputSummer
return;
}
streamer.setLastException(new IOException("Lease timeout of "
- + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+ + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(fileId);
}
@@ -806,15 +808,15 @@ public class DFSOutputStream extends FSOutputSummer
// be called during unit tests
protected void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.monotonicNow();
- long sleeptime = dfsClient.getConf().
- blockWriteLocateFollowingInitialDelayMs;
+ final DfsClientConf conf = dfsClient.getConf();
+ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
boolean fileComplete = false;
- int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
+ int retries = conf.getNumBlockWriteLocateFollowingRetry();
while (!fileComplete) {
fileComplete =
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
if (!fileComplete) {
- final int hdfsTimeout = dfsClient.getHdfsTimeout();
+ final int hdfsTimeout = conf.getHdfsTimeout();
if (!dfsClient.clientRunning
|| (hdfsTimeout > 0
&& localstart + hdfsTimeout < Time.monotonicNow())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0c6b4a3..405f775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -84,6 +85,7 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;
+
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -123,15 +125,15 @@ class DataStreamer extends Daemon {
*/
static Socket createSocketForPipeline(final DatanodeInfo first,
final int length, final DFSClient client) throws IOException {
- final String dnAddr = first.getXferAddr(
- client.getConf().connectToDnViaHostname);
+ final DfsClientConf conf = client.getConf();
+ final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
final Socket sock = client.socketFactory.createSocket();
final int timeout = client.getDatanodeReadTimeout(length);
- NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
+ NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if(DFSClient.LOG.isDebugEnabled()) {
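[In plain java.net terms, the socket setup above looks roughly like the sketch below, not part of the patch; the split between connect and read timeouts mirrors the hunk, while the 128 KB send buffer and parameter names are assumptions.]

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PipelineSocketSketch {
  static Socket connectToFirstDatanode(String host, int port,
      int connectTimeoutMs, int readTimeoutMs) throws IOException {
    Socket sock = new Socket();
    // connect timeout comes from the client socket timeout; the read
    // timeout is scaled by the pipeline length upstream of this call
    sock.connect(new InetSocketAddress(host, port), connectTimeoutMs);
    sock.setSoTimeout(readTimeoutMs);
    sock.setSendBufferSize(128 * 1024); // assumed default data socket size
    return sock;
  }
}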
@@ -244,7 +246,7 @@ class DataStreamer extends Daemon {
this.byteArrayManager = byteArrayManage;
isLazyPersistFile = isLazyPersist(stat);
this.dfsclientSlowLogThresholdMs =
- dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
+ dfsClient.getConf().getSlowIoWarningThresholdMs();
excludedNodes = initExcludedNodes();
}
@@ -368,6 +370,7 @@ class DataStreamer extends Daemon {
doSleep = processDatanodeError();
}
+ final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
@@ -375,8 +378,8 @@ class DataStreamer extends Daemon {
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
- now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
- long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
+ now - lastPacket < halfSocketTimeout)) || doSleep ) {
+ long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
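[The hoisted halfSocketTimeout drives the wait above. A compact sketch, not part of the patch, of just the timeout arithmetic, with assumed values.]

public class StreamerWaitSketch {
  public static void main(String[] args) {
    final int socketTimeout = 60_000;        // assumed getSocketTimeout()
    final int halfSocketTimeout = socketTimeout / 2;
    final long now = System.currentTimeMillis();
    final long lastPacket = now - 25_000;    // pretend last packet was 25 s ago
    long timeout = halfSocketTimeout - (now - lastPacket);
    timeout = timeout <= 0 ? 1000 : timeout; // never wait a non-positive time
    System.out.println("wait " + timeout + " ms before sending again");
  }
}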
@@ -627,7 +630,7 @@ class DataStreamer extends Daemon {
boolean firstWait = true;
try {
while (!streamerClosed && dataQueue.size() + ackQueue.size() >
- dfsClient.getConf().writeMaxPackets) {
+ dfsClient.getConf().getWriteMaxPackets()) {
if (firstWait) {
Span span = Trace.currentSpan();
if (span != null) {
@@ -842,7 +845,7 @@ class DataStreamer extends Daemon {
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
- restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+ restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
@@ -1158,7 +1161,7 @@ class DataStreamer extends Daemon {
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
- long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+ long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
4000L);
try {
Thread.sleep(delay);
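[One line worth spelling out, as a sketch that is not part of the patch: the recovery retry interval is the configured restart timeout capped at four seconds. The 30 s timeout below is hypothetical.]

public class RestartDelaySketch {
  public static void main(String[] args) throws InterruptedException {
    final long datanodeRestartTimeout = 30_000; // assumed configured deadline
    // retry every 4 s, or sooner if the configured deadline is shorter
    final long delay = Math.min(datanodeRestartTimeout, 4000L);
    Thread.sleep(delay);
    System.out.println("retried pipeline recovery after " + delay + " ms");
  }
}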
@@ -1311,7 +1314,7 @@ class DataStreamer extends Daemon {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
- int count = dfsClient.getConf().nBlockWriteRetry;
+ int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success = false;
ExtendedBlock oldBlock = block;
do {
@@ -1471,7 +1474,7 @@ class DataStreamer extends Daemon {
}
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
- restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+ restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow();
restartingNodeIndex.set(errorIndex);
errorIndex = -1;
@@ -1524,9 +1527,9 @@ class DataStreamer extends Daemon {
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
- int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
- long sleeptime = dfsClient.getConf().
- blockWriteLocateFollowingInitialDelayMs;
+ final DfsClientConf conf = dfsClient.getConf();
+ int retries = conf.getNumBlockWriteLocateFollowingRetry();
+ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
while (true) {
long localstart = Time.monotonicNow();
while (true) {
@@ -1674,7 +1677,8 @@ class DataStreamer extends Daemon {
private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
return CacheBuilder.newBuilder().expireAfterWrite(
- dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
+ dfsClient.getConf().getExcludedNodesCacheExpiry(),
+ TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
@Override
public void onRemoval(
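[For reference, a self-contained sketch, not part of the patch, of a Guava cache in the same style as initExcludedNodes(); the String key type, the ten-minute expiry, and the example address are assumptions.]

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class ExcludedNodesCacheSketch {
  static LoadingCache<String, String> build(long expiryMs) {
    return CacheBuilder.newBuilder()
        .expireAfterWrite(expiryMs, TimeUnit.MILLISECONDS)
        .removalListener(new RemovalListener<String, String>() {
          @Override
          public void onRemoval(RemovalNotification<String, String> n) {
            // entries fall out automatically; the node becomes usable again
            System.out.println("No longer excluding datanode " + n.getKey());
          }
        })
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String key) {
            return key; // identity mapping: the cache acts as an expiring set
          }
        });
  }

  public static void main(String[] args) throws Exception {
    LoadingCache<String, String> excluded = build(10 * 60 * 1000L);
    excluded.get("dn-1:50010"); // exclude a hypothetical datanode address
    System.out.println("currently excluded: " + excluded.asMap().keySet());
  }
}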
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 3edab48..21f5107 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -160,12 +160,12 @@ public class DistributedFileSystem extends FileSystem {
@Override
public long getDefaultBlockSize() {
- return dfs.getDefaultBlockSize();
+ return dfs.getConf().getDefaultBlockSize();
}
@Override
public short getDefaultReplication() {
- return dfs.getDefaultReplication();
+ return dfs.getConf().getDefaultReplication();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
index 3e0abce..511bddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
@@ -225,8 +225,9 @@ class LeaseRenewer {
dfsclients.add(dfsc);
//update renewal time
- if (dfsc.getHdfsTimeout() > 0) {
- final long half = dfsc.getHdfsTimeout()/2;
+ final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+ if (hdfsTimeout > 0) {
+ final long half = hdfsTimeout/2;
if (half < renewal) {
this.renewal = half;
}
@@ -368,14 +369,12 @@ class LeaseRenewer {
}
//update renewal time
- if (renewal == dfsc.getHdfsTimeout()/2) {
+ if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
for(DFSClient c : dfsclients) {
- if (c.getHdfsTimeout() > 0) {
- final long timeout = c.getHdfsTimeout();
- if (timeout < min) {
- min = timeout;
- }
+ final int timeout = c.getConf().getHdfsTimeout();
+ if (timeout > 0 && timeout < min) {
+ min = timeout;
}
}
renewal = min/2;
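[The renewal computation above, as a self-contained sketch that is not part of the patch; the 60 s floor mirrors the lease soft-limit period, and the per-client timeouts are hypothetical getHdfsTimeout() values.]

public class RenewalSketch {
  public static void main(String[] args) {
    long min = 60 * 1000; // lease soft-limit period, in ms
    final int[] hdfsTimeouts = {0, 45_000, 30_000}; // assumed client timeouts
    for (int timeout : hdfsTimeouts) {
      // non-positive values mean "no timeout configured" and are skipped
      if (timeout > 0 && timeout < min) {
        min = timeout;
      }
    }
    long renewal = min / 2; // renew well before the shortest timeout expires
    System.out.println("renewal interval = " + renewal + " ms"); // 15000 here
  }
}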
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index ec2223f..5a929fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -40,8 +40,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -178,12 +178,12 @@ public class NameNodeProxies {
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
} else {
// HA case
- Conf config = new Conf(conf);
+ DfsClientConf config = new DfsClientConf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
- config.maxRetryAttempts, config.failoverSleepBaseMillis,
- config.failoverSleepMaxMillis));
+ RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
+ config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
+ config.getFailoverSleepMaxMillis()));
Text dtService;
if (failoverProxyProvider.useLogicalURI()) {