Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/09/27 17:47:13 UTC

svn commit: r1526961 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/hdfs/ main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ main/java/org/apache/hadoop/hdfs/server/common/ main/java/o...

Author: cmccabe
Date: Fri Sep 27 15:47:11 2013
New Revision: 1526961

URL: http://svn.apache.org/r1526961
Log:
HDFS-4817. Make HDFS advisory caching configurable on a per-file basis.  (Contributed by Colin Patrick McCabe)
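
For orientation, a minimal, hypothetical sketch of the per-file knobs this
change exposes on the client side. The CanSetReadahead and CanSetDropBehind
interfaces, their Long/Boolean setter signatures, and the "null means no
preference" convention all appear in the hunks below; the path, buffer size,
and instanceof guards here are illustrative assumptions, not code from this
commit.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CanSetDropBehind;
    import org.apache.hadoop.fs.CanSetReadahead;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerFileCachingSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/bigfile")); // hypothetical path
        try {
          if (in instanceof CanSetReadahead) {
            // Long-valued: request 4 MB of readahead for this stream only.
            ((CanSetReadahead) in).setReadahead(4L * 1024 * 1024);
          }
          if (in instanceof CanSetDropBehind) {
            // Boolean-valued: drop page cache behind this stream's reads.
            ((CanSetDropBehind) in).setDropBehind(Boolean.TRUE);
          }
          byte[] buf = new byte[64 * 1024];
          while (in.read(buf) != -1) {
            // stream through the file
          }
        } finally {
          in.close();
        }
      }
    }

As the DFSInputStream hunk below shows, changing either setting closes the
current block reader so the new strategy takes effect on the next read.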

Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Sep 27 15:47:11 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -85,7 +86,8 @@ public class BlockReaderFactory {
                                      DomainSocketFactory domSockFactory,
                                      PeerCache peerCache,
                                      FileInputStreamCache fisCache,
-                                     boolean allowShortCircuitLocalReads)
+                                     boolean allowShortCircuitLocalReads,
+                                     CachingStrategy cachingStrategy)
   throws IOException {
     peer.setReadTimeout(conf.socketTimeout);
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
@@ -122,12 +124,14 @@ public class BlockReaderFactory {
       @SuppressWarnings("deprecation")
       RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
           block, blockToken, startOffset, len, conf.ioBufferSize,
-          verifyChecksum, clientName, peer, datanodeID, peerCache);
+          verifyChecksum, clientName, peer, datanodeID, peerCache,
+          cachingStrategy);
       return reader;
     } else {
       return RemoteBlockReader2.newBlockReader(
           file, block, blockToken, startOffset, len,
-          verifyChecksum, clientName, peer, datanodeID, peerCache);
+          verifyChecksum, clientName, peer, datanodeID, peerCache,
+          cachingStrategy);
     }
   }
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 27 15:47:11 2013
@@ -44,6 +44,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -136,6 +139,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -202,6 +206,8 @@ public class DFSClient implements java.i
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
   private boolean shouldUseLegacyBlockReaderLocal;
+  private final CachingStrategy defaultReadCachingStrategy;
+  private final CachingStrategy defaultWriteCachingStrategy;
   
   /**
    * DFSClient configuration 
@@ -520,6 +526,16 @@ public class DFSClient implements java.i
     }
     
     this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
+        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
+    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
+        null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
+    Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
+        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
+    this.defaultReadCachingStrategy =
+        new CachingStrategy(readDropBehind, readahead);
+    this.defaultWriteCachingStrategy =
+        new CachingStrategy(writeDropBehind, readahead);
   }
   
   /**
@@ -1990,7 +2006,8 @@ public class DFSClient implements java.i
           HdfsConstants.SMALL_BUFFER_SIZE));
       DataInputStream in = new DataInputStream(pair.in);
   
-      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+          0, 1, true, CachingStrategy.newDefaultStrategy());
       final BlockOpResponseProto reply =
           BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
       
@@ -2489,4 +2506,12 @@ public class DFSClient implements java.i
   public boolean useLegacyBlockReaderLocal() {
     return shouldUseLegacyBlockReaderLocal;
   }
+
+  public CachingStrategy getDefaultReadCachingStrategy() {
+    return defaultReadCachingStrategy;
+  }
+
+  public CachingStrategy getDefaultWriteCachingStrategy() {
+    return defaultWriteCachingStrategy;
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 27 15:47:11 2013
@@ -54,6 +54,9 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
   public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
+  public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
+  public static final String  DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
   public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
   public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
   public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";

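The three keys above set process-wide client defaults. As the DFSClient hunk
earlier shows, a key that is absent from the configuration leaves the
corresponding CachingStrategy field null, deferring the decision to the
DataNode. A hedged sketch of programmatic use; the values are arbitrary
examples, not defaults shipped by this change:

    import org.apache.hadoop.conf.Configuration;

    public class ClientCachingDefaultsSketch {
      public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        // Unset keys mean "defer to the DataNode's own settings".
        conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);
        conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
        conf.setBoolean("dfs.client.cache.drop.behind.writes", true);
        return conf;
      }
    }
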
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Sep 27 15:47:11 2013
@@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +68,8 @@ import com.google.common.annotations.Vis
  * negotiation of the namenode and various datanodes as necessary.
  ****************************************************************/
 @InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+public class DFSInputStream extends FSInputStream
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
@@ -80,6 +84,7 @@ public class DFSInputStream extends FSIn
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
+  private CachingStrategy cachingStrategy;
   private final ReadStatistics readStatistics = new ReadStatistics();
 
   public static class ReadStatistics {
@@ -185,6 +190,8 @@ public class DFSInputStream extends FSIn
     this.fileInputStreamCache = new FileInputStreamCache(
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
+    this.cachingStrategy =
+        dfsClient.getDefaultReadCachingStrategy().duplicate();
     openInfo();
   }
 
@@ -1035,7 +1042,7 @@ public class DFSInputStream extends FSIn
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads);
+            allowShortCircuitLocalReads, cachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1058,7 +1065,7 @@ public class DFSInputStream extends FSIn
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads);
+            allowShortCircuitLocalReads, cachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1081,7 +1088,8 @@ public class DFSInputStream extends FSIn
         reader = BlockReaderFactory.newBlockReader(
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache, false);
+            dsFactory, peerCache, fileInputStreamCache, false,
+            cachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1100,7 +1108,8 @@ public class DFSInputStream extends FSIn
     return BlockReaderFactory.newBlockReader(
         dfsClient.getConf(), file, block, blockToken, startOffset,
         len, verifyChecksum, clientName, peer, chosenNode, 
-        dsFactory, peerCache, fileInputStreamCache, false);
+        dsFactory, peerCache, fileInputStreamCache, false,
+        cachingStrategy);
   }
 
 
@@ -1358,4 +1367,30 @@ public class DFSInputStream extends FSIn
   public synchronized ReadStatistics getReadStatistics() {
     return new ReadStatistics(readStatistics);
   }
+
+  private synchronized void closeCurrentBlockReader() {
+    if (blockReader == null) return;
+    // Close the current block reader so that the new caching settings can 
+    // take effect immediately.
+    try {
+      blockReader.close();
+    } catch (IOException e) {
+      DFSClient.LOG.error("error closing blockReader", e);
+    }
+    blockReader = null;
+  }
+
+  @Override
+  public synchronized void setReadahead(Long readahead)
+      throws IOException {
+    this.cachingStrategy.setReadahead(readahead);
+    closeCurrentBlockReader();
+  }
+
+  @Override
+  public synchronized void setDropBehind(Boolean dropBehind)
+      throws IOException {
+    this.cachingStrategy.setDropBehind(dropBehind);
+    closeCurrentBlockReader();
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Sep 27 15:47:11 2013
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -83,6 +85,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
+import org.mortbay.log.Log;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -115,7 +118,8 @@ import com.google.common.cache.RemovalNo
  * starts sending packets from the dataQueue.
 ****************************************************************/
 @InterfaceAudience.Private
-public class DFSOutputStream extends FSOutputSummer implements Syncable {
+public class DFSOutputStream extends FSOutputSummer
+    implements Syncable, CanSetDropBehind {
   private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private Socket s;
@@ -147,6 +151,7 @@ public class DFSOutputStream extends FSO
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
+  private CachingStrategy cachingStrategy;
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
@@ -1143,7 +1148,8 @@ public class DFSOutputStream extends FSO
           // send the request
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
+              cachingStrategy);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1340,6 +1346,8 @@ public class DFSOutputStream extends FSO
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
+    this.cachingStrategy =
+        dfsClient.getDefaultWriteCachingStrategy().duplicate();
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1937,4 +1945,8 @@ public class DFSOutputStream extends FSO
     return streamer.getBlockToken();
   }
 
+  @Override
+  public void setDropBehind(Boolean dropBehind) throws IOException {
+    this.cachingStrategy.setDropBehind(dropBehind);
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Sep 27 15:47:11 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -381,13 +382,14 @@ public class RemoteBlockReader extends F
                                      int bufferSize, boolean verifyChecksum,
                                      String clientName, Peer peer,
                                      DatanodeID datanodeID,
-                                     PeerCache peerCache)
-                                     throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy)
+                                       throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
     
     //
     // Get bytes in block, set streams

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Sep 27 15:47:11 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -375,12 +376,13 @@ public class RemoteBlockReader2  impleme
                                      boolean verifyChecksum,
                                      String clientName,
                                      Peer peer, DatanodeID datanodeID,
-                                     PeerCache peerCache) throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy) throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
 
     //
     // Get bytes in block

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Sep 27 15:47:11 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -57,13 +58,15 @@ public interface DataTransferProtocol {
    * @param length maximum number of bytes for this read.
    * @param sendChecksum if false, the DN should skip reading and sending
    *        checksums
+   * @param cachingStrategy  The caching strategy to use.
    */
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException;
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Write a block to a datanode pipeline.
@@ -89,7 +92,8 @@ public interface DataTransferProtocol {
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      final DataChecksum requestedChecksum) throws IOException;
+      final DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Transfer a block to another datanode.

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Sep 27 15:47:11 2013
@@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -85,6 +87,14 @@ public abstract class Receiver implement
     }
   }
 
+  static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
+    Boolean dropBehind = strategy.hasDropBehind() ?
+        strategy.getDropBehind() : null;
+    Long readahead = strategy.hasReadahead() ?
+        strategy.getReadahead() : null;
+    return new CachingStrategy(dropBehind, readahead);
+  }
+
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@@ -93,7 +103,10 @@ public abstract class Receiver implement
         proto.getHeader().getClientName(),
         proto.getOffset(),
         proto.getLen(),
-        proto.getSendChecksums());
+        proto.getSendChecksums(),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
   
   /** Receive OP_WRITE_BLOCK */
@@ -108,7 +121,10 @@ public abstract class Receiver implement
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
         proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()));
+        fromProto(proto.getRequestedChecksum()),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Sep 27 15:47:11 2013
@@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -72,19 +74,32 @@ public class Sender implements DataTrans
     out.flush();
   }
 
+  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+    if (cachingStrategy.getReadahead() != null) {
+      builder.setReadahead(cachingStrategy.getReadahead().longValue());
+    }
+    if (cachingStrategy.getDropBehind() != null) {
+      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+    }
+    return builder.build();
+  }
+
   @Override
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
 
     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
       .setOffset(blockOffset)
       .setLen(length)
       .setSendChecksums(sendChecksum)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
       .build();
 
     send(out, Op.READ_BLOCK, proto);
@@ -102,7 +117,8 @@ public class Sender implements DataTrans
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -117,7 +133,8 @@ public class Sender implements DataTrans
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
-      .setRequestedChecksum(checksumProto);
+      .setRequestedChecksum(checksumProto)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

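Taken together, the Sender and Receiver conversions above give each caching
field tri-state semantics on the wire: an unset optional protobuf field
round-trips to null, and null means "no client preference". A standalone
sketch of the fallback rule the DataNode-side hunks below apply (class and
method names here are invented for illustration):

    public class TriStateFallbackSketch {
      /** null = the client expressed no preference; use the DataNode default. */
      static boolean resolveDropBehind(Boolean clientRequest, boolean dnDefault) {
        return (clientRequest == null) ? dnDefault : clientRequest.booleanValue();
      }

      public static void main(String[] args) {
        System.out.println(resolveDropBehind(null, true));  // true: DN default wins
        System.out.println(resolveDropBehind(false, true)); // false: client override
      }
    }
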
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Sep 27 15:47:11 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@@ -217,7 +218,7 @@ public class JspHelper {
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().getHostAddress(),
             addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
-            null, null, false);
+            null, null, false, CachingStrategy.newDefaultStrategy());
         
     final byte[] buf = new byte[amtToRead];
     int readOffset = 0;

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Sep 27 15:47:11 2013
@@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
         adjustThrottler();
         
         blockSender = new BlockSender(block, 0, -1, false, true, true, 
-            datanode, null);
+            datanode, null, CachingStrategy.newDropBehind());
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Sep 27 15:47:11 2013
@@ -53,6 +53,8 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
@@ -61,7 +63,8 @@ class BlockReceiver implements Closeable
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
 
-  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+  @VisibleForTesting
+  static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
   private DataChecksum clientChecksum; // checksum used by client
@@ -97,8 +100,8 @@ class BlockReceiver implements Closeable
 
   // Cache management state
   private boolean dropCacheBehindWrites;
+  private long lastCacheManagementOffset = 0;
   private boolean syncBehindWrites;
-  private long lastCacheDropOffset = 0;
 
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
@@ -120,8 +123,8 @@ class BlockReceiver implements Closeable
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode, DataChecksum requestedChecksum)
-      throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -146,6 +149,7 @@ class BlockReceiver implements Closeable
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
+            + "\n  cachingStrategy = " + cachingStrategy
             );
       }
 
@@ -192,7 +196,9 @@ class BlockReceiver implements Closeable
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
+      this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
+        datanode.getDnConf().dropCacheBehindWrites :
+          cachingStrategy.getDropBehind();
       this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
       
       final boolean isCreate = isDatanode || isTransfer 
@@ -598,7 +604,7 @@ class BlockReceiver implements Closeable
 
           datanode.metrics.incrBytesWritten(len);
 
-          dropOsCacheBehindWriter(offsetInBlock);
+          manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -620,25 +626,44 @@ class BlockReceiver implements Closeable
     return lastPacketInBlock?-1:len;
   }
 
-  private void dropOsCacheBehindWriter(long offsetInBlock) {
+  private void manageWriterOsCache(long offsetInBlock) {
     try {
       if (outFd != null &&
-          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
-        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
-        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
-              NativeIO.POSIX.POSIX_FADV_DONTNEED);
-        }
-        
+          offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+        //
+        // For SYNC_FILE_RANGE_WRITE, we want to sync from
+        // lastCacheManagementOffset to a position "two windows ago"
+        //
+        //                         <========= sync ===========>
+        // +-----------------------O--------------------------X
+        // start                  last                      curPos
+        // of file                 
+        //
         if (syncBehindWrites) {
-          NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+          NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+              lastCacheManagementOffset,
+              offsetInBlock - lastCacheManagementOffset,
               NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
         }
-        
-        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+        //
+        // For POSIX_FADV_DONTNEED, we want to drop from the beginning 
+        // of the file to a position prior to the current position.
+        //
+        // <=== drop =====> 
+        //                 <---W--->
+        // +--------------+--------O--------------------------X
+        // start        dropPos   last                      curPos
+        // of file             
+        //                     
+        long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
+        if (dropPos > 0 && dropCacheBehindWrites) {
+          NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+              outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        }
+        lastCacheManagementOffset = offsetInBlock;
       }
     } catch (Throwable t) {
-      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+      LOG.warn("Error managing cache for writer of block " + block, t);
     }
   }
 

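To make the two windows in manageWriterOsCache concrete: with the 8 MB
CACHE_DROP_LAG_BYTES above, once the write position passes 24 MB while
lastCacheManagementOffset is 16 MB, sync_file_range covers roughly
[16 MB, 24 MB), posix_fadvise(POSIX_FADV_DONTNEED) drops [0, 8 MB), and
lastCacheManagementOffset then advances to the current position.
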
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Sep 27 15:47:11 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.Nat
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -141,13 +142,22 @@ class BlockSender implements java.io.Clo
 
   // Cache-management related fields
   private final long readaheadLength;
-  private boolean shouldDropCacheBehindRead;
+
   private ReadaheadRequest curReadahead;
+
+  private final boolean alwaysReadahead;
+  
+  private final boolean dropCacheBehindLargeReads;
+  
+  private final boolean dropCacheBehindAllReads;
+  
   private long lastCacheDropOffset;
-  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
+  @VisibleForTesting
+  static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
   /**
-   * Minimum length of read below which management of the OS
-   * buffer cache is disabled.
+   * See {@link BlockSender#isLongRead()}
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
   
@@ -167,16 +177,42 @@ class BlockSender implements java.io.Clo
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean verifyChecksum,
-              boolean sendChecksum,
-              DataNode datanode, String clientTraceFmt)
+              boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+              CachingStrategy cachingStrategy)
       throws IOException {
     try {
       this.block = block;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
-      this.readaheadLength = datanode.getDnConf().readaheadLength;
-      this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+
+      /*
+       * If the client asked for the cache to be dropped behind all reads,
+       * we honor that.  Otherwise, we use the DataNode defaults.
+       * When using DataNode defaults, we use a heuristic where we only
+       * drop the cache for large reads.
+       */
+      if (cachingStrategy.getDropBehind() == null) {
+        this.dropCacheBehindAllReads = false;
+        this.dropCacheBehindLargeReads =
+            datanode.getDnConf().dropCacheBehindReads;
+      } else {
+        this.dropCacheBehindAllReads =
+            this.dropCacheBehindLargeReads =
+                 cachingStrategy.getDropBehind().booleanValue();
+      }
+      /*
+       * Similarly, if readahead was explicitly requested, we always do it.
+       * Otherwise, we read ahead based on the DataNode settings, and only
+       * when the reads are large.
+       */
+      if (cachingStrategy.getReadahead() == null) {
+        this.alwaysReadahead = false;
+        this.readaheadLength = datanode.getDnConf().readaheadLength;
+      } else {
+        this.alwaysReadahead = true;
+        this.readaheadLength = cachingStrategy.getReadahead().longValue();
+      }
       this.datanode = datanode;
       
       if (verifyChecksum) {
@@ -335,10 +371,11 @@ class BlockSender implements java.io.Clo
    */
   @Override
   public void close() throws IOException {
-    if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
-      // drop the last few MB of the file from cache
+    if (blockInFd != null &&
+        ((dropCacheBehindAllReads) ||
+         (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
             blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Exception e) {
@@ -637,7 +674,7 @@ class BlockSender implements java.io.Clo
 
     if (isLongRead() && blockInFd != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.posixFadviseIfPossible(
+      NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
           blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
     }
     
@@ -705,37 +742,47 @@ class BlockSender implements java.io.Clo
    * and drop-behind.
    */
   private void manageOsCache() throws IOException {
-    if (!isLongRead() || blockInFd == null) {
-      // don't manage cache manually for short-reads, like
-      // HBase random read workloads.
-      return;
-    }
+    // We can't manage the cache for this block if we don't have a file
+    // descriptor to work with.
+    if (blockInFd == null) return;
 
     // Perform readahead if necessary
-    if (readaheadLength > 0 && datanode.readaheadPool != null) {
+    if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
+          (alwaysReadahead || isLongRead())) {
       curReadahead = datanode.readaheadPool.readaheadStream(
-          clientTraceFmt, blockInFd,
-          offset, readaheadLength, Long.MAX_VALUE,
+          clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
           curReadahead);
     }
 
     // Drop what we've just read from cache, since we aren't
     // likely to need it again
-    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
-    if (shouldDropCacheBehindRead &&
-        offset >= nextCacheDropOffset) {
-      long dropLength = offset - lastCacheDropOffset;
-      if (dropLength >= 1024) {
-        NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
-            lastCacheDropOffset, dropLength,
+    if (dropCacheBehindAllReads ||
+        (dropCacheBehindLargeReads && isLongRead())) {
+      long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+      if (offset >= nextCacheDropOffset) {
+        long dropLength = offset - lastCacheDropOffset;
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+            blockInFd, lastCacheDropOffset, dropLength,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        lastCacheDropOffset = offset;
       }
-      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
     }
   }
 
+  /**
+   * Returns true if we have done a long enough read for this block to qualify
+   * for the DataNode-wide cache management defaults.  We avoid applying the
+   * cache management defaults to smaller reads because the overhead would be
+   * too high.
+   *
+   * Note that if the client explicitly asked for dropBehind, we will do it
+   * even on short reads.
+   * 
+   * This is also used to determine when to invoke
+   * posix_fadvise(POSIX_FADV_SEQUENTIAL).
+   */
   private boolean isLongRead() {
-    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+    return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
   }
 
   /**

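One subtlety in the BlockSender change above: isLongRead() now measures the
total requested span (endOffset - initialOffset) rather than the bytes
remaining from the current position, so a long sequential read continues to
qualify for the DataNode-wide cache-management defaults even as it
approaches the end of the block.
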
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Sep 27 15:47:11 2013
@@ -1517,6 +1517,7 @@ public class DataNode extends Configured
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
     final String clientname;
+    final CachingStrategy cachingStrategy;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
@@ -1537,6 +1538,8 @@ public class DataNode extends Configured
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
       this.clientname = clientname;
+      this.cachingStrategy =
+          new CachingStrategy(true, getDnConf().readaheadLength);
     }
 
     /**
@@ -1579,7 +1582,7 @@ public class DataNode extends Configured
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null);
+            false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -1592,7 +1595,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum());
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Sep 27 15:47:11 2013
@@ -299,7 +299,8 @@ class DataXceiver extends Receiver imple
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;
 
     OutputStream baseStream = getOutputStream();
@@ -324,7 +325,8 @@ class DataXceiver extends Receiver imple
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, sendChecksum, datanode, clientTraceFmt);
+            true, false, sendChecksum, datanode, clientTraceFmt,
+            cachingStrategy);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
@@ -393,7 +395,8 @@ class DataXceiver extends Receiver imple
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -452,7 +455,8 @@ class DataXceiver extends Receiver imple
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode, requestedChecksum);
+            clientname, srcDataNode, datanode, requestedChecksum,
+            cachingStrategy);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -497,7 +501,8 @@ class DataXceiver extends Receiver imple
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+              cachingStrategy);
 
           mirrorOut.flush();
 
@@ -715,7 +720,7 @@ class DataXceiver extends Receiver imple
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, 
-          null);
+          null, CachingStrategy.newDropBehind());
 
       // set up response stream
       OutputStream baseStream = getOutputStream();
@@ -846,7 +851,8 @@ class DataXceiver extends Receiver imple
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum,
+          CachingStrategy.newDropBehind());
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

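In the paths above, readBlock and writeBlock forward the client-supplied
strategy into BlockSender/BlockReceiver, while the datanode-internal copy and
replace paths pin CachingStrategy.newDropBehind().  One plausible way unset
per-operation fields fall back to the DataNode-wide defaults is sketched below;
the helper names and parameters are assumptions for illustration, not this
patch's code.

    // Hypothetical fallback helpers: a per-operation strategy with unset
    // fields defers to the DataNode's configured defaults.
    static boolean effectiveDropBehind(CachingStrategy strategy,
        boolean datanodeDefault) {
      return (strategy == null || strategy.getDropBehind() == null)
          ? datanodeDefault : strategy.getDropBehind();
    }

    static long effectiveReadahead(CachingStrategy strategy,
        long datanodeDefault) {
      return (strategy == null || strategy.getReadahead() == null)
          ? datanodeDefault : strategy.getReadahead();
    }
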
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Sep 27 15:47:11 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -574,8 +575,8 @@ public class NamenodeFsck {
         blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
             file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
             TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                getDataEncryptionKey()),
-            chosenNode, null, null, null, false);
+                getDataEncryptionKey()), chosenNode, null, null, null, 
+                false, CachingStrategy.newDropBehind());
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Fri Sep 27 15:47:11 2013
@@ -54,11 +54,17 @@ message ClientOperationHeaderProto {
   required string clientName = 2;
 }
 
+message CachingStrategyProto {
+  optional bool dropBehind = 1;
+  optional int64 readahead = 2;
+}
+
 message OpReadBlockProto {
   required ClientOperationHeaderProto header = 1;
   required uint64 offset = 2;
   required uint64 len = 3;
   optional bool sendChecksums = 4 [default = true];
+  optional CachingStrategyProto cachingStrategy = 5;
 }
 
 
@@ -100,6 +106,7 @@ message OpWriteBlockProto {
    * The requested checksum mechanism for this block write.
    */
   required ChecksumProto requestedChecksum = 9;
+  optional CachingStrategyProto cachingStrategy = 10;
 }
   
 message OpTransferBlockProto {

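For illustration, a client fills the new optional field through the generated
protobuf builders (builder method names follow standard protobuf codegen for
the fields declared above; the 4 MB readahead value is arbitrary).  Because
both fields are optional, leaving one unset keeps the corresponding DataNode
default in force.

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;

    CachingStrategyProto strategy = CachingStrategyProto.newBuilder()
        .setDropBehind(true)             // drop page cache behind this op
        .setReadahead(4L * 1024 * 1024)  // request 4 MB of readahead
        .build();
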
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Sep 27 15:47:11 2013
@@ -1332,6 +1332,51 @@
 </property>
 
 <property>
+  <name>dfs.client.cache.drop.behind.writes</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.drop.cache.behind.writes, this setting causes the
+    page cache to be dropped behind HDFS writes, potentially freeing up more
+    memory for other uses.  Unlike dfs.datanode.drop.cache.behind.writes, this
+    is a client-side setting rather than a setting for the entire DataNode.
+    If present, this setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.cache.drop.behind.reads</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.drop.cache.behind.reads, this setting causes the
+    page cache to be dropped behind HDFS reads, potentially freeing up more
+    memory for other uses.  Unlike dfs.datanode.drop.cache.behind.reads, this
+    is a client-side setting rather than a setting for the entire DataNode.  If
+    present, this setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.cache.readahead</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.readahead.bytes, this setting causes the DataNode to
+    read ahead in the block file using posix_fadvise, potentially decreasing
+    I/O wait times.  Unlike dfs.datanode.readahead.bytes, this is a client-side
+    setting rather than a setting for the entire DataNode.  If present, this
+    setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
 	<name>dfs.namenode.enable.retrycache</name>
 	<value>true</value>
 	<description>

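These three keys ship unset so that the DataNode-wide settings win by default;
a client opts in through its own Configuration.  For example — the two
drop-behind constants appear in DFSConfigKeys in the tests below, while the
readahead constant name is inferred from the key above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    // Override the DataNode defaults for all I/O done by this client.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 4L * 1024 * 1024);
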
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Sep 27 15:47:11 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
 
@@ -155,7 +156,7 @@ public class BlockReaderTestUtil {
       testBlock.getBlockToken(), 
       offset, lenToRead,
       true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0], null, null, null, false);
+      nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
   }
 
   /**

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Sep 27 15:47:11 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -194,7 +195,7 @@ public class TestDataTransferProtocol {
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@@ -391,7 +392,7 @@ public class TestDataTransferProtocol {
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
         0, 0L, 0L, 0L,
-        badChecksum);
+        badChecksum, CachingStrategy.newDefaultStrategy());
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -402,7 +403,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
 
     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@@ -425,7 +426,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
 
     hdr = new PacketHeader(
       8,     // size of packet
@@ -452,21 +453,21 @@ public class TestDataTransferProtocol {
     recvBuf.reset();
     blk.setBlockId(blkid-1);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        -1L, fileLen, true);
+        -1L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
     // bad block start offset
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        fileLen, fileLen, true);
+        fileLen, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -483,7 +484,8 @@ public class TestDataTransferProtocol {
     
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, -1L-random.nextInt(oneMil), true);
+        0L, -1L-random.nextInt(oneMil), true,
+        CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -496,14 +498,14 @@ public class TestDataTransferProtocol {
         recvOut);
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen+1, true);
+        0L, fileLen+1, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Fri Sep 27 15:47:11 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.NetUtils;
@@ -148,7 +149,8 @@ public class TestBlockTokenWithDFS {
       blockReader = BlockReaderFactory.newBlockReader(
           new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false);
+          nodes[0], null, null, null, false,
+          CachingStrategy.newDefaultStrategy());
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java?rev=1526961&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java Fri Sep 27 15:47:11 2013
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCachingStrategy {
+  private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
+  private static final int MAX_TEST_FILE_LEN = 1024 * 1024;
+  private static final int WRITE_PACKET_SIZE =
+      DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+  
+  private final static TestRecordingCacheTracker tracker =
+      new TestRecordingCacheTracker();
+
+  @BeforeClass
+  public static void setupTest() {
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+
+    // Track calls to posix_fadvise.
+    NativeIO.POSIX.cacheTracker = tracker;
+    
+    // Normally, we wait for a few megabytes of data to be read or written 
+    // before dropping the cache.  This is to avoid an excessive number of
+    // JNI calls to the posix_fadvise function.  However, for the purpose
+    // of this test, we want to use small files and see all fadvise calls
+    // happen.
+    BlockSender.CACHE_DROP_INTERVAL_BYTES = 4096;
+    BlockReceiver.CACHE_DROP_LAG_BYTES = 4096;
+  }
+
+  private static class Stats {
+    private final String fileName;
+    private final boolean dropped[] = new boolean[MAX_TEST_FILE_LEN];
+
+    Stats(String fileName) {
+      this.fileName = fileName;
+    }
+    
+    synchronized void fadvise(int offset, int len, int flags) {
+      LOG.debug("got fadvise(offset=" + offset + ", len=" + len +
+          ",flags=" + flags + ")");
+      if (flags == NativeIO.POSIX.POSIX_FADV_DONTNEED) {
+        for (int i = 0; i < (int)len; i++) {
+          dropped[(int)(offset + i)] = true;
+        }
+      }
+    }
+
+    synchronized void assertNotDroppedInRange(int start, int end) {
+      for (int i = start; i < end; i++) {
+        if (dropped[i]) {
+          throw new RuntimeException("in file " + fileName + ", we " +
+              "dropped the cache at offset " + i);
+        }
+      }
+    }
+    
+    synchronized void assertDroppedInRange(int start, int end) {
+      for (int i = start; i < end; i++) {
+        if (!dropped[i]) {
+          throw new RuntimeException("in file " + fileName + ", we " +
+              "did not drop the cache at offset " + i);
+        }
+      }
+    }
+    
+    synchronized void clear() {
+      Arrays.fill(dropped, false);
+    }
+  }
+
+  private static class TestRecordingCacheTracker implements CacheTracker {
+    private final Map<String, Stats> map = new TreeMap<String, Stats>();
+
+    @Override
+    public synchronized void fadvise(String name,
+        long offset, long len, int flags) {
+      if ((len < 0) || (len > Integer.MAX_VALUE)) {
+        throw new RuntimeException("invalid length of " + len +
+            " passed to posixFadviseIfPossible");
+      }
+      if ((offset < 0) || (offset > Integer.MAX_VALUE)) {
+        throw new RuntimeException("invalid offset of " + offset +
+            " passed to posixFadviseIfPossible");
+      }
+      Stats stats = map.get(name);
+      if (stats == null) {
+        stats = new Stats(name);
+        map.put(name, stats);
+      }
+      stats.fadvise((int)offset, (int)len, flags);
+    }
+
+    synchronized void clear() {
+      map.clear();
+    }
+    
+    synchronized Stats getStats(String fileName) {
+      return map.get(fileName);
+    }
+    
+    @Override
+    public synchronized String toString() {
+      StringBuilder bld = new StringBuilder();
+      bld.append("TestRecordingCacheManipulator{");
+      String prefix = "";
+      for (String fileName : map.keySet()) {
+        bld.append(prefix);
+        prefix = ", ";
+        bld.append(fileName);
+      }
+      bld.append("}");
+      return bld.toString();
+    }
+  }
+
+  static void createHdfsFile(FileSystem fs, Path p, long length,
+        Boolean dropBehind) throws Exception {
+    FSDataOutputStream fos = null;
+    try {
+      // create file with replication factor of 1
+      fos = fs.create(p, (short)1);
+      if (dropBehind != null) {
+        fos.setDropBehind(dropBehind);
+      }
+      byte buf[] = new byte[8192];
+      while (length > 0) {
+        int amt = (length > buf.length) ? buf.length : (int)length;
+        fos.write(buf, 0, amt);
+        length -= amt;
+      }
+    } catch (IOException e) {
+      LOG.error("ioexception", e);
+      throw e;
+    } finally {
+      if (fos != null) {
+        fos.close();
+      }
+    }
+  }
+  
+  static long readHdfsFile(FileSystem fs, Path p, long length,
+      Boolean dropBehind) throws Exception {
+    FSDataInputStream fis = null;
+    long totalRead = 0;
+    try {
+      fis = fs.open(p);
+      if (dropBehind != null) {
+        fis.setDropBehind(dropBehind);
+      }
+      byte buf[] = new byte[8192];
+      while (length > 0) {
+        int amt = (length > buf.length) ? buf.length : (int)length;
+        int ret = fis.read(buf, 0, amt);
+        if (ret == -1) {
+          return totalRead;
+        }
+        totalRead += ret;
+        length -= ret;
+      }
+      // read the full requested length without hitting EOF
+      return totalRead;
+    } catch (IOException e) {
+      LOG.error("ioexception", e);
+      throw e;
+    } finally {
+      if (fis != null) {
+        fis.close();
+      }
+    }
+  }
+ 
+  @Test(timeout=120000)
+  public void testFadviseAfterWriteThenRead() throws Exception {
+    // start a cluster
+    LOG.info("testFadviseAfterWriteThenRead");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
+      // verify that we dropped everything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
+      // verify that we dropped everything from the cache.
+      Assert.assertNotNull(stats);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test the scenario where the DataNode defaults to not dropping the cache,
+   * but our client defaults are set.
+   */
+  @Test(timeout=120000)
+  public void testClientDefaults() throws Exception {
+    // start a cluster
+    LOG.info("testClientDefaults");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+      // verify that we dropped everything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, null);
+      // verify that we dropped everything from the cache.
+      Assert.assertNotNull(stats);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testFadviseSkippedForSmallReads() throws Exception {
+    // start a cluster
+    LOG.info("testFadviseSkippedForSmallReads");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    FSDataInputStream fis = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+      // Since the DataNode was configured with drop-behind, and we didn't
+      // specify any policy, we should have done drop-behind.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      stats.assertNotDroppedInRange(0, TEST_PATH_LEN);
+
+      // read file
+      fis = fs.open(new Path(TEST_PATH));
+      byte buf[] = new byte[17];
+      fis.readFully(4096, buf, 0, buf.length);
+
+      // we should not have dropped anything because of the small read.
+      stats = tracker.getStats(fadvisedFileName);
+      stats.assertNotDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      IOUtils.cleanup(null, fis);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test(timeout=120000)
+  public void testNoFadviseAfterWriteThenRead() throws Exception {
+    // start a cluster
+    LOG.info("testNoFadviseAfterWriteThenRead");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
+      // verify that we did not drop anything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      Assert.assertNull(stats);
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
+      // verify that we still did not drop anything from the cache.
+      Assert.assertNull(tracker.getStats(fadvisedFileName));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

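The test above drives the per-stream surface of this feature through
FSDataOutputStream.setDropBehind and FSDataInputStream.setDropBehind.  A
minimal application-level sketch of a one-pass scan follows, assuming a live
FileSystem handle; the setReadahead call belongs to the same per-file API
family added by this change and is an assumption here.

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    /** Scan a file once without polluting the DataNode's page cache. */
    static long scanOnce(FileSystem fs, Path p) throws IOException {
      FSDataInputStream in = fs.open(p);
      try {
        in.setDropBehind(true);              // per-file override, as in the test
        in.setReadahead(4L * 1024 * 1024);   // assumed per-stream readahead setter
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buf)) > 0) {
          total += n;
        }
        return total;
      } finally {
        IOUtils.cleanup(null, in);
      }
    }
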
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Sep 27 15:47:11 2013
@@ -287,7 +287,8 @@ public class TestDataNodeVolumeFailure {
     BlockReader blockReader =
       BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
+        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
+        CachingStrategy.newDefaultStrategy());
     blockReader.close();
   }
   

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Fri Sep 27 15:47:11 2013
@@ -148,7 +148,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum);
+        checksum, CachingStrategy.newDefaultStrategy());
     out.flush();
 
     // close the connection before sending the content of the block