You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/06 01:20:57 UTC
[17/17] hadoop git commit: HDFS-8314. Move
HdfsServerConstants#IO_FILE_BUFFER_SIZE and SMALL_BUFFER_SIZE to the users.
Contributed by Li Lu.
HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and SMALL_BUFFER_SIZE to the users. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4da8490b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4da8490b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4da8490b
Branch: refs/heads/HDFS-7240
Commit: 4da8490b512a33a255ed27309860859388d7c168
Parents: 9809a16
Author: Haohui Mai <wh...@apache.org>
Authored: Tue May 5 15:41:22 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue May 5 15:41:22 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++
.../java/org/apache/hadoop/hdfs/DFSClient.java | 6 +++--
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 11 +++++++++
.../org/apache/hadoop/hdfs/DataStreamer.java | 7 +++---
.../hadoop/hdfs/server/balancer/Dispatcher.java | 7 ++++--
.../hdfs/server/common/HdfsServerConstants.java | 8 ------
.../server/datanode/BlockMetadataHeader.java | 7 ++++--
.../hdfs/server/datanode/BlockReceiver.java | 5 ++--
.../hdfs/server/datanode/BlockSender.java | 14 ++++++++---
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../hdfs/server/datanode/DataXceiver.java | 26 ++++++++++++--------
.../datanode/fsdataset/impl/BlockPoolSlice.java | 7 ++++--
.../datanode/fsdataset/impl/FsDatasetImpl.java | 26 ++++++++++++--------
.../impl/RamDiskAsyncLazyPersistService.java | 8 ++++--
.../hdfs/server/namenode/TransferFsImage.java | 10 +++++---
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
16 files changed, 95 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cc6758f..92a82c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -513,6 +513,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
NameNode (Charles Lamb via Colin P. McCabe)
+ HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and
+ SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 99b8d2c..c145959 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -238,6 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
private final Sampler<?> traceSampler;
+ private final int smallBufferSize;
public DfsClientConf getConf() {
return dfsClientConf;
@@ -309,6 +310,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
+ this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
this.ugi = UserGroupInformation.getCurrentUser();
@@ -1902,7 +1904,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
//connect to a datanode
IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
out = new DataOutputStream(new BufferedOutputStream(pair.out,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ smallBufferSize));
in = new DataInputStream(pair.in);
if (LOG.isDebugEnabled()) {
@@ -2067,7 +2069,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ smallBufferSize));
DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 42ff7fa..a925a60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -1514,4 +1515,14 @@ public class DFSUtil {
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider;
}
+
+ public static int getIoFileBufferSize(Configuration conf) {
+ return conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ }
+
+ public static int getSmallBufferSize(Configuration conf) {
+ return Math.min(getIoFileBufferSize(conf) / 2, 512);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 43787ab..96bf212 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
@@ -92,7 +91,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;
-import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -123,6 +121,7 @@ import com.google.common.cache.RemovalNotification;
@InterfaceAudience.Private
class DataStreamer extends Daemon {
static final Log LOG = LogFactory.getLog(DataStreamer.class);
+
/**
* Create a socket for a write pipeline
*
@@ -1145,7 +1144,7 @@ class DataStreamer extends Daemon {
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
@@ -1425,7 +1424,7 @@ class DataStreamer extends Daemon {
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn);
//
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index ba16905..4a8f40f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -118,6 +118,8 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode;
+ private final int ioFileBufferSize;
+
private static class GlobalBlockMap {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
@@ -308,9 +310,9 @@ public class Dispatcher {
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ ioFileBufferSize));
in = new DataInputStream(new BufferedInputStream(unbufIn,
- HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ ioFileBufferSize));
sendRequest(out, eb, accessToken);
receiveResponse(in);
@@ -801,6 +803,7 @@ public class Dispatcher {
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
+ this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
}
public DistributedFileSystem getDistributedFileSystem() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 47b0818..31af6c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -24,9 +24,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -56,12 +54,6 @@ public interface HdfsServerConstants {
// to 1k.
int MAX_PATH_LENGTH = 8000;
int MAX_PATH_DEPTH = 1000;
- int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- // Used for writing header etc.
- int SMALL_BUFFER_SIZE = Math.min(IO_FILE_BUFFER_SIZE / 2,
- 512);
// An invalid transaction ID that will never be seen in a real namesystem.
long INVALID_TXID = -12345;
// Number of generation stamps reserved for legacy blocks.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index 04700b8..4977fd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -33,7 +33,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -60,6 +61,8 @@ public class BlockMetadataHeader {
*/
private final short version;
private DataChecksum checksum = null;
+
+ private static final HdfsConfiguration conf = new HdfsConfiguration();
@VisibleForTesting
public BlockMetadataHeader(short version, DataChecksum checksum) {
@@ -85,7 +88,7 @@ public class BlockMetadataHeader {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(metaFile), HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
return readDataChecksum(in, metaFile);
} finally {
IOUtils.closeStream(in);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 10692d4..90e2194 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -246,7 +246,8 @@ class BlockReceiver implements Closeable {
out.getClass());
}
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
- streams.getChecksumOut(), HdfsServerConstants.SMALL_BUFFER_SIZE));
+ streams.getChecksumOut(), DFSUtil.getSmallBufferSize(
+ datanode.getConf())));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 6b958a2..79f4dd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -34,9 +34,10 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -104,8 +105,13 @@ class BlockSender implements java.io.Closeable {
* not sure if there will be much more improvement.
*/
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+ private static final int IO_FILE_BUFFER_SIZE;
+ static {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
+ }
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
- HdfsServerConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
+ IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
/** the block to read from */
private final ExtendedBlock block;
@@ -298,7 +304,7 @@ class BlockSender implements java.io.Closeable {
// storage and computes the checksum.
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
checksumIn = new DataInputStream(new BufferedInputStream(
- metaIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ metaIn, IO_FILE_BUFFER_SIZE));
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
keepMetaInOpen = true;
@@ -747,7 +753,7 @@ class BlockSender implements java.io.Closeable {
pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
- numberOfChunks(HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ numberOfChunks(IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 22b1520..d2b2939 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2156,7 +2156,7 @@ public class DataNode extends ReconfigurableBase
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ DFSUtil.getSmallBufferSize(conf)));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null, cachingStrategy);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index de25579..26d669c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -48,7 +48,9 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -109,7 +111,9 @@ class DataXceiver extends Receiver implements Runnable {
private final InputStream socketIn;
private OutputStream socketOut;
private BlockReceiver blockReceiver = null;
-
+ private final int ioFileBufferSize;
+ private final int smallBufferSize;
+
/**
* Client Name used in previous operation. Not available on first request
* on the socket.
@@ -131,6 +135,8 @@ class DataXceiver extends Receiver implements Runnable {
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
+ this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf());
+ this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf());
remoteAddress = peer.getRemoteAddressString();
final int colonIdx = remoteAddress.indexOf(':');
remoteAddressWithoutPort =
@@ -191,7 +197,7 @@ class DataXceiver extends Receiver implements Runnable {
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
- HdfsServerConstants.SMALL_BUFFER_SIZE);
+ smallBufferSize);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) {
@@ -514,7 +520,7 @@ class DataXceiver extends Receiver implements Runnable {
long read = 0;
OutputStream baseStream = getOutputStream();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE));
+ baseStream, smallBufferSize));
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
@@ -658,7 +664,7 @@ class DataXceiver extends Receiver implements Runnable {
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
getOutputStream(),
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ smallBufferSize));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
@@ -717,7 +723,7 @@ class DataXceiver extends Receiver implements Runnable {
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ smallBufferSize));
mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
@@ -932,7 +938,7 @@ class DataXceiver extends Receiver implements Runnable {
.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(
- new BufferedInputStream(metadataIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ new BufferedInputStream(metadataIn, ioFileBufferSize));
updateCurrentThreadName("Getting checksum for block " + block);
try {
//read metadata file
@@ -1024,7 +1030,7 @@ class DataXceiver extends Receiver implements Runnable {
// set up response stream
OutputStream baseStream = getOutputStream();
reply = new DataOutputStream(new BufferedOutputStream(
- baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE));
+ baseStream, smallBufferSize));
// send status first
writeSuccessWithChecksumInfo(blockSender, reply);
@@ -1131,10 +1137,10 @@ class DataXceiver extends Receiver implements Runnable {
unbufProxyOut = saslStreams.out;
unbufProxyIn = saslStreams.in;
- proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
+ smallBufferSize));
proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
- HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ ioFileBufferSize));
/* send request to the proxy */
IoeDuringCopyBlockOperation = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 414d078..a47d564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -38,10 +38,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -76,6 +76,7 @@ class BlockPoolSlice {
private final File lazypersistDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
+ private final int ioFileBufferSize;
private static final String DU_CACHE_FILE = "dfsUsed";
private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
@@ -108,6 +109,8 @@ class BlockPoolSlice {
}
}
+ this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
+
this.deleteDuplicateReplicas = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
@@ -612,7 +615,7 @@ class BlockPoolSlice {
}
checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile),
- HdfsServerConstants.IO_FILE_BUFFER_SIZE));
+ ioFileBufferSize));
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 31ac414..9c0d86d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -58,7 +58,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -66,7 +68,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@@ -247,6 +248,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
+ private final int smallBufferSize;
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
@@ -264,6 +266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
+ this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
@@ -837,19 +840,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @throws IOException
*/
static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
- File srcFile, File destRoot, boolean calculateChecksum)
- throws IOException {
+ File srcFile, File destRoot, boolean calculateChecksum,
+ int smallBufferSize) throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
- return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum);
+ return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
+ smallBufferSize);
}
static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
- File dstFile, boolean calculateChecksum)
+ File dstFile, boolean calculateChecksum,
+ int smallBufferSize)
throws IOException {
if (calculateChecksum) {
- computeChecksum(srcMeta, dstMeta, srcFile);
+ computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize);
} else {
try {
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
@@ -913,7 +918,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), oldMetaFile, oldBlockFile,
targetVolume.getTmpDir(block.getBlockPoolId()),
- replicaInfo.isOnTransientStorage());
+ replicaInfo.isOnTransientStorage(), smallBufferSize);
ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
@@ -941,7 +946,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param blockFile block file for which the checksum will be computed
* @throws IOException
*/
- private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+ private static void computeChecksum(File srcMeta, File dstMeta,
+ File blockFile, int smallBufferSize)
throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
final byte[] data = new byte[1 << 16];
@@ -957,7 +963,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
metaOut = new DataOutputStream(new BufferedOutputStream(
- new FileOutputStream(dstMeta), HdfsServerConstants.SMALL_BUFFER_SIZE));
+ new FileOutputStream(dstMeta), smallBufferSize));
BlockMetadataHeader.writeHeader(metaOut, checksum);
int offset = 0;
@@ -2480,7 +2486,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
return copyBlockFiles(replicaInfo.getMetaFile(),
replicaInfo.getBlockFile(),
- dstMetaFile, dstBlockFile, true);
+ dstMetaFile, dstBlockFile, true, smallBufferSize);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index cf8de0a..884df2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
-import javax.ws.rs.HEAD;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -55,6 +56,7 @@ class RamDiskAsyncLazyPersistService {
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
+ private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration();
/**
* Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their
@@ -234,9 +236,11 @@ class RamDiskAsyncLazyPersistService {
boolean succeeded = false;
final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) {
+ int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
// No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
- blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
+ blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
+ smallBufferSize);
// Lock FsDataSetImpl during onCompleteLazyPersist callback
dataset.onCompleteLazyPersist(bpId, blockId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 041c3cb..9783cca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -43,8 +43,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
@@ -77,6 +77,7 @@ public class TransferFsImage {
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
+ private final static int IO_FILE_BUFFER_SIZE;
@VisibleForTesting
static int timeout = 0;
@@ -88,6 +89,7 @@ public class TransferFsImage {
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
+ IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
}
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
@@ -336,7 +338,7 @@ public class TransferFsImage {
private static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler,
Canceler canceler) throws IOException {
- byte buf[] = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE];
+ byte buf[] = new byte[IO_FILE_BUFFER_SIZE];
try {
CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile);
@@ -345,7 +347,7 @@ public class TransferFsImage {
shouldSendShortFile(localfile)) {
// Test sending image shorter than localfile
long len = localfile.length();
- buf = new byte[(int)Math.min(len/2, HdfsServerConstants.IO_FILE_BUFFER_SIZE)];
+ buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)];
// This will read at most half of the image
// and the rest of the image will be sent over the wire
infile.read(buf);
@@ -510,7 +512,7 @@ public class TransferFsImage {
}
int num = 1;
- byte[] buf = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE];
+ byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
while (num > 0) {
num = stream.read(buf);
if (num > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4da8490b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index cfee997..a88a459 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -956,7 +956,7 @@ public class DFSTestUtil {
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout),
- HdfsServerConstants.SMALL_BUFFER_SIZE));
+ DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request