You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/20 06:50:25 UTC
svn commit: r1494855 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/common/
src/main/java/org/apache/hadoop/hdfs/server/datanode...
Author: szetszwo
Date: Thu Jun 20 04:50:24 2013
New Revision: 1494855
URL: http://svn.apache.org/r1494855
Log:
svn merge -c 1494854 from trunk for HDFS-4914. Use DFSClient.Conf instead of Configuration.
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1494854
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jun 20 04:50:24 2013
@@ -178,6 +178,8 @@ Release 2.1.0-beta - UNRELEASED
HDFS-3009. Remove duplicate code in DFSClient#isLocalAddress by using
NetUtils. (Hari Mankude via suresh)
+ HDFS-4914. Use DFSClient.Conf instead of Configuration. (szetszwo)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1494854
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Thu Jun 20 04:50:24 2013
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,7 +39,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@@ -75,9 +73,7 @@ public class BlockReaderFactory {
* should be allowed.
* @return New BlockReader instance
*/
- @SuppressWarnings("deprecation")
- public static BlockReader newBlockReader(
- Configuration conf,
+ public static BlockReader newBlockReader(DFSClient.Conf conf,
String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
@@ -91,14 +87,11 @@ public class BlockReaderFactory {
FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads)
throws IOException {
- peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsServerConstants.READ_TIMEOUT));
+ peer.setReadTimeout(conf.socketTimeout);
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
if (peer.getDomainSocket() != null) {
- if (allowShortCircuitLocalReads &&
- (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
+ if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) {
// If this is a domain socket, and short-circuit local reads are
// enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file,
@@ -118,21 +111,19 @@ public class BlockReaderFactory {
// If this is a domain socket and we couldn't (or didn't want to) set
// up a BlockReaderLocal, check that we are allowed to pass data traffic
// over the socket before proceeding.
- if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+ if (!conf.domainSocketDataTraffic) {
throw new IOException("Because we can't do short-circuit access, " +
"and data traffic over domain sockets is disabled, " +
"we cannot use this socket to talk to " + datanodeID);
}
}
- if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
- return RemoteBlockReader.newBlockReader(file,
- block, blockToken, startOffset, len,
- conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
- DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+ if (conf.useLegacyBlockReader) {
+ @SuppressWarnings("deprecation")
+ RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
+ block, blockToken, startOffset, len, conf.ioBufferSize,
verifyChecksum, clientName, peer, datanodeID, peerCache);
+ return reader;
} else {
return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len,
@@ -173,7 +164,7 @@ public class BlockReaderFactory {
* @throws IOException If there was a communication error.
*/
private static BlockReaderLocal newShortCircuitBlockReader(
- Configuration conf, String file, ExtendedBlock block,
+ DFSClient.Conf conf, String file, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum,
@@ -245,15 +236,14 @@ public class BlockReaderFactory {
* This block reader implements the path-based style of local reads
* first introduced in HDFS-2246.
*/
- static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi,
- Configuration conf, String src, ExtendedBlock blk,
+ static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient,
+ String src, ExtendedBlock blk,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
- int socketTimeout, long offsetIntoBlock,
- boolean connectToDnViaHostname) throws InvalidToken, IOException {
+ long offsetIntoBlock) throws InvalidToken, IOException {
try {
- return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src,
- blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock,
- blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname);
+ final long length = blk.getNumBytes() - offsetIntoBlock;
+ return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk,
+ accessToken, chosenNode, offsetIntoBlock, length);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Thu Jun 20 04:50:24 2013
@@ -17,10 +17,8 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.DataInputStream;
-import org.apache.hadoop.conf.Configuration;
-
import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -90,13 +88,8 @@ class BlockReaderLocal implements BlockR
private final FileInputStreamCache fisCache;
- private static int getSlowReadBufferNumChunks(Configuration conf,
+ private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) {
-
- int bufSize =
- conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
if (bufSize < bytesPerChecksum) {
throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
bufSize + ") is not large enough to hold a single chunk (" +
@@ -108,7 +101,7 @@ class BlockReaderLocal implements BlockR
return bufSize / bytesPerChecksum;
}
- public BlockReaderLocal(Configuration conf, String filename,
+ public BlockReaderLocal(DFSClient.Conf conf, String filename,
ExtendedBlock block, long startOffset, long length,
FileInputStream dataIn, FileInputStream checksumIn,
DatanodeID datanodeID, boolean verifyChecksum,
@@ -132,13 +125,7 @@ class BlockReaderLocal implements BlockR
throw new IOException("Wrong version (" + version + ") of the " +
"metadata file for " + filename + ".");
}
- if (!verifyChecksum) {
- this.verifyChecksum = false;
- } else {
- this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
- DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- }
+ this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
long firstChunkOffset;
if (this.verifyChecksum) {
this.checksum = header.getChecksum();
@@ -148,7 +135,8 @@ class BlockReaderLocal implements BlockR
- (startOffset % checksum.getBytesPerChecksum());
this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+ conf.shortCircuitBufferSize, bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read.
@@ -171,7 +159,12 @@ class BlockReaderLocal implements BlockR
this.dataIn.getChannel().position(firstChunkOffset);
success = true;
} finally {
- if (!success) {
+ if (success) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created BlockReaderLocal for file " + filename
+ + " block " + block + " in datanode " + datanodeID);
+ }
+ } else {
if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Thu Jun 20 04:50:24 2013
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
@@ -32,17 +31,15 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -70,7 +67,7 @@ import org.apache.hadoop.util.DataChecks
* </ul>
*/
class BlockReaderLocalLegacy implements BlockReader {
- private static final Log LOG = LogFactory.getLog(DFSClient.class);
+ private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class);
//Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {
@@ -173,19 +170,20 @@ class BlockReaderLocalLegacy implements
/**
* The only way this object can be instantiated.
*/
- static BlockReaderLocalLegacy newBlockReader(UserGroupInformation ugi,
- Configuration conf, String file, ExtendedBlock blk,
- Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
- long startOffset, long length, boolean connectToDnViaHostname)
+ static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient,
+ String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token,
+ DatanodeInfo node, long startOffset, long length)
throws IOException {
+ final DFSClient.Conf conf = dfsClient.getConf();
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
- pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
- connectToDnViaHostname);
+ pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node,
+ dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token,
+ conf.connectToDnViaHostname);
}
// check to see if the file exists. It may so happen that the
@@ -197,7 +195,7 @@ class BlockReaderLocalLegacy implements
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null;
- boolean skipChecksumCheck = skipChecksumCheck(conf);
+ boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
@@ -285,16 +283,8 @@ class BlockReaderLocalLegacy implements
return pathinfo;
}
- private static boolean skipChecksumCheck(Configuration conf) {
- return conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- }
-
- private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
- int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
+ private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+ int bytesPerChecksum) {
if (bufferSizeBytes < bytesPerChecksum) {
throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
"buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
@@ -307,7 +297,7 @@ class BlockReaderLocalLegacy implements
return bufferSizeBytes / bytesPerChecksum;
}
- private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+ private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException {
@@ -316,7 +306,7 @@ class BlockReaderLocalLegacy implements
dataIn, startOffset, null);
}
- private BlockReaderLocalLegacy(Configuration conf, String hdfsfile,
+ private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@@ -333,7 +323,8 @@ class BlockReaderLocalLegacy implements
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+ conf.shortCircuitBufferSize, bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read.
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 20 04:50:24 2013
@@ -178,6 +178,9 @@ public class DFSClient implements java.i
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+
+ private final Configuration conf;
+ private final Conf dfsClientConf;
final ClientProtocol namenode;
/* The service used for delegation tokens */
private Text dtService;
@@ -188,14 +191,11 @@ public class DFSClient implements java.i
private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
final String clientName;
- Configuration conf;
SocketFactory socketFactory;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
- final int hdfsTimeout; // timeout value for a DFS operation.
private final String authority;
final PeerCache peerCache;
- final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
@@ -204,7 +204,8 @@ public class DFSClient implements java.i
/**
* DFSClient configuration
*/
- static class Conf {
+ public static class Conf {
+ final int hdfsTimeout; // timeout value for a DFS operation.
final int maxFailoverAttempts;
final int failoverSleepBaseMillis;
final int failoverSleepMaxMillis;
@@ -227,18 +228,25 @@ public class DFSClient implements java.i
final short defaultReplication;
final String taskId;
final FsPermission uMask;
- final boolean useLegacyBlockReaderLocal;
final boolean connectToDnViaHostname;
final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout;
+
+ final boolean useLegacyBlockReader;
+ final boolean useLegacyBlockReaderLocal;
final String domainSocketPath;
final boolean skipShortCircuitChecksums;
final int shortCircuitBufferSize;
final boolean shortCircuitLocalReads;
final boolean domainSocketDataTraffic;
+ final int shortCircuitStreamsCacheSize;
+ final long shortCircuitStreamsCacheExpiryMs;
+
+ public Conf(Configuration conf) {
+ // The hdfsTimeout is currently the same as the ipc timeout
+ hdfsTimeout = Client.getTimeout(conf);
- Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
@@ -277,19 +285,15 @@ public class DFSClient implements java.i
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
- timeWindow = conf
- .getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+ timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
- nBlockWriteLocateFollowingRetry = conf
- .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+ nBlockWriteLocateFollowingRetry = conf.getInt(
+ DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+ DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf);
- useLegacyBlockReaderLocal = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
getHdfsBlocksMetadataEnabled = conf.getBoolean(
@@ -301,20 +305,50 @@ public class DFSClient implements java.i
getFileBlockStorageLocationsTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
- domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+
+ useLegacyBlockReader = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+ useLegacyBlockReaderLocal = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
+ shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ domainSocketDataTraffic = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
+ domainSocketPath = conf.getTrimmed(
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+
+ if (BlockReaderLocal.LOG.isDebugEnabled()) {
+ BlockReaderLocal.LOG.debug(
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ + " = " + useLegacyBlockReaderLocal);
+ BlockReaderLocal.LOG.debug(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
+ + " = " + shortCircuitLocalReads);
+ BlockReaderLocal.LOG.debug(
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ + " = " + domainSocketDataTraffic);
+ BlockReaderLocal.LOG.debug(
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+ + " = " + domainSocketPath);
+ }
+
skipShortCircuitChecksums = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
- shortCircuitLocalReads = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
- domainSocketDataTraffic = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
+ shortCircuitStreamsCacheSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
+ shortCircuitStreamsCacheExpiryMs = conf.getLong(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -360,10 +394,14 @@ public class DFSClient implements java.i
}
}
- Conf getConf() {
+ public Conf getConf() {
return dfsClientConf;
}
+ Configuration getConfiguration() {
+ return conf;
+ }
+
/**
* A map from file names to {@link DFSOutputStream} objects
* that are currently being written by this client.
@@ -426,8 +464,6 @@ public class DFSClient implements java.i
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
- // The hdfsTimeout is currently the same as the ipc timeout
- this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
@@ -542,21 +578,13 @@ public class DFSClient implements java.i
}
int getHdfsTimeout() {
- return hdfsTimeout;
+ return dfsClientConf.hdfsTimeout;
}
String getClientName() {
return clientName;
}
- /**
- * @return whether the client should use hostnames instead of IPs
- * when connecting to DataNodes
- */
- boolean connectToDnViaHostname() {
- return dfsClientConf.connectToDnViaHostname;
- }
-
void checkOpen() throws IOException {
if (!clientRunning) {
IOException result = new IOException("Filesystem closed");
@@ -793,6 +821,7 @@ public class DFSClient implements java.i
* @throws IOException
* @deprecated Use Token.renew instead.
*/
+ @Deprecated
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
@@ -864,6 +893,7 @@ public class DFSClient implements java.i
* @throws IOException
* @deprecated Use Token.cancel instead.
*/
+ @Deprecated
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
@@ -965,6 +995,11 @@ public class DFSClient implements java.i
return dfsClientConf.defaultReplication;
}
+ public LocatedBlocks getLocatedBlocks(String src, long start)
+ throws IOException {
+ return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
+ }
+
/*
* This is just a wrapper around callGetBlockLocations, but non-static so that
* we can stub it out for tests.
@@ -1693,10 +1728,10 @@ public class DFSClient implements java.i
* @param socketFactory to create sockets to connect to DNs
* @param socketTimeout timeout to use when connecting and waiting for a response
* @param encryptionKey the key needed to communicate with DNs in this cluster
- * @param connectToDnViaHostname {@link #connectToDnViaHostname()}
+ * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
* @return The checksum
*/
- static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
String clientName,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Jun 20 04:50:24 2013
@@ -72,7 +72,6 @@ public class DFSInputStream extends FSIn
private final DFSClient dfsClient;
private boolean closed = false;
private final String src;
- private final long prefetchSize;
private BlockReader blockReader = null;
private final boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null;
@@ -163,7 +162,6 @@ public class DFSInputStream extends FSIn
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
- private final int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -173,8 +171,6 @@ public class DFSInputStream extends FSIn
private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
- private final int nCachedConnRetry;
-
void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
}
@@ -187,15 +183,8 @@ public class DFSInputStream extends FSIn
this.src = src;
this.peerCache = dfsClient.peerCache;
this.fileInputStreamCache = new FileInputStreamCache(
- dfsClient.conf.getInt(DFSConfigKeys.
- DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
- dfsClient.conf.getLong(DFSConfigKeys.
- DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
- prefetchSize = dfsClient.getConf().prefetchSize;
- timeWindow = dfsClient.getConf().timeWindow;
- nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
+ dfsClient.getConf().shortCircuitStreamsCacheSize,
+ dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
openInfo();
}
@@ -236,7 +225,7 @@ public class DFSInputStream extends FSIn
}
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
- LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize);
+ final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
@@ -280,8 +269,8 @@ public class DFSInputStream extends FSIn
ClientDatanodeProtocol cdp = null;
try {
- cdp = DFSUtil.createClientDatanodeProtocolProxy(
- datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
+ cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
+ dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout,
dfsClient.getConf().connectToDnViaHostname, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -389,8 +378,7 @@ public class DFSInputStream extends FSIn
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks
- LocatedBlocks newBlocks;
- newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
+ final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
assert (newBlocks != null) : "Could not find target position " + offset;
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
@@ -413,8 +401,7 @@ public class DFSInputStream extends FSIn
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
}
// fetch blocks
- LocatedBlocks newBlocks;
- newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
+ final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
if (newBlocks == null) {
throw new IOException("Could not find target position " + offset);
}
@@ -832,7 +819,7 @@ public class DFSInputStream extends FSIn
try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
final String dnAddr =
- chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
+ chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -861,6 +848,7 @@ public class DFSInputStream extends FSIn
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
+ final int timeWindow = dfsClient.getConf().timeWindow;
double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@@ -1011,7 +999,7 @@ public class DFSInputStream extends FSIn
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
"the FileInputStreamCache.");
}
- return new BlockReaderLocal(dfsClient.conf, file,
+ return new BlockReaderLocal(dfsClient.getConf(), file,
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
fileInputStreamCache);
}
@@ -1023,9 +1011,8 @@ public class DFSInputStream extends FSIn
DFSClient.isLocalAddress(dnAddr) &&
(!shortCircuitForbidden())) {
try {
- return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi,
- dfsClient.conf, clientName, block, blockToken, chosenNode,
- dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname());
+ return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
+ clientName, block, blockToken, chosenNode, startOffset);
} catch (IOException e) {
DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " +
"Disabling legacy local reads.", e);
@@ -1037,6 +1024,7 @@ public class DFSInputStream extends FSIn
int cacheTries = 0;
DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
BlockReader reader = null;
+ final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
for (; cacheTries < nCachedConnRetry; ++cacheTries) {
Peer peer = peerCache.get(chosenNode, true);
if (peer == null) break;
@@ -1044,7 +1032,7 @@ public class DFSInputStream extends FSIn
boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader(
- dfsClient.conf, file, block, blockToken, startOffset,
+ dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
@@ -1067,7 +1055,7 @@ public class DFSInputStream extends FSIn
boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader(
- dfsClient.conf, file, block, blockToken, startOffset,
+ dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
@@ -1091,7 +1079,7 @@ public class DFSInputStream extends FSIn
if (peer == null) break;
try {
reader = BlockReaderFactory.newBlockReader(
- dfsClient.conf, file, block, blockToken, startOffset,
+ dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false);
return reader;
@@ -1110,7 +1098,7 @@ public class DFSInputStream extends FSIn
// Try to create a new remote peer.
Peer peer = newTcpPeer(dnAddr);
return BlockReaderFactory.newBlockReader(
- dfsClient.conf, file, block, blockToken, startOffset,
+ dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false);
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 20 04:50:24 2013
@@ -1288,7 +1288,8 @@ public class DFSOutputStream extends FSO
*/
static Socket createSocketForPipeline(final DatanodeInfo first,
final int length, final DFSClient client) throws IOException {
- final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
+ final String dnAddr = first.getXferAddr(
+ client.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
@@ -1813,8 +1814,8 @@ public class DFSOutputStream extends FSO
if (closed) {
return;
}
- streamer.setLastException(new IOException("Lease timeout of " +
- (dfsClient.hdfsTimeout/1000) + " seconds expired."));
+ streamer.setLastException(new IOException("Lease timeout of "
+ + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(src);
}
@@ -1884,13 +1885,13 @@ public class DFSOutputStream extends FSO
while (!fileComplete) {
fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
if (!fileComplete) {
+ final int hdfsTimeout = dfsClient.getHdfsTimeout();
if (!dfsClient.clientRunning ||
- (dfsClient.hdfsTimeout > 0 &&
- localstart + dfsClient.hdfsTimeout < Time.now())) {
+ (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
String msg = "Unable to close file because dfsclient " +
" was unable to contact the HDFS servers." +
" clientRunning " + dfsClient.clientRunning +
- " hdfsTimeout " + dfsClient.hdfsTimeout;
+ " hdfsTimeout " + hdfsTimeout;
DFSClient.LOG.info(msg);
throw new IOException(msg);
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Thu Jun 20 04:50:24 2013
@@ -23,16 +23,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient.Conf;
-
import org.apache.hadoop.net.unix.DomainSocket;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
class DomainSocketFactory {
- public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+ private static final Log LOG = BlockReaderLocal.LOG;
private final Conf conf;
enum PathStatus {
@@ -51,21 +50,26 @@ class DomainSocketFactory {
public DomainSocketFactory(Conf conf) {
this.conf = conf;
- String feature = null;
+ final String feature;
if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
feature = "The short-circuit local reads feature";
} else if (conf.domainSocketDataTraffic) {
feature = "UNIX domain socket data traffic";
+ } else {
+ feature = null;
}
- if (feature != null) {
+
+ if (feature == null) {
+ LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled.");
+ } else {
if (conf.domainSocketPath.isEmpty()) {
- LOG.warn(feature + " is disabled because you have not set " +
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
+ throw new HadoopIllegalArgumentException(feature + " is enabled but "
+ + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
} else if (DomainSocket.getLoadingFailureReason() != null) {
- LOG.warn(feature + " is disabled because " +
- DomainSocket.getLoadingFailureReason());
+ LOG.warn(feature + " cannot be used because "
+ + DomainSocket.getLoadingFailureReason());
} else {
- LOG.debug(feature + "is enabled.");
+ LOG.debug(feature + " is enabled.");
}
}
}
@@ -86,8 +90,8 @@ class DomainSocketFactory {
// sockets.
if (conf.domainSocketPath.isEmpty()) return null;
// If we can't do anything with the domain socket, don't create it.
- if ((conf.domainSocketDataTraffic == false) &&
- ((!conf.shortCircuitLocalReads) || conf.useLegacyBlockReaderLocal)) {
+ if (!conf.domainSocketDataTraffic &&
+ (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
return null;
}
// UNIX domain sockets can only be used to talk to local peers
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Jun 20 04:50:24 2013
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -462,18 +461,6 @@ public class RemoteBlockReader extends F
peer.getRemoteAddressString() + ": " + e.getMessage());
}
}
-
- /**
- * File name to print when accessing a block directly (from servlets)
- * @param s Address of the block location
- * @param poolId Block pool ID of the block
- * @param blockId Block ID of the block
- * @return string that has a file name for debug purposes
- */
- public static String getFileName(final InetSocketAddress s,
- final String poolId, final long blockId) {
- return s.toString() + ":" + poolId + ":" + blockId;
- }
@Override
public int read(ByteBuffer buf) throws IOException {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Jun 20 04:50:24 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -198,7 +199,8 @@ public class JspHelper {
public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
- JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+ JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
+ DataEncryptionKey encryptionKey)
throws IOException {
if (chunkSizeToView == 0) return;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
@@ -209,8 +211,7 @@ public class JspHelper {
// Use the block name for file name.
String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
- BlockReader blockReader = BlockReaderFactory.newBlockReader(
- conf, file,
+ BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
@@ -218,7 +219,7 @@ public class JspHelper {
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false);
- byte[] buf = new byte[(int)amtToRead];
+ final byte[] buf = new byte[amtToRead];
int readOffset = 0;
int retries = 2;
while ( amtToRead > 0 ) {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Jun 20 04:50:24 2013
@@ -604,7 +604,8 @@ public class DatanodeJspHelper {
try {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
- startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
+ startOffset, chunkSizeToView, out, conf, dfs.getConf(),
+ dfs.getDataEncryptionKey());
} catch (Exception e) {
out.print(e);
}
@@ -697,7 +698,8 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
- blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
+ blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(),
+ dfs.getDataEncryptionKey());
out.print("</textarea>");
dfs.close();
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Jun 20 04:50:24 2013
@@ -559,8 +559,8 @@ public class NamenodeFsck {
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId());
- blockReader = BlockReaderFactory.newBlockReader(
- conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
+ blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
+ file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()),
chosenNode, null, null, null, false);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Thu Jun 20 04:50:24 2013
@@ -150,7 +150,7 @@ public class BlockReaderTestUtil {
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
return BlockReaderFactory.newBlockReader(
- conf,
+ new DFSClient.Conf(conf),
targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Thu Jun 20 04:50:24 2013
@@ -130,7 +130,7 @@ public class TestBlockReaderLocal {
test.setup(dataFile, checksum);
dataIn = new FileInputStream(dataFile);
checkIn = new FileInputStream(metaFile);
- blockReaderLocal = new BlockReaderLocal(conf,
+ blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
TEST_PATH.getName(), block, 0, -1,
dataIn, checkIn, datanodeID, checksum, null);
dataIn = null;
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Thu Jun 20 04:50:24 2013
@@ -325,7 +325,7 @@ public class TestParallelReadUtil {
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
- dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);
+ dfsClient.getConf().ioBufferSize, verifyChecksums);
for (int j = 0; j < nWorkerEach; ++j) {
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Thu Jun 20 04:50:24 2013
@@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
- conf, file, block, lblock.getBlockToken(), 0, -1,
+ new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
nodes[0], null, null, null, false);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1494855&r1=1494854&r2=1494855&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Jun 20 04:50:24 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -284,7 +285,7 @@ public class TestDataNodeVolumeFailure {
"test-blockpoolid",
block.getBlockId());
BlockReader blockReader =
- BlockReaderFactory.newBlockReader(conf, file, block,
+ BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
blockReader.close();