You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/10/29 00:18:43 UTC
svn commit: r1190626 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
Author: todd
Date: Fri Oct 28 22:18:42 2011
New Revision: 1190626
URL: http://svn.apache.org/viewvc?rev=1190626&view=rev
Log:
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 28 22:18:42 2011
@@ -848,6 +848,8 @@ Release 0.23.0 - Unreleased
HDFS-2500. Avoid file system operations in BPOfferService thread while
processing deletes. (todd)
+ HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
+
BUG FIXES
HDFS-2347. Fix checkpointTxnCount's comment about editlog size.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 28 22:18:42 2011
@@ -54,6 +54,15 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+ public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+ public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
+ public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+ public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+ public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+ public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+ public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+ public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50070";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Oct 28 22:18:42 2011
@@ -24,6 +24,7 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
+import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
@@ -57,10 +59,13 @@ import org.apache.hadoop.util.PureJavaCr
class BlockReceiver implements Closeable {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+ private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
+ private FileDescriptor outFd;
private OutputStream cout = null; // output stream for checksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
@@ -80,6 +85,11 @@ class BlockReceiver implements Closeable
private final DataNode datanode;
volatile private boolean mirrorError;
+ // Cache management state
+ private boolean dropCacheBehindWrites;
+ private boolean syncBehindWrites;
+ private long lastCacheDropOffset = 0;
+
/** The client name. It is empty if a datanode is the client */
private final String clientname;
private final boolean isClient;
@@ -170,6 +180,8 @@ class BlockReceiver implements Closeable
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
+ this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+ this.syncBehindWrites = datanode.shouldSyncBehindWrites();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -177,6 +189,12 @@ class BlockReceiver implements Closeable
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
+ if (out instanceof FileOutputStream) {
+ this.outFd = ((FileOutputStream)out).getFD();
+ } else {
+ LOG.warn("Could not get file descriptor for outputstream of class " +
+ out.getClass());
+ }
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@@ -631,6 +649,8 @@ class BlockReceiver implements Closeable
);
datanode.metrics.incrBytesWritten(len);
+
+ dropOsCacheBehindWriter(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -645,6 +665,28 @@ class BlockReceiver implements Closeable
return lastPacketInBlock?-1:len;
}
+ /**
+  * Best-effort management of the OS buffer cache behind the write position.
+  *
+  * Nothing happens until the write position is more than
+  * CACHE_DROP_LAG_BYTES past the last managed offset. Once it is:
+  * <ul>
+  * <li>if dropCacheBehindWrites is set, the OS is advised
+  * (POSIX_FADV_DONTNEED) to drop the file from offset 0 up to one lag
+  * window before the last managed offset, so the dropped range trails
+  * the range handed to sync_file_range by a full window (presumably to
+  * give writeback time to finish -- confirm);</li>
+  * <li>if syncBehindWrites is set, writeback (SYNC_FILE_RANGE_WRITE) is
+  * requested for everything written since the last managed offset.</li>
+  * </ul>
+  * The managed offset then moves to the current write position.
+  * Any failure is logged and otherwise ignored.
+  *
+  * @param offsetInBlock current write position within the block file
+  *        (NOTE(review): presumably the offset just past the packet
+  *        that was written -- confirm against the caller)
+  * @throws IOException declared for callers; every Throwable raised in
+  *         the body is caught and logged here
+  */
+ private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+   try {
+     if (outFd != null &&
+         offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+       // Drop only up to one window behind the last managed offset,
+       // not up to the last managed offset itself.
+       long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+       if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+         NativeIO.posixFadviseIfPossible(outFd, 0, twoWindowsAgo,
+             NativeIO.POSIX_FADV_DONTNEED);
+       }
+
+       if (syncBehindWrites) {
+         // Cover everything written since the last pass, which may be
+         // more than a single window.
+         NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset,
+             offsetInBlock - lastCacheDropOffset,
+             NativeIO.SYNC_FILE_RANGE_WRITE);
+       }
+
+       // Track the real write position so the bookkeeping cannot fall
+       // behind when the offset starts (or jumps) far ahead.
+       lastCacheDropOffset = offsetInBlock;
+     }
+   } catch (Throwable t) {
+     LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+   }
+ }
+
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
checksum.writeHeader(mirrorOut);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Oct 28 22:18:42 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Clo
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
- /** Starting position to read */
+ /** Initial position to read */
+ private long initialOffset;
+ /** Current position of read */
private long offset;
/** Position of last byte to read from block file */
private final long endOffset;
@@ -142,6 +148,24 @@ class BlockSender implements java.io.Clo
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
+ /** The file descriptor of the block being sent */
+ private FileDescriptor blockInFd;
+
+ // Cache-management related fields
+ private final long readaheadLength;
+ private boolean shouldDropCacheBehindRead;
+ private ReadaheadRequest curReadahead;
+ private long lastCacheDropOffset;
+ private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+ /**
+ * Minimum length of read below which management of the OS
+ * buffer cache is disabled.
+ */
+ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+
+ private static ReadaheadPool readaheadPool =
+ ReadaheadPool.getInstance();
+
/**
* Constructor
*
@@ -165,6 +189,8 @@ class BlockSender implements java.io.Clo
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
+ this.readaheadLength = datanode.getReadaheadLength();
+ this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
synchronized(datanode.data) {
this.replica = getReplica(block, datanode);
@@ -277,6 +303,11 @@ class BlockSender implements java.io.Clo
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ if (blockIn instanceof FileInputStream) {
+ blockInFd = ((FileInputStream)blockIn).getFD();
+ } else {
+ blockInFd = null;
+ }
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
@@ -288,6 +319,20 @@ class BlockSender implements java.io.Clo
* close opened files.
*/
public void close() throws IOException {
+ if (blockInFd != null && shouldDropCacheBehindRead) {
+ // drop the last few MB of the file from cache
+ try {
+ NativeIO.posixFadviseIfPossible(
+ blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+ NativeIO.POSIX_FADV_DONTNEED);
+ } catch (Exception e) {
+ LOG.warn("Unable to drop cache on file close", e);
+ }
+ }
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
+
IOException ioe = null;
if(checksumIn!=null) {
try {
@@ -304,6 +349,7 @@ class BlockSender implements java.io.Clo
ioe = e;
}
blockIn = null;
+ blockInFd = null;
}
// throw IOException if there is any
if(ioe!= null) {
@@ -538,10 +584,20 @@ class BlockSender implements java.io.Clo
if (out == null) {
throw new IOException( "out stream is null" );
}
- final long initialOffset = offset;
+ initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
+ lastCacheDropOffset = initialOffset;
+
+ if (isLongRead() && blockInFd != null) {
+ // Advise that this file descriptor will be accessed sequentially.
+ NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
+ }
+
+ // Trigger readahead of beginning of file if configured.
+ manageOsCache();
+
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
writeChecksumHeader(out);
@@ -569,6 +625,7 @@ class BlockSender implements java.io.Clo
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
+ manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
@@ -595,6 +652,45 @@ class BlockSender implements java.io.Clo
}
return totalRead;
}
+
+ /**
+  * Manage the OS buffer cache by performing read-ahead
+  * and drop-behind.
+  *
+  * Does nothing for short reads (see isLongRead()) or when no file
+  * descriptor is available for the block file. Otherwise:
+  * <ul>
+  * <li>if a readahead length is configured and a readahead pool exists,
+  * submits a readahead request starting at the current offset, passing
+  * the previous request so the pool can take it into account;</li>
+  * <li>if drop-behind is enabled and at least
+  * CACHE_DROP_INTERVAL_BYTES have been sent since the last drop,
+  * advises the OS (POSIX_FADV_DONTNEED) that the bytes between the last
+  * drop position and the current offset are no longer needed, then
+  * moves the drop position up to the current offset.</li>
+  * </ul>
+  *
+  * @throws IOException if the fadvise call fails
+  */
+ private void manageOsCache() throws IOException {
+   if (!isLongRead() || blockInFd == null) {
+     // don't manage cache manually for short-reads, like
+     // HBase random read workloads.
+     return;
+   }
+
+   // Perform readahead if necessary
+   if (readaheadLength > 0 && readaheadPool != null) {
+     curReadahead = readaheadPool.readaheadStream(
+         clientTraceFmt, blockInFd,
+         offset, readaheadLength, Long.MAX_VALUE,
+         curReadahead);
+   }
+
+   // Drop what we've just read from cache, since we aren't
+   // likely to need it again
+   if (shouldDropCacheBehindRead &&
+       offset >= lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES) {
+     NativeIO.posixFadviseIfPossible(blockInFd,
+         lastCacheDropOffset, offset - lastCacheDropOffset,
+         NativeIO.POSIX_FADV_DONTNEED);
+     // Everything before 'offset' has now been advised away; resume from
+     // there so the next drop does not overlap this one.
+     lastCacheDropOffset = offset;
+   }
+ }
+
+ /**
+  * Returns true if the requested range is longer than
+  * LONG_READ_THRESHOLD_BYTES, i.e. long enough for manual OS cache
+  * management to be applied.
+  *
+  * The length is measured from initialOffset, not from the advancing
+  * offset, so the answer stays the same for the whole transfer instead
+  * of turning false as the read approaches endOffset.
+  * NOTE(review): relies on initialOffset having been assigned before
+  * the first call, as sendBlock does -- confirm there are no earlier
+  * callers.
+  */
+ private boolean isLongRead() {
+   return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
+ }
+
/**
* Write checksum header to the output stream
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 28 22:18:42 2011
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -418,6 +419,11 @@ public class DataNode extends Configured
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
+ private boolean dropCacheBehindWrites = false;
+ private boolean syncBehindWrites = false;
+ private boolean dropCacheBehindReads = false;
+ private long readaheadLength = 0;
+
int writePacketSize = 0;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -501,6 +507,20 @@ public class DataNode extends Configured
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+
+ this.readaheadLength = conf.getLong(
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ this.dropCacheBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+ this.syncBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+ this.dropCacheBehindReads = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.initialBlockReportDelay = conf.getLong(
@@ -2903,4 +2923,20 @@ public class DataNode extends Configured
(DataXceiverServer) this.dataXceiverServer.getRunnable();
return dxcs.balanceThrottler.getBandwidth();
}
+
+ /**
+  * @return the readahead length loaded from
+  *         DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY
+  */
+ long getReadaheadLength() {
+   return this.readaheadLength;
+ }
+
+ /**
+  * @return the flag loaded from
+  *         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY
+  */
+ boolean shouldDropCacheBehindWrites() {
+   return this.dropCacheBehindWrites;
+ }
+
+ /**
+  * @return the flag loaded from
+  *         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY
+  */
+ boolean shouldDropCacheBehindReads() {
+   return this.dropCacheBehindReads;
+ }
+
+ /**
+  * @return the flag loaded from
+  *         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY
+  */
+ boolean shouldSyncBehindWrites() {
+   return this.syncBehindWrites;
+ }
}