Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/06/24 00:25:00 UTC

svn commit: r1139097 - in /hadoop/common/trunk/hdfs: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/java/org/apache/hadoop/hdfs/DFSInputStream.java src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Author: szetszwo
Date: Thu Jun 23 22:24:59 2011
New Revision: 1139097

URL: http://svn.apache.org/viewvc?rev=1139097&view=rev
Log:
HDFS-2092. Remove some object references to Configuration in DFSClient.  Contributed by Bharath Mundlapudi
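
The change follows a common pattern: read Configuration values once, at client
construction time, into an immutable holder object, so hot paths no longer
perform repeated key lookups and fewer components keep references to the
Configuration itself. A minimal sketch of the idea (the class name, config
keys, and defaults below are illustrative placeholders, not the committed
code):

    import org.apache.hadoop.conf.Configuration;

    class ConfSnapshot {
      // Snapshot taken once; fields are final, so later reads are cheap
      // and need no further Configuration lookups.
      final int socketTimeout;
      final long defaultBlockSize;

      ConfSnapshot(Configuration conf) {
        // Keys and defaults here are made up for illustration.
        socketTimeout = conf.getInt("example.socket.timeout", 60 * 1000);
        defaultBlockSize = conf.getLong("example.block.size", 64L * 1024 * 1024);
      }
    }

    // Callers then read cached fields instead of calling conf.getInt(...)
    // on every operation, e.g. client.getConf().socketTimeout.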

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Jun 23 22:24:59 2011
@@ -523,6 +523,9 @@ Trunk (unreleased changes)
 
     HDFS-2100. Improve TestStorageRestore. (atm)
 
+    HDFS-2092. Remove some object references to Configuration in DFSClient.
+    (Bharath Mundlapudi via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 23 22:24:59 2011
@@ -128,17 +128,85 @@ public class DFSClient implements FSCons
   static Random r = new Random();
   final String clientName;
   Configuration conf;
-  long defaultBlockSize;
-  private short defaultReplication;
   SocketFactory socketFactory;
-  int socketTimeout;
-  final int writePacketSize;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
-
   final SocketCache socketCache;
+  final Conf dfsClientConf;
+
+  /**
+   * DFSClient configuration 
+   */
+  static class Conf {
+    final int maxBlockAcquireFailures;
+    final int confTime;
+    final int ioBufferSize;
+    final int bytesPerChecksum;
+    final int writePacketSize;
+    final int socketTimeout;
+    final int socketCacheCapacity;
+    /** Wait time window (in msec) if BlockMissingException is caught */
+    final int timeWindow;
+    final int nCachedConnRetry;
+    final int nBlockWriteRetry;
+    final int nBlockWriteLocateFollowingRetry;
+    final long defaultBlockSize;
+    final long prefetchSize;
+    final short defaultReplication;
+    final String taskId;
+
+    Conf(Configuration conf) {
+      maxBlockAcquireFailures = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+      confTime = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+          HdfsConstants.WRITE_TIMEOUT);
+      ioBufferSize = conf.getInt(
+          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+      bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+          DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+          HdfsConstants.READ_TIMEOUT);
+      /** dfs.write.packet.size is an internal config variable */
+      writePacketSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+      defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+          DEFAULT_BLOCK_SIZE);
+      defaultReplication = (short) conf.getInt(
+          DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
+      socketCacheCapacity = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      prefetchSize = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+          10 * defaultBlockSize);
+      timeWindow = conf
+          .getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+      nCachedConnRetry = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+      nBlockWriteRetry = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
+          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+      nBlockWriteLocateFollowingRetry = conf
+          .getInt(
+              DFSConfigKeys
+              .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+              DFSConfigKeys
+              .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+    }
+  }
+ 
+  Conf getConf() {
+    return dfsClientConf;
+  }
   
   /**
    * A map from file names to {@link DFSOutputStream} objects
@@ -257,16 +325,11 @@ public class DFSClient implements FSCons
   DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
       Configuration conf, FileSystem.Statistics stats)
     throws IOException {
+    // Copy only the required DFSClient configuration
+    this.dfsClientConf = new Conf(conf);
     this.conf = conf;
     this.stats = stats;
-    this.socketTimeout = 
-      conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
-                  HdfsConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
-    // dfs.write.packet.size is an internal config variable
-    this.writePacketSize = 
-      conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
-                  DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
 
     // The hdfsTimeout is currently the same as the ipc timeout 
@@ -275,19 +338,8 @@ public class DFSClient implements FSCons
     final String authority = nameNodeAddr == null? "null":
         nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
     this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
-    
-    String taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
-    this.clientName = leaserenewer.getClientName(taskId);
-
-    defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    defaultReplication = (short) 
-      conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
-                  DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-    
-    this.socketCache = new SocketCache(
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
-            DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
-
+    this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
       this.namenode = createNamenode(this.rpcNamenode);
@@ -306,8 +358,7 @@ public class DFSClient implements FSCons
    * to retrieve block locations when reading.
    */
   int getMaxBlockAcquireFailures() {
-    return conf.getInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
-                       DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+    return dfsClientConf.maxBlockAcquireFailures;
   }
 
   /**
@@ -315,18 +366,14 @@ public class DFSClient implements FSCons
    * @param numNodes the number of nodes in the pipeline.
    */
   int getDatanodeWriteTimeout(int numNodes) {
-    int confTime =
-        conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
-                    HdfsConstants.WRITE_TIMEOUT);
-
-    return (confTime > 0) ?
-      (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+    return (dfsClientConf.confTime > 0) ?
+      (dfsClientConf.confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
   }
 
   int getDatanodeReadTimeout(int numNodes) {
-    return socketTimeout > 0 ?
+    return dfsClientConf.socketTimeout > 0 ?
         (HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
-        socketTimeout) : 0;
+            dfsClientConf.socketTimeout) : 0;
   }
   
   int getHdfsTimeout() {
@@ -430,7 +477,7 @@ public class DFSClient implements FSCons
    * @return the default block size in bytes
    */
   public long getDefaultBlockSize() {
-    return defaultBlockSize;
+    return dfsClientConf.defaultBlockSize;
   }
     
   /**
@@ -528,7 +575,7 @@ public class DFSClient implements FSCons
   }
   
   public short getDefaultReplication() {
-    return defaultReplication;
+    return dfsClientConf.defaultReplication;
   }
 
   /**
@@ -583,7 +630,7 @@ public class DFSClient implements FSCons
   
   public DFSInputStream open(String src) 
       throws IOException, UnresolvedLinkException {
-    return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
+    return open(src, dfsClientConf.ioBufferSize, true, null);
   }
 
   /**
@@ -629,7 +676,8 @@ public class DFSClient implements FSCons
    */
   public OutputStream create(String src, boolean overwrite) 
       throws IOException {
-    return create(src, overwrite, defaultReplication, defaultBlockSize, null);
+    return create(src, overwrite, dfsClientConf.defaultReplication,
+        dfsClientConf.defaultBlockSize, null);
   }
     
   /**
@@ -639,7 +687,8 @@ public class DFSClient implements FSCons
   public OutputStream create(String src, 
                              boolean overwrite,
                              Progressable progress) throws IOException {
-    return create(src, overwrite, defaultReplication, defaultBlockSize, progress);
+    return create(src, overwrite, dfsClientConf.defaultReplication,
+        dfsClientConf.defaultBlockSize, progress);
   }
     
   /**
@@ -660,7 +709,7 @@ public class DFSClient implements FSCons
   public OutputStream create(String src, boolean overwrite, short replication,
       long blockSize, Progressable progress) throws IOException {
     return create(src, overwrite, replication, blockSize, progress,
-        conf.getInt("io.file.buffer.size", 4096));
+        dfsClientConf.ioBufferSize);
   }
 
   /**
@@ -744,10 +793,9 @@ public class DFSClient implements FSCons
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
-    final DFSOutputStream result = new DFSOutputStream(this, src, masked,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
-                    DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+    final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
+        createParent, replication, blockSize, progress, buffersize,
+        dfsClientConf.bytesPerChecksum);
     leaserenewer.put(src, result, this);
     return result;
   }
@@ -851,8 +899,7 @@ public class DFSClient implements FSCons
                                      UnresolvedPathException.class);
     }
     return new DFSOutputStream(this, src, buffersize, progress,
-        lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
-                                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+        lastBlock, stat, dfsClientConf.bytesPerChecksum);
   }
   
   /**
@@ -1061,7 +1108,7 @@ public class DFSClient implements FSCons
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory, socketTimeout);    
+    return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);    
   }
 
   /**
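
For reference, the timeout arithmetic preserved by getDatanodeWriteTimeout and
getDatanodeReadTimeout in the hunks above scales a configured base timeout by
the length of the datanode pipeline. A worked example of the shape of the
computation (the constant values are assumptions for illustration, not taken
from this commit):

    // Hypothetical numbers to show the arithmetic only.
    int confTime = 8 * 60 * 1000;           // assumed base write timeout, ms
    int WRITE_TIMEOUT_EXTENSION = 5 * 1000; // assumed per-datanode extension, ms
    int numNodes = 3;                       // a three-datanode write pipeline

    int writeTimeout = (confTime > 0)
        ? confTime + WRITE_TIMEOUT_EXTENSION * numNodes // 480000 + 15000 = 495000 ms
        : 0;                                            // non-positive disables the timeout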

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Jun 23 22:24:59 2011
@@ -83,7 +83,7 @@ public class DFSInputStream extends FSIn
    * capped at maxBlockAcquireFailures
    */
   private int failures = 0;
-  private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
+  private int timeWindow; 
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -106,13 +106,9 @@ public class DFSInputStream extends FSIn
     this.buffersize = buffersize;
     this.src = src;
     this.socketCache = dfsClient.socketCache;
-    prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
-        10 * dfsClient.defaultBlockSize);
-    timeWindow = this.dfsClient.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
-    nCachedConnRetry = this.dfsClient.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
-        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+    prefetchSize = dfsClient.getConf().prefetchSize;
+    timeWindow = dfsClient.getConf().timeWindow;
+    nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
     openInfo();
   }
 
@@ -163,7 +159,7 @@ public class DFSInputStream extends FSIn
       
       try {
         cdp = DFSClient.createClientDatanodeProtocolProxy(
-        datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+        datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
         
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
         
@@ -771,8 +767,8 @@ public class DFSInputStream extends FSIn
         // disaster.
         sock.setTcpNoDelay(true);
 
-        NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
-        sock.setSoTimeout(dfsClient.socketTimeout);
+        NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
+        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
       }
 
       try {

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1139097&r1=1139096&r2=1139097&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 23 22:24:59 2011
@@ -103,7 +103,6 @@ import org.apache.hadoop.util.StringUtil
 ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
   private final DFSClient dfsClient;
-  private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private Socket s;
   // closed is accessed by different threads under different locks.
@@ -355,7 +354,7 @@ class DFSOutputStream extends FSOutputSu
         // that expected size of of a packet, then create 
         // smaller size packet.
         //
-        computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock), 
+        computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), 
             bytesPerChecksum);
       }
 
@@ -426,8 +425,8 @@ class DFSOutputStream extends FSOutputSu
                 && dataQueue.size() == 0 && 
                 (stage != BlockConstructionStage.DATA_STREAMING || 
                  stage == BlockConstructionStage.DATA_STREAMING && 
-                 now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
-              long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
+                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
+              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
               timeout = timeout <= 0 ? 1000 : timeout;
               timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                  timeout : 1000;
@@ -953,8 +952,7 @@ class DFSOutputStream extends FSOutputSu
     private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
-      int count = conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
-                              DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+      int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
       do {
         hasError = false;
@@ -1079,9 +1077,7 @@ class DFSOutputStream extends FSOutputSu
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes) 
         throws IOException, UnresolvedLinkException {
-      int retries = 
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
-                    DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+      int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
         long localstart = System.currentTimeMillis();
@@ -1201,7 +1197,6 @@ class DFSOutputStream extends FSOutputSu
       int bytesPerChecksum, short replication) throws IOException {
     super(new PureJavaCrc32(), bytesPerChecksum, 4);
     this.dfsClient = dfsClient;
-    this.conf = dfsClient.conf;
     this.src = src;
     this.blockSize = blockSize;
     this.blockReplication = replication;
@@ -1232,7 +1227,7 @@ class DFSOutputStream extends FSOutputSu
       throws IOException {
     this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
 
-    computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
     try {
       dfsClient.namenode.create(
@@ -1269,7 +1264,7 @@ class DFSOutputStream extends FSOutputSu
       bytesCurBlock = lastBlock.getBlockSize();
       streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
     } else {
-      computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
       streamer = new DataStreamer();
     }
     streamer.start();
@@ -1385,7 +1380,7 @@ class DFSOutputStream extends FSOutputSu
       }
 
       if (!appendChunk) {
-        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
+        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
         computePacketChunkSize(psize, bytesPerChecksum);
       }
       //