You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/20 06:50:25 UTC

svn commit: r1494855 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/hadoop/hdfs/server/datanode...

Author: szetszwo
Date: Thu Jun 20 04:50:24 2013
New Revision: 1494855

URL: http://svn.apache.org/r1494855
Log:
svn merge -c 1494854 from trunk for HDFS-4914. Use DFSClient.Conf instead of Configuration.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1494854

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jun 20 04:50:24 2013
@@ -178,6 +178,8 @@ Release 2.1.0-beta - UNRELEASED
     HDFS-3009. Remove duplicate code in DFSClient#isLocalAddress by using 
     NetUtils. (Hari Mankude via suresh)
 
+    HDFS-4914. Use DFSClient.Conf instead of Configuration.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1494854

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Thu Jun 20 04:50:24 2013
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,7 +39,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 
@@ -75,9 +73,7 @@ public class BlockReaderFactory {
    *                                     should be allowed.
    * @return New BlockReader instance
    */
-  @SuppressWarnings("deprecation")
-  public static BlockReader newBlockReader(
-                                     Configuration conf,
+  public static BlockReader newBlockReader(DFSClient.Conf conf,
                                      String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
@@ -91,14 +87,11 @@ public class BlockReaderFactory {
                                      FileInputStreamCache fisCache,
                                      boolean allowShortCircuitLocalReads)
   throws IOException {
-    peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-        HdfsServerConstants.READ_TIMEOUT));
+    peer.setReadTimeout(conf.socketTimeout);
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
 
     if (peer.getDomainSocket() != null) {
-      if (allowShortCircuitLocalReads &&
-         (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
-            DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
+      if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) {
         // If this is a domain socket, and short-circuit local reads are 
         // enabled, try to set up a BlockReaderLocal.
         BlockReader reader = newShortCircuitBlockReader(conf, file,
@@ -118,21 +111,19 @@ public class BlockReaderFactory {
       // If this is a domain socket and we couldn't (or didn't want to) set
       // up a BlockReaderLocal, check that we are allowed to pass data traffic
       // over the socket before proceeding.
-      if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
-            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+      if (!conf.domainSocketDataTraffic) {
         throw new IOException("Because we can't do short-circuit access, " +
           "and data traffic over domain sockets is disabled, " +
           "we cannot use this socket to talk to " + datanodeID);
       }
     }
 
-    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
-        DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
-      return RemoteBlockReader.newBlockReader(file,
-          block, blockToken, startOffset, len,
-          conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-              DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+    if (conf.useLegacyBlockReader) {
+      @SuppressWarnings("deprecation")
+      RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
+          block, blockToken, startOffset, len, conf.ioBufferSize,
           verifyChecksum, clientName, peer, datanodeID, peerCache);
+      return reader;
     } else {
       return RemoteBlockReader2.newBlockReader(
           file, block, blockToken, startOffset, len,
@@ -173,7 +164,7 @@ public class BlockReaderFactory {
    * @throws IOException       If there was a communication error.
    */
   private static BlockReaderLocal newShortCircuitBlockReader(
-      Configuration conf, String file, ExtendedBlock block,
+      DFSClient.Conf conf, String file, ExtendedBlock block,
       Token<BlockTokenIdentifier> blockToken, long startOffset,
       long len, Peer peer, DatanodeID datanodeID,
       DomainSocketFactory domSockFactory, boolean verifyChecksum,
@@ -245,15 +236,14 @@ public class BlockReaderFactory {
    * This block reader implements the path-based style of local reads
    * first introduced in HDFS-2246.
    */
-  static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi,
-      Configuration conf, String src, ExtendedBlock blk,
+  static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient,
+      String src, ExtendedBlock blk,
       Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
-      int socketTimeout, long offsetIntoBlock,
-      boolean connectToDnViaHostname) throws InvalidToken, IOException {
+      long offsetIntoBlock) throws InvalidToken, IOException {
     try {
-      return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src,
-          blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock,
-          blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname);
+      final long length = blk.getNumBytes() - offsetIntoBlock;
+      return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk,
+          accessToken, chosenNode, offsetIntoBlock, length);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,
           AccessControlException.class);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Thu Jun 20 04:50:24 2013
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.DataInputStream;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -90,13 +88,8 @@ class BlockReaderLocal implements BlockR
   
   private final FileInputStreamCache fisCache;
   
-  private static int getSlowReadBufferNumChunks(Configuration conf,
+  private static int getSlowReadBufferNumChunks(int bufSize,
       int bytesPerChecksum) {
-
-    int bufSize =
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
     if (bufSize < bytesPerChecksum) {
       throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
           bufSize + ") is not large enough to hold a single chunk (" +
@@ -108,7 +101,7 @@ class BlockReaderLocal implements BlockR
     return bufSize / bytesPerChecksum;
   }
 
-  public BlockReaderLocal(Configuration conf, String filename,
+  public BlockReaderLocal(DFSClient.Conf conf, String filename,
       ExtendedBlock block, long startOffset, long length,
       FileInputStream dataIn, FileInputStream checksumIn,
       DatanodeID datanodeID, boolean verifyChecksum,
@@ -132,13 +125,7 @@ class BlockReaderLocal implements BlockR
       throw new IOException("Wrong version (" + version + ") of the " +
           "metadata file for " + filename + ".");
     }
-    if (!verifyChecksum) {
-      this.verifyChecksum = false; 
-    } else {
-      this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
-          DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-    }
+    this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
     long firstChunkOffset;
     if (this.verifyChecksum) {
       this.checksum = header.getChecksum();
@@ -148,7 +135,8 @@ class BlockReaderLocal implements BlockR
           - (startOffset % checksum.getBytesPerChecksum());
       this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
 
-      int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+      int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+          conf.shortCircuitBufferSize, bytesPerChecksum);
       slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
       checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
       // Initially the buffers have nothing to read.
@@ -171,7 +159,12 @@ class BlockReaderLocal implements BlockR
       this.dataIn.getChannel().position(firstChunkOffset);
       success = true;
     } finally {
-      if (!success) {
+      if (success) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created BlockReaderLocal for file " + filename
+              + " block " + block + " in datanode " + datanodeID);
+        }
+      } else {
         if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
         if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
       }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Thu Jun 20 04:50:24 2013
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
@@ -32,17 +31,15 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -70,7 +67,7 @@ import org.apache.hadoop.util.DataChecks
  * </ul>
  */
 class BlockReaderLocalLegacy implements BlockReader {
-  private static final Log LOG = LogFactory.getLog(DFSClient.class);
+  private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class);
 
   //Stores the cache and proxy for a local datanode.
   private static class LocalDatanodeInfo {
@@ -173,19 +170,20 @@ class BlockReaderLocalLegacy implements 
   /**
    * The only way this object can be instantiated.
    */
-  static BlockReaderLocalLegacy newBlockReader(UserGroupInformation ugi,
-      Configuration conf, String file, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
-      long startOffset, long length, boolean connectToDnViaHostname)
+  static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient,
+      String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token,
+      DatanodeInfo node, long startOffset, long length)
       throws IOException {
+    final DFSClient.Conf conf = dfsClient.getConf();
 
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
-          connectToDnViaHostname);
+      pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node,
+          dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token,
+          conf.connectToDnViaHostname);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -197,7 +195,7 @@ class BlockReaderLocalLegacy implements 
     FileInputStream dataIn = null;
     FileInputStream checksumIn = null;
     BlockReaderLocalLegacy localBlockReader = null;
-    boolean skipChecksumCheck = skipChecksumCheck(conf);
+    boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
     try {
       // get a local file system
       File blkfile = new File(pathinfo.getBlockPath());
@@ -285,16 +283,8 @@ class BlockReaderLocalLegacy implements 
     return pathinfo;
   }
   
-  private static boolean skipChecksumCheck(Configuration conf) {
-    return conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-  }
-  
-  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
-    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
+  private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+      int bytesPerChecksum) {
     if (bufferSizeBytes < bytesPerChecksum) {
       throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
           "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
@@ -307,7 +297,7 @@ class BlockReaderLocalLegacy implements 
     return bufferSizeBytes / bytesPerChecksum;
   }
 
-  private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+  private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
       long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
       throws IOException {
@@ -316,7 +306,7 @@ class BlockReaderLocalLegacy implements 
         dataIn, startOffset, null);
   }
 
-  private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+  private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
       long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
       boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@@ -333,7 +323,8 @@ class BlockReaderLocalLegacy implements 
     this.checksumIn = checksumIn;
     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
 
-    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+    int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+        conf.shortCircuitBufferSize, bytesPerChecksum);
     slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
     checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
     // Initially the buffers have nothing to read.

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 20 04:50:24 2013
@@ -178,6 +178,9 @@ public class DFSClient implements java.i
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+
+  private final Configuration conf;
+  private final Conf dfsClientConf;
   final ClientProtocol namenode;
   /* The service used for delegation tokens */
   private Text dtService;
@@ -188,14 +191,11 @@ public class DFSClient implements java.i
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
-  Configuration conf;
   SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
-  final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
   final PeerCache peerCache;
-  final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
@@ -204,7 +204,8 @@ public class DFSClient implements java.i
   /**
    * DFSClient configuration 
    */
-  static class Conf {
+  public static class Conf {
+    final int hdfsTimeout;    // timeout value for a DFS operation.
     final int maxFailoverAttempts;
     final int failoverSleepBaseMillis;
     final int failoverSleepMaxMillis;
@@ -227,18 +228,25 @@ public class DFSClient implements java.i
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
-    final boolean useLegacyBlockReaderLocal;
     final boolean connectToDnViaHostname;
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
+
+    final boolean useLegacyBlockReader;
+    final boolean useLegacyBlockReaderLocal;
     final String domainSocketPath;
     final boolean skipShortCircuitChecksums;
     final int shortCircuitBufferSize;
     final boolean shortCircuitLocalReads;
     final boolean domainSocketDataTraffic;
+    final int shortCircuitStreamsCacheSize;
+    final long shortCircuitStreamsCacheExpiryMs; 
+
+    public Conf(Configuration conf) {
+      // The hdfsTimeout is currently the same as the ipc timeout 
+      hdfsTimeout = Client.getTimeout(conf);
 
-    Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
@@ -277,19 +285,15 @@ public class DFSClient implements java.i
           DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
-      timeWindow = conf
-          .getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+      timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
       nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
           DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
       nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
           DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
-      nBlockWriteLocateFollowingRetry = conf
-          .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
-              DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+      nBlockWriteLocateFollowingRetry = conf.getInt(
+          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
-      useLegacyBlockReaderLocal = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
       connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
           DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
       getHdfsBlocksMetadataEnabled = conf.getBoolean(
@@ -301,20 +305,50 @@ public class DFSClient implements java.i
       getFileBlockStorageLocationsTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
-      domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+
+      useLegacyBlockReader = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+      useLegacyBlockReaderLocal = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
+      shortCircuitLocalReads = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+      domainSocketDataTraffic = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
+      domainSocketPath = conf.getTrimmed(
+          DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
           DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+
+      if (BlockReaderLocal.LOG.isDebugEnabled()) {
+        BlockReaderLocal.LOG.debug(
+            DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+            + " = " + useLegacyBlockReaderLocal);
+        BlockReaderLocal.LOG.debug(
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
+            + " = " + shortCircuitLocalReads);
+        BlockReaderLocal.LOG.debug(
+            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+            + " = " + domainSocketDataTraffic);
+        BlockReaderLocal.LOG.debug(
+            DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+            + " = " + domainSocketPath);
+      }
+
       skipShortCircuitChecksums = conf.getBoolean(
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
       shortCircuitBufferSize = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-      shortCircuitLocalReads = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
-      domainSocketDataTraffic = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
-        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
+      shortCircuitStreamsCacheSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
+      shortCircuitStreamsCacheExpiryMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -360,10 +394,14 @@ public class DFSClient implements java.i
     }
   }
  
-  Conf getConf() {
+  public Conf getConf() {
     return dfsClientConf;
   }
   
+  Configuration getConfiguration() {
+    return conf;
+  }
+  
   /**
    * A map from file names to {@link DFSOutputStream} objects
    * that are currently being written by this client.
@@ -426,8 +464,6 @@ public class DFSClient implements java.i
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
 
-    // The hdfsTimeout is currently the same as the ipc timeout 
-    this.hdfsTimeout = Client.getTimeout(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     
     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
@@ -542,21 +578,13 @@ public class DFSClient implements java.i
   }
   
   int getHdfsTimeout() {
-    return hdfsTimeout;
+    return dfsClientConf.hdfsTimeout;
   }
   
   String getClientName() {
     return clientName;
   }
 
-  /**
-   * @return whether the client should use hostnames instead of IPs
-   *    when connecting to DataNodes
-   */
-  boolean connectToDnViaHostname() {
-    return dfsClientConf.connectToDnViaHostname;
-  }
-
   void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
@@ -793,6 +821,7 @@ public class DFSClient implements java.i
    * @throws IOException
    * @deprecated Use Token.renew instead.
    */
+  @Deprecated
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
@@ -864,6 +893,7 @@ public class DFSClient implements java.i
    * @throws IOException
    * @deprecated Use Token.cancel instead.
    */
+  @Deprecated
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
@@ -965,6 +995,11 @@ public class DFSClient implements java.i
     return dfsClientConf.defaultReplication;
   }
   
+  public LocatedBlocks getLocatedBlocks(String src, long start)
+      throws IOException {
+    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
+  }
+
   /*
    * This is just a wrapper around callGetBlockLocations, but non-static so that
    * we can stub it out for tests.
@@ -1693,10 +1728,10 @@ public class DFSClient implements java.i
    * @param socketFactory to create sockets to connect to DNs
    * @param socketTimeout timeout to use when connecting and waiting for a response
    * @param encryptionKey the key needed to communicate with DNs in this cluster
-   * @param connectToDnViaHostname {@link #connectToDnViaHostname()}
+   * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
    * @return The checksum 
    */
-  static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+  private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
       String clientName,
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Jun 20 04:50:24 2013
@@ -72,7 +72,6 @@ public class DFSInputStream extends FSIn
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
-  private final long prefetchSize;
   private BlockReader blockReader = null;
   private final boolean verifyChecksum;
   private LocatedBlocks locatedBlocks = null;
@@ -163,7 +162,6 @@ public class DFSInputStream extends FSIn
    * capped at maxBlockAcquireFailures
    */
   private int failures = 0;
-  private final int timeWindow;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -173,8 +171,6 @@ public class DFSInputStream extends FSIn
   
   private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
 
-  private final int nCachedConnRetry;
-
   void addToDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }
@@ -187,15 +183,8 @@ public class DFSInputStream extends FSIn
     this.src = src;
     this.peerCache = dfsClient.peerCache;
     this.fileInputStreamCache = new FileInputStreamCache(
-      dfsClient.conf.getInt(DFSConfigKeys.
-        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
-      dfsClient.conf.getLong(DFSConfigKeys.
-        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
-    prefetchSize = dfsClient.getConf().prefetchSize;
-    timeWindow = dfsClient.getConf().timeWindow;
-    nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
+        dfsClient.getConf().shortCircuitStreamsCacheSize,
+        dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     openInfo();
   }
 
@@ -236,7 +225,7 @@ public class DFSInputStream extends FSIn
   }
 
   private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize);
+    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
@@ -280,8 +269,8 @@ public class DFSInputStream extends FSIn
       ClientDatanodeProtocol cdp = null;
       
       try {
-        cdp = DFSUtil.createClientDatanodeProtocolProxy(
-            datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
+        cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
+            dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout,
             dfsClient.getConf().connectToDnViaHostname, locatedblock);
         
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -389,8 +378,7 @@ public class DFSInputStream extends FSIn
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
         // fetch more blocks
-        LocatedBlocks newBlocks;
-        newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
+        final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
         assert (newBlocks != null) : "Could not find target position " + offset;
         locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
       }
@@ -413,8 +401,7 @@ public class DFSInputStream extends FSIn
       targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
     }
     // fetch blocks
-    LocatedBlocks newBlocks;
-    newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
+    final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
     if (newBlocks == null) {
       throw new IOException("Could not find target position " + offset);
     }
@@ -832,7 +819,7 @@ public class DFSInputStream extends FSIn
       try {
         DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
         final String dnAddr =
-            chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
+            chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
         }
@@ -861,6 +848,7 @@ public class DFSInputStream extends FSIn
           // alleviating the request rate from the server. Similarly the 3rd retry
           // will wait 6000ms grace period before retry and the waiting window is
           // expanded to 9000ms. 
+          final int timeWindow = dfsClient.getConf().timeWindow;
           double waitTime = timeWindow * failures +       // grace period for the last round of attempt
             timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
           DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@@ -1011,7 +999,7 @@ public class DFSInputStream extends FSIn
         DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
             "the FileInputStreamCache.");
       }
-      return new BlockReaderLocal(dfsClient.conf, file,
+      return new BlockReaderLocal(dfsClient.getConf(), file,
         block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
         fileInputStreamCache);
     }
@@ -1023,9 +1011,8 @@ public class DFSInputStream extends FSIn
         DFSClient.isLocalAddress(dnAddr) &&
         (!shortCircuitForbidden())) {
       try {
-        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi,
-            dfsClient.conf, clientName, block, blockToken, chosenNode,
-            dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname());
+        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
+            clientName, block, blockToken, chosenNode, startOffset);
       } catch (IOException e) {
         DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
             "Disabling legacy local reads.", e);
@@ -1037,6 +1024,7 @@ public class DFSInputStream extends FSIn
     int cacheTries = 0;
     DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
     BlockReader reader = null;
+    final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
     for (; cacheTries < nCachedConnRetry; ++cacheTries) {
       Peer peer = peerCache.get(chosenNode, true);
       if (peer == null) break;
@@ -1044,7 +1032,7 @@ public class DFSInputStream extends FSIn
         boolean allowShortCircuitLocalReads = dfsClient.getConf().
             shortCircuitLocalReads && (!shortCircuitForbidden());
         reader = BlockReaderFactory.newBlockReader(
-            dfsClient.conf, file, block, blockToken, startOffset,
+            dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache,
             allowShortCircuitLocalReads);
@@ -1067,7 +1055,7 @@ public class DFSInputStream extends FSIn
         boolean allowShortCircuitLocalReads = dfsClient.getConf().
             shortCircuitLocalReads && (!shortCircuitForbidden());
         reader = BlockReaderFactory.newBlockReader(
-            dfsClient.conf, file, block, blockToken, startOffset,
+            dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
             allowShortCircuitLocalReads);
@@ -1091,7 +1079,7 @@ public class DFSInputStream extends FSIn
       if (peer == null) break;
       try {
         reader = BlockReaderFactory.newBlockReader(
-            dfsClient.conf, file, block, blockToken, startOffset,
+            dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache, false);
         return reader;
@@ -1110,7 +1098,7 @@ public class DFSInputStream extends FSIn
     // Try to create a new remote peer.
     Peer peer = newTcpPeer(dnAddr);
     return BlockReaderFactory.newBlockReader(
-        dfsClient.conf, file, block, blockToken, startOffset,
+        dfsClient.getConf(), file, block, blockToken, startOffset,
         len, verifyChecksum, clientName, peer, chosenNode, 
         dsFactory, peerCache, fileInputStreamCache, false);
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 20 04:50:24 2013
@@ -1288,7 +1288,8 @@ public class DFSOutputStream extends FSO
    */
   static Socket createSocketForPipeline(final DatanodeInfo first,
       final int length, final DFSClient client) throws IOException {
-    final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
+    final String dnAddr = first.getXferAddr(
+        client.getConf().connectToDnViaHostname);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
@@ -1813,8 +1814,8 @@ public class DFSOutputStream extends FSO
     if (closed) {
       return;
     }
-    streamer.setLastException(new IOException("Lease timeout of " +
-                             (dfsClient.hdfsTimeout/1000) + " seconds expired."));
+    streamer.setLastException(new IOException("Lease timeout of "
+        + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
     closeThreads(true);
     dfsClient.endFileLease(src);
   }
@@ -1884,13 +1885,13 @@ public class DFSOutputStream extends FSO
     while (!fileComplete) {
       fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
       if (!fileComplete) {
+        final int hdfsTimeout = dfsClient.getHdfsTimeout();
         if (!dfsClient.clientRunning ||
-              (dfsClient.hdfsTimeout > 0 &&
-               localstart + dfsClient.hdfsTimeout < Time.now())) {
+              (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
             String msg = "Unable to close file because dfsclient " +
                           " was unable to contact the HDFS servers." +
                           " clientRunning " + dfsClient.clientRunning +
-                          " hdfsTimeout " + dfsClient.hdfsTimeout;
+                          " hdfsTimeout " + hdfsTimeout;
             DFSClient.LOG.info(msg);
             throw new IOException(msg);
         }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Thu Jun 20 04:50:24 2013
@@ -23,16 +23,15 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-
 import org.apache.hadoop.net.unix.DomainSocket;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 class DomainSocketFactory {
-  public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+  private static final Log LOG = BlockReaderLocal.LOG;
   private final Conf conf;
 
   enum PathStatus {
@@ -51,21 +50,26 @@ class DomainSocketFactory {
   public DomainSocketFactory(Conf conf) {
     this.conf = conf;
 
-    String feature = null;
+    final String feature;
     if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
       feature = "The short-circuit local reads feature";
     } else if (conf.domainSocketDataTraffic) {
       feature = "UNIX domain socket data traffic";
+    } else {
+      feature = null;
     }
-    if (feature != null) {
+
+    if (feature == null) {
+      LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled.");
+    } else {
       if (conf.domainSocketPath.isEmpty()) {
-        LOG.warn(feature + " is disabled because you have not set " +
-            DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
+        throw new HadoopIllegalArgumentException(feature + " is enabled but "
+            + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
       } else if (DomainSocket.getLoadingFailureReason() != null) {
-        LOG.warn(feature + " is disabled because " +
-              DomainSocket.getLoadingFailureReason());
+        LOG.warn(feature + " cannot be used because "
+            + DomainSocket.getLoadingFailureReason());
       } else {
-        LOG.debug(feature + "is enabled.");
+        LOG.debug(feature + " is enabled.");
       }
     }
   }
@@ -86,8 +90,8 @@ class DomainSocketFactory {
     // sockets.
     if (conf.domainSocketPath.isEmpty()) return null;
     // If we can't do anything with the domain socket, don't create it.
-    if ((conf.domainSocketDataTraffic == false) &&
-        ((!conf.shortCircuitLocalReads) || conf.useLegacyBlockReaderLocal)) {
+    if (!conf.domainSocketDataTraffic &&
+        (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
       return null;
     }
     // UNIX domain sockets can only be used to talk to local peers

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Jun 20 04:50:24 2013
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -462,18 +461,6 @@ public class RemoteBlockReader extends F
                peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
-  
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
 
   @Override
   public int read(ByteBuffer buf) throws IOException {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Jun 20 04:50:24 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -198,7 +199,8 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+      JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
+      DataEncryptionKey encryptionKey)
           throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
@@ -209,8 +211,7 @@ public class JspHelper {
       
       // Use the block name for file name. 
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(
-        conf, file,
+    BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
         offsetIntoBlock, amtToRead,  true,
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
@@ -218,7 +219,7 @@ public class JspHelper {
             addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
             null, null, false);
         
-    byte[] buf = new byte[(int)amtToRead];
+    final byte[] buf = new byte[amtToRead];
     int readOffset = 0;
     int retries = 2;
     while ( amtToRead > 0 ) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Jun 20 04:50:24 2013
@@ -604,7 +604,8 @@ public class DatanodeJspHelper {
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
           datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
-          startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
+          startOffset, chunkSizeToView, out, conf, dfs.getConf(),
+          dfs.getDataEncryptionKey());
     } catch (Exception e) {
       out.print(e);
     }
@@ -697,7 +698,8 @@ public class DatanodeJspHelper {
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
-        blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
+        blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(),
+        dfs.getDataEncryptionKey());
     out.print("</textarea>");
     dfs.close();
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Jun 20 04:50:24 2013
@@ -559,8 +559,8 @@ public class NamenodeFsck {
         
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(
-            conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
+        blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
+            file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
             TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
                 getDataEncryptionKey()),
             chosenNode, null, null, null, false);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Thu Jun 20 04:50:24 2013
@@ -150,7 +150,7 @@ public class BlockReaderTestUtil {
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
-      conf,
+      new DFSClient.Conf(conf),
       targetAddr.toString()+ ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Thu Jun 20 04:50:24 2013
@@ -130,7 +130,7 @@ public class TestBlockReaderLocal {
       test.setup(dataFile, checksum);
       dataIn = new FileInputStream(dataFile);
       checkIn = new FileInputStream(metaFile);
-      blockReaderLocal = new BlockReaderLocal(conf,
+      blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
           TEST_PATH.getName(), block, 0, -1,
           dataIn, checkIn, datanodeID, checksum, null);
       dataIn = null;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Thu Jun 20 04:50:24 2013
@@ -325,7 +325,7 @@ public class TestParallelReadUtil {
       testInfo.filepath = new Path("/TestParallelRead.dat." + i);
       testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
       testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
-          dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);
+          dfsClient.getConf().ioBufferSize, verifyChecksums);
 
       for (int j = 0; j < nWorkerEach; ++j) {
         workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Thu Jun 20 04:50:24 2013
@@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS {
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
-          conf, file, block, lblock.getBlockToken(), 0, -1,
+          new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
           nodes[0], null, null, null, false);
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Jun 20 04:50:24 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -284,7 +285,7 @@ public class TestDataNodeVolumeFailure {
         "test-blockpoolid",
         block.getBlockId());
     BlockReader blockReader =
-      BlockReaderFactory.newBlockReader(conf, file, block,
+      BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
         TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
     blockReader.close();