Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/10/29 00:18:43 UTC

svn commit: r1190626 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/

Author: todd
Date: Fri Oct 28 22:18:42 2011
New Revision: 1190626

URL: http://svn.apache.org/viewvc?rev=1190626&view=rev
Log:
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. Contributed by Todd Lipcon.
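
For context: the patch drives the Linux posix_fadvise(2) and sync_file_range(2)
system calls through Hadoop's NativeIO wrapper. Below is a minimal sketch of the
core idea, assuming the native library is loaded (the *IfPossible variants
degrade to no-ops when it is not); the path argument and the consumed length
are hypothetical:

    import java.io.FileDescriptor;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // Hypothetical illustration, not DataNode code: hint sequential access,
    // then drop consumed pages so a streaming read does not evict hotter data.
    static void adviseSequentialRead(String path, long consumed) throws IOException {
      FileInputStream in = new FileInputStream(path);
      FileDescriptor fd = in.getFD();
      // Tell the kernel this descriptor will be scanned front to back.
      NativeIO.posixFadviseIfPossible(fd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
      // ... the caller reads bytes [0, consumed) here ...
      // Pages behind the read cursor will not be needed again; release them.
      NativeIO.posixFadviseIfPossible(fd, 0, consumed, NativeIO.POSIX_FADV_DONTNEED);
      in.close();
    }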

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 28 22:18:42 2011
@@ -848,6 +848,8 @@ Release 0.23.0 - Unreleased
     HDFS-2500. Avoid file system operations in BPOfferService thread while
     processing deletes. (todd)
 
+    HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
+
   BUG FIXES
 
     HDFS-2347. Fix checkpointTxnCount's comment about editlog size. 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 28 22:18:42 2011
@@ -54,6 +54,15 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+  public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50070";
   public static final String  DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
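
All four new features default to off (readahead length 0 and all three booleans
false), so this commit changes no behavior until a cluster opts in. A minimal
sketch of enabling them programmatically, with illustrative values (in a real
deployment the same keys would normally be set in hdfs-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // 4 MB of readahead is an illustrative value, not a recommendation.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, 4L * 1024 * 1024);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);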

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Oct 28 22:18:42 2011
@@ -24,6 +24,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -57,10 +59,13 @@ import org.apache.hadoop.util.PureJavaCr
 class BlockReceiver implements Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024; // 8MB
   
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private FileDescriptor outFd;
   private OutputStream cout = null; // output stream for checksum file
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
@@ -80,6 +85,11 @@ class BlockReceiver implements Closeable
   private final DataNode datanode;
   volatile private boolean mirrorError;
 
+  // Cache management state
+  private boolean dropCacheBehindWrites;
+  private boolean syncBehindWrites;
+  private long lastCacheDropOffset = 0;
+
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
   private final boolean isClient; 
@@ -170,6 +180,8 @@ class BlockReceiver implements Closeable
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
+      this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+      this.syncBehindWrites = datanode.shouldSyncBehindWrites();
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -177,6 +189,12 @@ class BlockReceiver implements Closeable
           this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
+        if (out instanceof FileOutputStream) {
+          this.outFd = ((FileOutputStream)out).getFD();
+        } else {
+          LOG.warn("Could not get file descriptor for outputstream of class " +
+              out.getClass());
+        }
         this.cout = streams.checksumOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
             streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@@ -631,6 +649,8 @@ class BlockReceiver implements Closeable
           );
           
           datanode.metrics.incrBytesWritten(len);
+
+          dropOsCacheBehindWriter(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -645,6 +665,28 @@ class BlockReceiver implements Closeable
     return lastPacketInBlock?-1:len;
   }
 
+  private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+    try {
+      if (outFd != null &&
+          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+          NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
+              NativeIO.POSIX_FADV_DONTNEED);
+        }
+        
+        if (syncBehindWrites) {
+          NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+              NativeIO.SYNC_FILE_RANGE_WRITE);
+        }
+        
+        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+    }
+  }
+
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
     checksum.writeHeader(mirrorOut);
   }
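
The write path batches its cache work into 8 MB windows (CACHE_DROP_LAG_BYTES)
so the system calls are amortized over many packets, and it always leaves the
most recent window cached because those pages may still be dirty. A simplified
sketch of the windowing, reusing the NativeIO calls from the patch; fd, the
WINDOW constant, and onPacketWritten() are stand-ins for the fields and call
site above, and the two branches are merged rather than gated on the separate
drop/sync config flags:

    // Simplified sketch of the write-side windowing (not verbatim patch code).
    static final long WINDOW = 8 * 1024 * 1024;  // mirrors CACHE_DROP_LAG_BYTES
    long lastCacheDropOffset = 0;

    void onPacketWritten(FileDescriptor fd, long offsetInBlock) throws IOException {
      if (offsetInBlock > lastCacheDropOffset + WINDOW) {
        // Kick off writeback for the window just passed; posix_fadvise can
        // only release clean pages, so syncing first makes the drop effective.
        NativeIO.syncFileRangeIfPossible(fd, lastCacheDropOffset, WINDOW,
            NativeIO.SYNC_FILE_RANGE_WRITE);
        // Drop everything well behind the writer from the page cache.
        NativeIO.posixFadviseIfPossible(fd, 0, lastCacheDropOffset,
            NativeIO.POSIX_FADV_DONTNEED);
        lastCacheDropOffset += WINDOW;
      }
    }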

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Oct 28 22:18:42 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Clo
   private DataInputStream checksumIn;
   /** Checksum utility */
   private final DataChecksum checksum;
-  /** Starting position to read */
+  /** Initial position to read */
+  private long initialOffset;
+  /** Current position of read */
   private long offset;
   /** Position of last byte to read from block file */
   private final long endOffset;
@@ -142,6 +148,24 @@ class BlockSender implements java.io.Clo
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
   
+  /** The file descriptor of the block being sent */
+  private FileDescriptor blockInFd;
+
+  // Cache-management related fields
+  private final long readaheadLength;
+  private boolean shouldDropCacheBehindRead;
+  private ReadaheadRequest curReadahead;
+  private long lastCacheDropOffset;
+  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  /**
+   * Reads shorter than this threshold are treated as short reads, and
+   * the OS buffer cache is left unmanaged for them.
+   */
+  private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+  
+  private static ReadaheadPool readaheadPool =
+    ReadaheadPool.getInstance();
+
   /**
    * Constructor
    * 
@@ -165,6 +189,8 @@ class BlockSender implements java.io.Clo
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
+      this.readaheadLength = datanode.getReadaheadLength();
+      this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
       
       synchronized(datanode.data) { 
         this.replica = getReplica(block, datanode);
@@ -277,6 +303,11 @@ class BlockSender implements java.io.Clo
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      if (blockIn instanceof FileInputStream) {
+        blockInFd = ((FileInputStream)blockIn).getFD();
+      } else {
+        blockInFd = null;
+      }
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);
@@ -288,6 +319,20 @@ class BlockSender implements java.io.Clo
    * close opened files.
    */
   public void close() throws IOException {
+    if (blockInFd != null && shouldDropCacheBehindRead) {
+      // drop the last few MB of the file from cache
+      try {
+        NativeIO.posixFadviseIfPossible(
+            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+            NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Exception e) {
+        LOG.warn("Unable to drop cache on file close", e);
+      }
+    }
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+    
     IOException ioe = null;
     if(checksumIn!=null) {
       try {
@@ -304,6 +349,7 @@ class BlockSender implements java.io.Clo
         ioe = e;
       }
       blockIn = null;
+      blockInFd = null;
     }
     // throw IOException if there is any
     if(ioe!= null) {
@@ -538,10 +584,20 @@ class BlockSender implements java.io.Clo
     if (out == null) {
       throw new IOException( "out stream is null" );
     }
-    final long initialOffset = offset;
+    initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
+    lastCacheDropOffset = initialOffset;
+
+    if (isLongRead() && blockInFd != null) {
+      // Advise that this file descriptor will be accessed sequentially.
+      NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
+    }
+    
+    // Trigger readahead of beginning of file if configured.
+    manageOsCache();
+
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
       writeChecksumHeader(out);
@@ -569,6 +625,7 @@ class BlockSender implements java.io.Clo
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
+        manageOsCache();
         long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
             transferTo, throttler);
         offset += len;
@@ -595,6 +652,45 @@ class BlockSender implements java.io.Clo
     }
     return totalRead;
   }
+
+  /**
+   * Manage the OS buffer cache by performing read-ahead
+   * and drop-behind.
+   */
+  private void manageOsCache() throws IOException {
+    if (!isLongRead() || blockInFd == null) {
+      // Don't manage the cache manually for short reads, such as
+      // HBase random-read workloads.
+      return;
+    }
+
+    // Perform readahead if necessary
+    if (readaheadLength > 0 && readaheadPool != null) {
+      curReadahead = readaheadPool.readaheadStream(
+          clientTraceFmt, blockInFd,
+          offset, readaheadLength, Long.MAX_VALUE,
+          curReadahead);
+    }
+
+    // Drop what we've just read from cache, since we aren't
+    // likely to need it again
+    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+    if (shouldDropCacheBehindRead &&
+        offset >= nextCacheDropOffset) {
+      long dropLength = offset - lastCacheDropOffset;
+      if (dropLength >= 1024) {
+        NativeIO.posixFadviseIfPossible(blockInFd,
+            lastCacheDropOffset, dropLength,
+            NativeIO.POSIX_FADV_DONTNEED);
+      }
+      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
+    }
+  }
+
+  private boolean isLongRead() {
+    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+  }
+
   
   /**
    * Write checksum header to the output stream
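
On the read side, manageOsCache() advances two cursors on every packet: an
asynchronous readahead request running ahead of offset, and a drop-behind
cursor trailing one CACHE_DROP_INTERVAL_BYTES interval back. A hypothetical
stand-alone trace (not DataNode code) of the drop-behind cursor over a 4 MB
sequential read with 64 KB packets:

    // Each 1 MB of progress releases the previous 1 MB interval; in
    // BlockSender the printed range is handed to posix_fadvise(DONTNEED).
    public class DropBehindTrace {
      static final long INTERVAL = 1024 * 1024;  // CACHE_DROP_INTERVAL_BYTES

      public static void main(String[] args) {
        long offset = 0;
        long lastCacheDropOffset = 0;
        while (offset < 4 * INTERVAL) {
          offset += 64 * 1024;  // one packet's worth of data sent
          if (offset >= lastCacheDropOffset + INTERVAL) {
            System.out.println("drop [" + lastCacheDropOffset + ", " + offset + ")");
            lastCacheDropOffset += INTERVAL;
          }
        }
      }
    }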

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1190626&r1=1190625&r2=1190626&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 28 22:18:42 2011
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -418,6 +419,11 @@ public class DataNode extends Configured
   int socketTimeout;
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
+  private boolean dropCacheBehindWrites = false;
+  private boolean syncBehindWrites = false;
+  private boolean dropCacheBehindReads = false;
+  private long readaheadLength = 0;
+
   int writePacketSize = 0;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -501,6 +507,20 @@ public class DataNode extends Configured
         DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
     this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                                        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+
+    this.readaheadLength = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    this.dropCacheBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+    this.syncBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    this.dropCacheBehindReads = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
     this.initialBlockReportDelay = conf.getLong(
@@ -2903,4 +2923,20 @@ public class DataNode extends Configured
                        (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
+
+  long getReadaheadLength() {
+    return readaheadLength;
+  }
+
+  boolean shouldDropCacheBehindWrites() {
+    return dropCacheBehindWrites;
+  }
+
+  boolean shouldDropCacheBehindReads() {
+    return dropCacheBehindReads;
+  }
+  
+  boolean shouldSyncBehindWrites() {
+    return syncBehindWrites;
+  }
 }