You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/06/24 00:25:00 UTC
svn commit: r1139097 - in /hadoop/common/trunk/hdfs: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSClient.java
src/java/org/apache/hadoop/hdfs/DFSInputStream.java
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Author: szetszwo
Date: Thu Jun 23 22:24:59 2011
New Revision: 1139097
URL: http://svn.apache.org/viewvc?rev=1139097&view=rev
Log:
HDFS-2092. Remove some object references to Configuration in DFSClient. Contributed by Bharath Mundlapudi
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Jun 23 22:24:59 2011
@@ -523,6 +523,9 @@ Trunk (unreleased changes)
HDFS-2100. Improve TestStorageRestore. (atm)
+ HDFS-2092. Remove some object references to Configuration in DFSClient.
+ (Bharath Mundlapudi via szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 23 22:24:59 2011
@@ -128,17 +128,85 @@ public class DFSClient implements FSCons
static Random r = new Random();
final String clientName;
Configuration conf;
- long defaultBlockSize;
- private short defaultReplication;
SocketFactory socketFactory;
- int socketTimeout;
- final int writePacketSize;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
-
final SocketCache socketCache;
+ final Conf dfsClientConf;
+
+ /**
+ * DFSClient configuration
+ */
+ static class Conf {
+ final int maxBlockAcquireFailures;
+ final int confTime;
+ final int ioBufferSize;
+ final int bytesPerChecksum;
+ final int writePacketSize;
+ final int socketTimeout;
+ final int socketCacheCapacity;
+ /** Wait time window (in msec) if BlockMissingException is caught */
+ final int timeWindow;
+ final int nCachedConnRetry;
+ final int nBlockWriteRetry;
+ final int nBlockWriteLocateFollowingRetry;
+ final long defaultBlockSize;
+ final long prefetchSize;
+ final short defaultReplication;
+ final String taskId;
+
+ Conf(Configuration conf) {
+ maxBlockAcquireFailures = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+ DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+ confTime = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+ HdfsConstants.WRITE_TIMEOUT);
+ ioBufferSize = conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ HdfsConstants.READ_TIMEOUT);
+ /** dfs.write.packet.size is an internal config variable */
+ writePacketSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+ defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+ DEFAULT_BLOCK_SIZE);
+ defaultReplication = (short) conf.getInt(
+ DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+ taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
+ socketCacheCapacity = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ prefetchSize = conf.getLong(
+ DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+ 10 * defaultBlockSize);
+ timeWindow = conf
+ .getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+ nCachedConnRetry = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+ nBlockWriteRetry = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
+ DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+ nBlockWriteLocateFollowingRetry = conf
+ .getInt(
+ DFSConfigKeys
+ .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+ DFSConfigKeys
+ .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ }
+ }
+
+ Conf getConf() {
+ return dfsClientConf;
+ }
/**
* A map from file names to {@link DFSOutputStream} objects
@@ -257,16 +325,11 @@ public class DFSClient implements FSCons
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
+ // Copy only the required DFSClient configuration
+ this.dfsClientConf = new Conf(conf);
this.conf = conf;
this.stats = stats;
- this.socketTimeout =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsConstants.READ_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
- // dfs.write.packet.size is an internal config variable
- this.writePacketSize =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
@@ -275,19 +338,8 @@ public class DFSClient implements FSCons
final String authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
-
- String taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
- this.clientName = leaserenewer.getClientName(taskId);
-
- defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
- defaultReplication = (short)
- conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
- DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-
- this.socketCache = new SocketCache(
- conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
- DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
-
+ this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+ this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode);
@@ -306,8 +358,7 @@ public class DFSClient implements FSCons
* to retrieve block locations when reading.
*/
int getMaxBlockAcquireFailures() {
- return conf.getInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
- DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+ return dfsClientConf.maxBlockAcquireFailures;
}
/**
@@ -315,18 +366,14 @@ public class DFSClient implements FSCons
* @param numNodes the number of nodes in the pipeline.
*/
int getDatanodeWriteTimeout(int numNodes) {
- int confTime =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
- HdfsConstants.WRITE_TIMEOUT);
-
- return (confTime > 0) ?
- (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+ return (dfsClientConf.confTime > 0) ?
+ (dfsClientConf.confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
}
int getDatanodeReadTimeout(int numNodes) {
- return socketTimeout > 0 ?
+ return dfsClientConf.socketTimeout > 0 ?
(HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
- socketTimeout) : 0;
+ dfsClientConf.socketTimeout) : 0;
}
int getHdfsTimeout() {
@@ -430,7 +477,7 @@ public class DFSClient implements FSCons
* @return the default block size in bytes
*/
public long getDefaultBlockSize() {
- return defaultBlockSize;
+ return dfsClientConf.defaultBlockSize;
}
/**
@@ -528,7 +575,7 @@ public class DFSClient implements FSCons
}
public short getDefaultReplication() {
- return defaultReplication;
+ return dfsClientConf.defaultReplication;
}
/**
@@ -583,7 +630,7 @@ public class DFSClient implements FSCons
public DFSInputStream open(String src)
throws IOException, UnresolvedLinkException {
- return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
+ return open(src, dfsClientConf.ioBufferSize, true, null);
}
/**
@@ -629,7 +676,8 @@ public class DFSClient implements FSCons
*/
public OutputStream create(String src, boolean overwrite)
throws IOException {
- return create(src, overwrite, defaultReplication, defaultBlockSize, null);
+ return create(src, overwrite, dfsClientConf.defaultReplication,
+ dfsClientConf.defaultBlockSize, null);
}
/**
@@ -639,7 +687,8 @@ public class DFSClient implements FSCons
public OutputStream create(String src,
boolean overwrite,
Progressable progress) throws IOException {
- return create(src, overwrite, defaultReplication, defaultBlockSize, progress);
+ return create(src, overwrite, dfsClientConf.defaultReplication,
+ dfsClientConf.defaultBlockSize, progress);
}
/**
@@ -660,7 +709,7 @@ public class DFSClient implements FSCons
public OutputStream create(String src, boolean overwrite, short replication,
long blockSize, Progressable progress) throws IOException {
return create(src, overwrite, replication, blockSize, progress,
- conf.getInt("io.file.buffer.size", 4096));
+ dfsClientConf.ioBufferSize);
}
/**
@@ -744,10 +793,9 @@ public class DFSClient implements FSCons
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
- final DFSOutputStream result = new DFSOutputStream(this, src, masked,
- flag, createParent, replication, blockSize, progress, buffersize,
- conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
+ createParent, replication, blockSize, progress, buffersize,
+ dfsClientConf.bytesPerChecksum);
leaserenewer.put(src, result, this);
return result;
}
@@ -851,8 +899,7 @@ public class DFSClient implements FSCons
UnresolvedPathException.class);
}
return new DFSOutputStream(this, src, buffersize, progress,
- lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ lastBlock, stat, dfsClientConf.bytesPerChecksum);
}
/**
@@ -1061,7 +1108,7 @@ public class DFSClient implements FSCons
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
}
/**
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Jun 23 22:24:59 2011
@@ -83,7 +83,7 @@ public class DFSInputStream extends FSIn
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
- private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
+ private int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -106,13 +106,9 @@ public class DFSInputStream extends FSIn
this.buffersize = buffersize;
this.src = src;
this.socketCache = dfsClient.socketCache;
- prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
- 10 * dfsClient.defaultBlockSize);
- timeWindow = this.dfsClient.conf.getInt(
- DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
- nCachedConnRetry = this.dfsClient.conf.getInt(
- DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
- DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+ prefetchSize = dfsClient.getConf().prefetchSize;
+ timeWindow = dfsClient.getConf().timeWindow;
+ nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
openInfo();
}
@@ -163,7 +159,7 @@ public class DFSInputStream extends FSIn
try {
cdp = DFSClient.createClientDatanodeProtocolProxy(
- datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+ datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -771,8 +767,8 @@ public class DFSInputStream extends FSIn
// disaster.
sock.setTcpNoDelay(true);
- NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
- sock.setSoTimeout(dfsClient.socketTimeout);
+ NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
+ sock.setSoTimeout(dfsClient.getConf().socketTimeout);
}
try {
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 23 22:24:59 2011
@@ -103,7 +103,6 @@ import org.apache.hadoop.util.StringUtil
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient;
- private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
// closed is accessed by different threads under different locks.
@@ -355,7 +354,7 @@ class DFSOutputStream extends FSOutputSu
// that expected size of of a packet, then create
// smaller size packet.
//
- computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock),
+ computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
@@ -426,8 +425,8 @@ class DFSOutputStream extends FSOutputSu
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
- now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
- long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
+ now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
+ long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
@@ -953,8 +952,7 @@ class DFSOutputStream extends FSOutputSu
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
- int count = conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
- DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+ int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
do {
hasError = false;
@@ -1079,9 +1077,7 @@ class DFSOutputStream extends FSOutputSu
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes)
throws IOException, UnresolvedLinkException {
- int retries =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
- DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@@ -1201,7 +1197,6 @@ class DFSOutputStream extends FSOutputSu
int bytesPerChecksum, short replication) throws IOException {
super(new PureJavaCrc32(), bytesPerChecksum, 4);
this.dfsClient = dfsClient;
- this.conf = dfsClient.conf;
this.src = src;
this.blockSize = blockSize;
this.blockReplication = replication;
@@ -1232,7 +1227,7 @@ class DFSOutputStream extends FSOutputSu
throws IOException {
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
- computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
try {
dfsClient.namenode.create(
@@ -1269,7 +1264,7 @@ class DFSOutputStream extends FSOutputSu
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
- computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer();
}
streamer.start();
@@ -1385,7 +1380,7 @@ class DFSOutputStream extends FSOutputSu
}
if (!appendChunk) {
- int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
+ int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//