Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/09/27 17:47:13 UTC
svn commit: r1526961 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src:
main/java/org/apache/hadoop/hdfs/
main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
main/java/org/apache/hadoop/hdfs/server/common/ main/java/o...
Author: cmccabe
Date: Fri Sep 27 15:47:11 2013
New Revision: 1526961
URL: http://svn.apache.org/r1526961
Log:
HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. (Contributed by Colin Patrick McCabe)
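For context, the new knobs surface on the client streams through the CanSetDropBehind and CanSetReadahead interfaces wired up below. A minimal usage sketch (illustrative only; assumes the FSDataInputStream pass-throughs from HADOOP-9418 and a hypothetical input path):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerFileCachingSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/large-scan")); // hypothetical file
        try {
          in.setReadahead(4L * 1024 * 1024); // explicit 4 MB readahead, this stream only
          in.setDropBehind(true);            // drop pages behind this sequential read
          byte[] buf = new byte[64 * 1024];
          while (in.read(buf) > 0) { /* sequential scan */ }
        } finally {
          in.close();
        }
      }
    }

On this branch, changing either setting also closes the stream's current block reader (see DFSInputStream below), so the new strategy takes effect on the next block read rather than only on the next stream.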
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Sep 27 15:47:11 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
@@ -85,7 +86,8 @@ public class BlockReaderFactory {
DomainSocketFactory domSockFactory,
PeerCache peerCache,
FileInputStreamCache fisCache,
- boolean allowShortCircuitLocalReads)
+ boolean allowShortCircuitLocalReads,
+ CachingStrategy cachingStrategy)
throws IOException {
peer.setReadTimeout(conf.socketTimeout);
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
@@ -122,12 +124,14 @@ public class BlockReaderFactory {
@SuppressWarnings("deprecation")
RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
block, blockToken, startOffset, len, conf.ioBufferSize,
- verifyChecksum, clientName, peer, datanodeID, peerCache);
+ verifyChecksum, clientName, peer, datanodeID, peerCache,
+ cachingStrategy);
return reader;
} else {
return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len,
- verifyChecksum, clientName, peer, datanodeID, peerCache);
+ verifyChecksum, clientName, peer, datanodeID, peerCache,
+ cachingStrategy);
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 27 15:47:11 2013
@@ -44,6 +44,9 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -136,6 +139,7 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -202,6 +206,8 @@ public class DFSClient implements java.i
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
private boolean shouldUseLegacyBlockReaderLocal;
+ private final CachingStrategy defaultReadCachingStrategy;
+ private final CachingStrategy defaultWriteCachingStrategy;
/**
* DFSClient configuration
@@ -520,6 +526,16 @@ public class DFSClient implements java.i
}
this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+ Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
+ null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
+ Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
+ null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
+ Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
+ null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
+ this.defaultReadCachingStrategy =
+ new CachingStrategy(readDropBehind, readahead);
+ this.defaultWriteCachingStrategy =
+ new CachingStrategy(writeDropBehind, readahead);
}
/**
@@ -1990,7 +2006,8 @@ public class DFSClient implements java.i
HdfsConstants.SMALL_BUFFER_SIZE));
DataInputStream in = new DataInputStream(pair.in);
- new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+ new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+ 0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
@@ -2489,4 +2506,12 @@ public class DFSClient implements java.i
public boolean useLegacyBlockReaderLocal() {
return shouldUseLegacyBlockReaderLocal;
}
+
+ public CachingStrategy getDefaultReadCachingStrategy() {
+ return defaultReadCachingStrategy;
+ }
+
+ public CachingStrategy getDefaultWriteCachingStrategy() {
+ return defaultWriteCachingStrategy;
+ }
}
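The constructor hunk above distinguishes "unset" from an explicit false/0: a key that is absent from the Configuration maps to null, which later tells the DataNode to apply its own default. A condensed restatement of that tri-state parse (hypothetical helper, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    class TriStateConf {
      // unset -> null (defer to the DataNode default); otherwise the explicit value
      static Boolean getBooleanOrNull(Configuration conf, String key) {
        return (conf.get(key) == null) ? null : conf.getBoolean(key, false);
      }

      static Long getLongOrNull(Configuration conf, String key) {
        return (conf.get(key) == null) ? null : conf.getLong(key, 0);
      }
    }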
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 27 15:47:11 2013
@@ -54,6 +54,9 @@ public class DFSConfigKeys extends Commo
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+ public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
+ public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
+ public static final String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Sep 27 15:47:11 2013
@@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +68,8 @@ import com.google.common.annotations.Vis
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
@InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+public class DFSInputStream extends FSInputStream
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
@@ -80,6 +84,7 @@ public class DFSInputStream extends FSIn
private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
+ private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics();
public static class ReadStatistics {
@@ -185,6 +190,8 @@ public class DFSInputStream extends FSIn
this.fileInputStreamCache = new FileInputStreamCache(
dfsClient.getConf().shortCircuitStreamsCacheSize,
dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
+ this.cachingStrategy =
+ dfsClient.getDefaultReadCachingStrategy().duplicate();
openInfo();
}
@@ -1035,7 +1042,7 @@ public class DFSInputStream extends FSIn
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
- allowShortCircuitLocalReads);
+ allowShortCircuitLocalReads, cachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1058,7 +1065,7 @@ public class DFSInputStream extends FSIn
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
- allowShortCircuitLocalReads);
+ allowShortCircuitLocalReads, cachingStrategy);
return reader;
} catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1081,7 +1088,8 @@ public class DFSInputStream extends FSIn
reader = BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
- dsFactory, peerCache, fileInputStreamCache, false);
+ dsFactory, peerCache, fileInputStreamCache, false,
+ cachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1100,7 +1108,8 @@ public class DFSInputStream extends FSIn
return BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
- dsFactory, peerCache, fileInputStreamCache, false);
+ dsFactory, peerCache, fileInputStreamCache, false,
+ cachingStrategy);
}
@@ -1358,4 +1367,30 @@ public class DFSInputStream extends FSIn
public synchronized ReadStatistics getReadStatistics() {
return new ReadStatistics(readStatistics);
}
+
+ private synchronized void closeCurrentBlockReader() {
+ if (blockReader == null) return;
+ // Close the current block reader so that the new caching settings can
+ // take effect immediately.
+ try {
+ blockReader.close();
+ } catch (IOException e) {
+ DFSClient.LOG.error("error closing blockReader", e);
+ }
+ blockReader = null;
+ }
+
+ @Override
+ public synchronized void setReadahead(Long readahead)
+ throws IOException {
+ this.cachingStrategy.setReadahead(readahead);
+ closeCurrentBlockReader();
+ }
+
+ @Override
+ public synchronized void setDropBehind(Boolean dropBehind)
+ throws IOException {
+ this.cachingStrategy.setDropBehind(dropBehind);
+ closeCurrentBlockReader();
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Sep 27 15:47:11 2013
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@@ -83,6 +85,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
+import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@@ -115,7 +118,8 @@ import com.google.common.cache.RemovalNo
* starts sending packets from the dataQueue.
****************************************************************/
@InterfaceAudience.Private
-public class DFSOutputStream extends FSOutputSummer implements Syncable {
+public class DFSOutputStream extends FSOutputSummer
+ implements Syncable, CanSetDropBehind {
private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
@@ -147,6 +151,7 @@ public class DFSOutputStream extends FSO
private Progressable progress;
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
+ private CachingStrategy cachingStrategy;
private class Packet {
long seqno; // sequence number of buffer in block
@@ -1143,7 +1148,8 @@ public class DFSOutputStream extends FSO
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
- nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+ nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
+ cachingStrategy);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1340,6 +1346,8 @@ public class DFSOutputStream extends FSO
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.progress = progress;
+ this.cachingStrategy =
+ dfsClient.getDefaultWriteCachingStrategy().duplicate();
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src);
@@ -1937,4 +1945,8 @@ public class DFSOutputStream extends FSO
return streamer.getBlockToken();
}
+ @Override
+ public void setDropBehind(Boolean dropBehind) throws IOException {
+ this.cachingStrategy.setDropBehind(dropBehind);
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Sep 27 15:47:11 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -381,13 +382,14 @@ public class RemoteBlockReader extends F
int bufferSize, boolean verifyChecksum,
String clientName, Peer peer,
DatanodeID datanodeID,
- PeerCache peerCache)
- throws IOException {
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy)
+ throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum);
+ verifyChecksum, cachingStrategy);
//
// Get bytes in block, set streams
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Sep 27 15:47:11 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -375,12 +376,13 @@ public class RemoteBlockReader2 impleme
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
- PeerCache peerCache) throws IOException {
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
- verifyChecksum);
+ verifyChecksum, cachingStrategy);
//
// Get bytes in block
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Sep 27 15:47:11 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -57,13 +58,15 @@ public interface DataTransferProtocol {
* @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending
* checksums
+ * @param cachingStrategy The caching strategy to use.
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException;
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException;
/**
* Write a block to a datanode pipeline.
@@ -89,7 +92,8 @@ public interface DataTransferProtocol {
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- final DataChecksum requestedChecksum) throws IOException;
+ final DataChecksum requestedChecksum,
+ final CachingStrategy cachingStrategy) throws IOException;
/**
* Transfer a block to another datanode.
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Sep 27 15:47:11 2013
@@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
/** Receiver */
@InterfaceAudience.Private
@@ -85,6 +87,14 @@ public abstract class Receiver implement
}
}
+ static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
+ Boolean dropBehind = strategy.hasDropBehind() ?
+ strategy.getDropBehind() : null;
+ Long readahead = strategy.hasReadahead() ?
+ strategy.getReadahead() : null;
+ return new CachingStrategy(dropBehind, readahead);
+ }
+
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@@ -93,7 +103,10 @@ public abstract class Receiver implement
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
- proto.getSendChecksums());
+ proto.getSendChecksums(),
+ (proto.hasCachingStrategy() ?
+ getCachingStrategy(proto.getCachingStrategy()) :
+ CachingStrategy.newDefaultStrategy()));
}
/** Receive OP_WRITE_BLOCK */
@@ -108,7 +121,10 @@ public abstract class Receiver implement
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
- fromProto(proto.getRequestedChecksum()));
+ fromProto(proto.getRequestedChecksum()),
+ (proto.hasCachingStrategy() ?
+ getCachingStrategy(proto.getCachingStrategy()) :
+ CachingStrategy.newDefaultStrategy()));
}
/** Receive {@link Op#TRANSFER_BLOCK} */
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Sep 27 15:47:11 2013
@@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -72,19 +74,32 @@ public class Sender implements DataTrans
out.flush();
}
+ static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+ CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+ if (cachingStrategy.getReadahead() != null) {
+ builder.setReadahead(cachingStrategy.getReadahead().longValue());
+ }
+ if (cachingStrategy.getDropBehind() != null) {
+ builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+ }
+ return builder.build();
+ }
+
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException {
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
+ .setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
@@ -102,7 +117,8 @@ public class Sender implements DataTrans
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -117,7 +133,8 @@ public class Sender implements DataTrans
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
- .setRequestedChecksum(checksumProto);
+ .setRequestedChecksum(checksumProto)
+ .setCachingStrategy(getCachingStrategy(cachingStrategy));
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
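Because dropBehind and readahead are optional fields in CachingStrategyProto, the "unset" state survives the wire: Sender only sets a field when the client supplied a value, and Receiver maps a missing field back to null. A round-trip sketch (assumes the standard protobuf 2 generated accessors):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;

    class CachingStrategyRoundTrip {
      static void demo() {
        CachingStrategyProto p = CachingStrategyProto.newBuilder()
            .setReadahead(4L * 1024 * 1024) // dropBehind deliberately left unset
            .build();
        Boolean dropBehind = p.hasDropBehind() ? p.getDropBehind() : null; // null
        Long readahead = p.hasReadahead() ? p.getReadahead() : null;       // 4194304L
      }
    }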
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Sep 27 15:47:11 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@@ -217,7 +218,7 @@ public class JspHelper {
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
- null, null, false);
+ null, null, false, CachingStrategy.newDefaultStrategy());
final byte[] buf = new byte[amtToRead];
int readOffset = 0;
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Sep 27 15:47:11 2013
@@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, true,
- datanode, null);
+ datanode, null, CachingStrategy.newDropBehind());
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Sep 27 15:47:11 2013
@@ -53,6 +53,8 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/** A class that receives a block and writes to its own disk, meanwhile
* may copy it to another site. If a throttler is provided,
* streaming throttling is also supported.
@@ -61,7 +63,8 @@ class BlockReceiver implements Closeable
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+ @VisibleForTesting
+ static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
@@ -97,8 +100,8 @@ class BlockReceiver implements Closeable
// Cache management state
private boolean dropCacheBehindWrites;
+ private long lastCacheManagementOffset = 0;
private boolean syncBehindWrites;
- private long lastCacheDropOffset = 0;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
@@ -120,8 +123,8 @@ class BlockReceiver implements Closeable
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
- final DataNode datanode, DataChecksum requestedChecksum)
- throws IOException {
+ final DataNode datanode, DataChecksum requestedChecksum,
+ CachingStrategy cachingStrategy) throws IOException {
try{
this.block = block;
this.in = in;
@@ -146,6 +149,7 @@ class BlockReceiver implements Closeable
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ + "\n cachingStrategy = " + cachingStrategy
);
}
@@ -192,7 +196,9 @@ class BlockReceiver implements Closeable
" while receiving block " + block + " from " + inAddr);
}
}
- this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
+ this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
+ datanode.getDnConf().dropCacheBehindWrites :
+ cachingStrategy.getDropBehind();
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
final boolean isCreate = isDatanode || isTransfer
@@ -598,7 +604,7 @@ class BlockReceiver implements Closeable
datanode.metrics.incrBytesWritten(len);
- dropOsCacheBehindWriter(offsetInBlock);
+ manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -620,25 +626,44 @@ class BlockReceiver implements Closeable
return lastPacketInBlock?-1:len;
}
- private void dropOsCacheBehindWriter(long offsetInBlock) {
+ private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
- offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
- long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
- if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
- NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- }
-
+ offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+ //
+ // For SYNC_FILE_RANGE_WRITE, we want to sync from
+ // lastCacheManagementOffset to a position "two windows ago"
+ //
+ // <========= sync ===========>
+ // +-----------------------O--------------------------X
+ // start last curPos
+ // of file
+ //
if (syncBehindWrites) {
- NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+ lastCacheManagementOffset,
+ offsetInBlock - lastCacheManagementOffset,
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
}
-
- lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+ //
+ // For POSIX_FADV_DONTNEED, we want to drop from the beginning
+ // of the file to a position prior to the current position.
+ //
+ // <=== drop =====>
+ // <---W--->
+ // +--------------+--------O--------------------------X
+ // start dropPos last curPos
+ // of file
+ //
+ long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
+ if (dropPos > 0 && dropCacheBehindWrites) {
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+ outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ }
+ lastCacheManagementOffset = offsetInBlock;
}
} catch (Throwable t) {
- LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+ LOG.warn("Error managing cache for writer of block " + block, t);
}
}
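A worked example of the two windows in manageWriterOsCache above, using the 8 MB CACHE_DROP_LAG_BYTES and illustrative offsets:

    class CacheWindowExample {
      public static void main(String[] args) {
        long lag  = 8L * 1024 * 1024;  // CACHE_DROP_LAG_BYTES
        long last = 16L * 1024 * 1024; // lastCacheManagementOffset ("O" in the diagrams)
        long cur  = 25L * 1024 * 1024; // offsetInBlock ("X" in the diagrams)
        if (cur > last + lag) {        // 25 MB > 24 MB, so cache management runs
          System.out.println("sync [" + last + ", " + cur + ")"); // sync_file_range window
          long dropPos = last - lag;   // 8 MB
          if (dropPos > 0) {
            System.out.println("drop [0, " + dropPos + ")");      // posix_fadvise window
          }
          last = cur;                  // the window then advances to 25 MB
        }
      }
    }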
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Sep 27 15:47:11 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.Nat
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -141,13 +142,22 @@ class BlockSender implements java.io.Clo
// Cache-management related fields
private final long readaheadLength;
- private boolean shouldDropCacheBehindRead;
+
private ReadaheadRequest curReadahead;
+
+ private final boolean alwaysReadahead;
+
+ private final boolean dropCacheBehindLargeReads;
+
+ private final boolean dropCacheBehindAllReads;
+
private long lastCacheDropOffset;
- private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+
+ @VisibleForTesting
+ static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+
/**
- * Minimum length of read below which management of the OS
- * buffer cache is disabled.
+ * See {@link BlockSender#isLongRead()}
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
@@ -167,16 +177,42 @@ class BlockSender implements java.io.Clo
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
- boolean sendChecksum,
- DataNode datanode, String clientTraceFmt)
+ boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+ CachingStrategy cachingStrategy)
throws IOException {
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
- this.readaheadLength = datanode.getDnConf().readaheadLength;
- this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+
+ /*
+ * If the client asked for the cache to be dropped behind all reads,
+ * we honor that. Otherwise, we use the DataNode defaults.
+ * When using DataNode defaults, we use a heuristic where we only
+ * drop the cache for large reads.
+ */
+ if (cachingStrategy.getDropBehind() == null) {
+ this.dropCacheBehindAllReads = false;
+ this.dropCacheBehindLargeReads =
+ datanode.getDnConf().dropCacheBehindReads;
+ } else {
+ this.dropCacheBehindAllReads =
+ this.dropCacheBehindLargeReads =
+ cachingStrategy.getDropBehind().booleanValue();
+ }
+ /*
+ * Similarly, if readahead was explicitly requested, we always do it.
+ * Otherwise, we read ahead based on the DataNode settings, and only
+ * when the reads are large.
+ */
+ if (cachingStrategy.getReadahead() == null) {
+ this.alwaysReadahead = false;
+ this.readaheadLength = datanode.getDnConf().readaheadLength;
+ } else {
+ this.alwaysReadahead = true;
+ this.readaheadLength = cachingStrategy.getReadahead().longValue();
+ }
this.datanode = datanode;
if (verifyChecksum) {
@@ -335,10 +371,11 @@ class BlockSender implements java.io.Clo
*/
@Override
public void close() throws IOException {
- if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
- // drop the last few MB of the file from cache
+ if (blockInFd != null &&
+ ((dropCacheBehindAllReads) ||
+ (dropCacheBehindLargeReads && isLongRead()))) {
try {
- NativeIO.POSIX.posixFadviseIfPossible(
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Exception e) {
@@ -637,7 +674,7 @@ class BlockSender implements java.io.Clo
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
- NativeIO.POSIX.posixFadviseIfPossible(
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
@@ -705,37 +742,47 @@ class BlockSender implements java.io.Clo
* and drop-behind.
*/
private void manageOsCache() throws IOException {
- if (!isLongRead() || blockInFd == null) {
- // don't manage cache manually for short-reads, like
- // HBase random read workloads.
- return;
- }
+ // We can't manage the cache for this block if we don't have a file
+ // descriptor to work with.
+ if (blockInFd == null) return;
// Perform readahead if necessary
- if (readaheadLength > 0 && datanode.readaheadPool != null) {
+ if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
+ (alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
- clientTraceFmt, blockInFd,
- offset, readaheadLength, Long.MAX_VALUE,
+ clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
// likely to need it again
- long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
- if (shouldDropCacheBehindRead &&
- offset >= nextCacheDropOffset) {
- long dropLength = offset - lastCacheDropOffset;
- if (dropLength >= 1024) {
- NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
- lastCacheDropOffset, dropLength,
+ if (dropCacheBehindAllReads ||
+ (dropCacheBehindLargeReads && isLongRead())) {
+ long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+ if (offset >= nextCacheDropOffset) {
+ long dropLength = offset - lastCacheDropOffset;
+ NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+ blockInFd, lastCacheDropOffset, dropLength,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ lastCacheDropOffset = offset;
}
- lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
}
}
+ /**
+ * Returns true if we have done a long enough read for this block to qualify
+ * for the DataNode-wide cache management defaults. We avoid applying the
+ * cache management defaults to smaller reads because the overhead would be
+ * too high.
+ *
+ * Note that if the client explicitly asked for dropBehind, we will do it
+ * even on short reads.
+ *
+ * This is also used to determine when to invoke
+ * posix_fadvise(POSIX_FADV_SEQUENTIAL).
+ */
private boolean isLongRead() {
- return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+ return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
}
/**
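The net effect of the constructor logic and isLongRead() above can be restated as two small decision functions (a hypothetical condensation, not part of the patch):

    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    class ReadCachePolicy {
      // An explicit client request always wins, even for short reads; the
      // DataNode-wide default only applies to long reads.
      static boolean shouldDropBehind(CachingStrategy s, boolean dnDropBehindReads,
          boolean longRead) {
        if (s.getDropBehind() != null) {
          return s.getDropBehind();
        }
        return dnDropBehindReads && longRead;
      }

      // An explicitly requested readahead is always performed; otherwise the
      // DataNode length is used, and only for long reads.
      static long effectiveReadahead(CachingStrategy s, long dnReadaheadLength,
          boolean longRead) {
        if (s.getReadahead() != null) {
          return s.getReadahead();
        }
        return longRead ? dnReadaheadLength : 0;
      }
    }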
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Sep 27 15:47:11 2013
@@ -1517,6 +1517,7 @@ public class DataNode extends Configured
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
final String clientname;
+ final CachingStrategy cachingStrategy;
/**
* Connect to the first item in the target list. Pass along the
@@ -1537,6 +1538,8 @@ public class DataNode extends Configured
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
bpReg = bpos.bpRegistration;
this.clientname = clientname;
+ this.cachingStrategy =
+ new CachingStrategy(true, getDnConf().readaheadLength);
}
/**
@@ -1579,7 +1582,7 @@ public class DataNode extends Configured
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null);
+ false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@@ -1592,7 +1595,7 @@ public class DataNode extends Configured
}
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum());
+ stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Sep 27 15:47:11 2013
@@ -299,7 +299,8 @@ class DataXceiver extends Receiver imple
final String clientName,
final long blockOffset,
final long length,
- final boolean sendChecksum) throws IOException {
+ final boolean sendChecksum,
+ final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = getOutputStream();
@@ -324,7 +325,8 @@ class DataXceiver extends Receiver imple
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
- true, false, sendChecksum, datanode, clientTraceFmt);
+ true, false, sendChecksum, datanode, clientTraceFmt,
+ cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@@ -393,7 +395,8 @@ class DataXceiver extends Receiver imple
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum,
+ CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -452,7 +455,8 @@ class DataXceiver extends Receiver imple
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode, requestedChecksum);
+ clientname, srcDataNode, datanode, requestedChecksum,
+ cachingStrategy);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
@@ -497,7 +501,8 @@ class DataXceiver extends Receiver imple
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+ cachingStrategy);
mirrorOut.flush();
@@ -715,7 +720,7 @@ class DataXceiver extends Receiver imple
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
- null);
+ null, CachingStrategy.newDropBehind());
// set up response stream
OutputStream baseStream = getOutputStream();
@@ -846,7 +851,8 @@ class DataXceiver extends Receiver imple
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode, remoteChecksum);
+ null, 0, 0, 0, "", null, datanode, remoteChecksum,
+ CachingStrategy.newDropBehind());
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Sep 27 15:47:11 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
@@ -574,8 +575,8 @@ public class NamenodeFsck {
blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
- getDataEncryptionKey()),
- chosenNode, null, null, null, false);
+ getDataEncryptionKey()), chosenNode, null, null, null,
+ false, CachingStrategy.newDropBehind());
} catch (IOException ex) {
// Put chosen node into dead list, continue
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Fri Sep 27 15:47:11 2013
@@ -54,11 +54,17 @@ message ClientOperationHeaderProto {
required string clientName = 2;
}
+message CachingStrategyProto {
+ optional bool dropBehind = 1;
+ optional int64 readahead = 2;
+}
+
message OpReadBlockProto {
required ClientOperationHeaderProto header = 1;
required uint64 offset = 2;
required uint64 len = 3;
optional bool sendChecksums = 4 [default = true];
+ optional CachingStrategyProto cachingStrategy = 5;
}
@@ -100,6 +106,7 @@ message OpWriteBlockProto {
* The requested checksum mechanism for this block write.
*/
required ChecksumProto requestedChecksum = 9;
+ optional CachingStrategyProto cachingStrategy = 10;
}
message OpTransferBlockProto {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Sep 27 15:47:11 2013
@@ -1332,6 +1332,51 @@
</property>
<property>
+ <name>dfs.client.cache.drop.behind.writes</name>
+ <value></value>
+ <description>
+ Just like dfs.datanode.drop.cache.behind.writes, this setting causes the
+ page cache to be dropped behind HDFS writes, potentially freeing up more
+ memory for other uses. Unlike dfs.datanode.drop.cache.behind.writes, this
+ is a client-side setting rather than a setting for the entire datanode.
+ If present, this setting will override the DataNode default.
+
+ If the native libraries are not available to the DataNode, this
+ configuration has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.cache.drop.behind.reads</name>
+ <value></value>
+ <description>
+ Just like dfs.datanode.drop.cache.behind.reads, this setting causes the
+ page cache to be dropped behind HDFS reads, potentially freeing up more
+ memory for other uses. Unlike dfs.datanode.drop.cache.behind.reads, this
+ is a client-side setting rather than a setting for the entire datanode. If
+ present, this setting will override the DataNode default.
+
+ If the native libraries are not available to the DataNode, this
+ configuration has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.cache.readahead</name>
+ <value></value>
+ <description>
+ Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
+ read ahead in the block file using posix_fadvise, potentially decreasing
+ I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
+ setting rather than a setting for the entire datanode. If present, this
+ setting will override the DataNode default.
+
+ If the native libraries are not available to the DataNode, this
+ configuration has no effect.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.enable.retrycache</name>
<value>true</value>
<description>
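Taken together, the three new keys give the advisory-caching settings three levels of precedence: per-stream calls override the client-wide keys above, which in turn override the DataNode's dfs.datanode.* defaults. A sketch of all three levels, assuming the setDropBehind and setReadahead stream methods this commit's tests exercise (the path and sizes are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ClientCachingSketch {
      public static void main(String[] args) throws Exception {
        // Client-wide overrides; unset keys fall through to the
        // DataNode defaults.
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.client.cache.drop.behind.writes", true);
        conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
        conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);

        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/caching-example");

        // Per-stream overrides win over everything above.
        FSDataOutputStream out = fs.create(p);
        out.setDropBehind(true);
        out.write(new byte[4096]);
        out.close();

        FSDataInputStream in = fs.open(p);
        in.setDropBehind(true);
        in.setReadahead(4L * 1024 * 1024); // assumed to mirror setDropBehind
        in.close();
      }
    }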
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Sep 27 15:47:11 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.net.NetUtils;
@@ -155,7 +156,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(),
offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
- nodes[0], null, null, null, false);
+ nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Sep 27 15:47:11 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@@ -194,7 +195,7 @@ public class TestDataTransferProtocol {
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- DEFAULT_CHECKSUM);
+ DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
if (eofExcepted) {
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
@@ -391,7 +392,7 @@ public class TestDataTransferProtocol {
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0, 0L, 0L, 0L,
- badChecksum);
+ badChecksum, CachingStrategy.newDefaultStrategy());
recvBuf.reset();
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -402,7 +403,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
- DEFAULT_CHECKSUM);
+ DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
PacketHeader hdr = new PacketHeader(
4, // size of packet
@@ -425,7 +426,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
- DEFAULT_CHECKSUM);
+ DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
hdr = new PacketHeader(
8, // size of packet
@@ -452,21 +453,21 @@ public class TestDataTransferProtocol {
recvBuf.reset();
blk.setBlockId(blkid-1);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- 0L, fileLen, true);
+ 0L, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- -1L, fileLen, true);
+ -1L, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- fileLen, fileLen, true);
+ fileLen, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -483,7 +484,8 @@ public class TestDataTransferProtocol {
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- 0L, -1L-random.nextInt(oneMil), true);
+ 0L, -1L-random.nextInt(oneMil), true,
+ CachingStrategy.newDefaultStrategy());
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -496,14 +498,14 @@ public class TestDataTransferProtocol {
recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- 0L, fileLen+1, true);
+ 0L, fileLen+1, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
- 0L, fileLen, true);
+ 0L, fileLen, true, CachingStrategy.newDefaultStrategy());
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Fri Sep 27 15:47:11 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.NetUtils;
@@ -148,7 +149,8 @@ public class TestBlockTokenWithDFS {
blockReader = BlockReaderFactory.newBlockReader(
new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
- nodes[0], null, null, null, false);
+ nodes[0], null, null, null, false,
+ CachingStrategy.newDefaultStrategy());
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java?rev=1526961&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java Fri Sep 27 15:47:11 2013
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCachingStrategy {
+ private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
+ private static final int MAX_TEST_FILE_LEN = 1024 * 1024;
+ private static final int WRITE_PACKET_SIZE = DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+
+ private final static TestRecordingCacheTracker tracker =
+ new TestRecordingCacheTracker();
+
+ @BeforeClass
+ public static void setupTest() {
+ EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+
+ // Track calls to posix_fadvise.
+ NativeIO.POSIX.cacheTracker = tracker;
+
+ // Normally, we wait for a few megabytes of data to be read or written
+ // before dropping the cache. This is to avoid an excessive number of
+ // JNI calls to the posix_fadvise function. However, for the purpose
+ // of this test, we want to use small files and see all fadvise calls
+ // happen.
+ BlockSender.CACHE_DROP_INTERVAL_BYTES = 4096;
+ BlockReceiver.CACHE_DROP_LAG_BYTES = 4096;
+ }
+
+ private static class Stats {
+ private final String fileName;
+ private final boolean dropped[] = new boolean[MAX_TEST_FILE_LEN];
+
+ Stats(String fileName) {
+ this.fileName = fileName;
+ }
+
+ synchronized void fadvise(int offset, int len, int flags) {
+ LOG.debug("got fadvise(offset=" + offset + ", len=" + len +
+ ", flags=" + flags + ")");
+ if (flags == NativeIO.POSIX.POSIX_FADV_DONTNEED) {
+ for (int i = 0; i < len; i++) {
+ dropped[offset + i] = true;
+ }
+ }
+ }
+
+ synchronized void assertNotDroppedInRange(int start, int end) {
+ for (int i = start; i < end; i++) {
+ if (dropped[i]) {
+ throw new RuntimeException("in file " + fileName + ", we " +
+ "dropped the cache at offset " + i);
+ }
+ }
+ }
+
+ synchronized void assertDroppedInRange(int start, int end) {
+ for (int i = start; i < end; i++) {
+ if (!dropped[i]) {
+ throw new RuntimeException("in file " + fileName + ", we " +
+ "did not drop the cache at offset " + i);
+ }
+ }
+ }
+
+ synchronized void clear() {
+ Arrays.fill(dropped, false);
+ }
+ }
+
+ private static class TestRecordingCacheTracker implements CacheTracker {
+ private final Map<String, Stats> map = new TreeMap<String, Stats>();
+
+ @Override
+ public synchronized void fadvise(String name,
+ long offset, long len, int flags) {
+ if ((len < 0) || (len > Integer.MAX_VALUE)) {
+ throw new RuntimeException("invalid length of " + len +
+ " passed to posixFadviseIfPossible");
+ }
+ if ((offset < 0) || (offset > Integer.MAX_VALUE)) {
+ throw new RuntimeException("invalid offset of " + offset +
+ " passed to posixFadviseIfPossible");
+ }
+ Stats stats = map.get(name);
+ if (stats == null) {
+ stats = new Stats(name);
+ map.put(name, stats);
+ }
+ stats.fadvise((int)offset, (int)len, flags);
+ }
+
+ synchronized void clear() {
+ map.clear();
+ }
+
+ synchronized Stats getStats(String fileName) {
+ return map.get(fileName);
+ }
+
+ @Override
+ public synchronized String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("TestRecordingCacheTracker{");
+ String prefix = "";
+ for (String fileName : map.keySet()) {
+ bld.append(prefix);
+ prefix = ", ";
+ bld.append(fileName);
+ }
+ bld.append("}");
+ return bld.toString();
+ }
+ }
+
+ static void createHdfsFile(FileSystem fs, Path p, long length,
+ Boolean dropBehind) throws Exception {
+ FSDataOutputStream fos = null;
+ try {
+ // create file with replication factor of 1
+ fos = fs.create(p, (short)1);
+ if (dropBehind != null) {
+ fos.setDropBehind(dropBehind);
+ }
+ byte buf[] = new byte[8196];
+ while (length > 0) {
+ int amt = (length > buf.length) ? buf.length : (int)length;
+ fos.write(buf, 0, amt);
+ length -= amt;
+ }
+ } catch (IOException e) {
+ LOG.error("ioexception", e);
+ } finally {
+ if (fos != null) {
+ fos.close();
+ }
+ }
+ }
+
+ static long readHdfsFile(FileSystem fs, Path p, long length,
+ Boolean dropBehind) throws Exception {
+ FSDataInputStream fis = null;
+ long totalRead = 0;
+ try {
+ fis = fs.open(p);
+ if (dropBehind != null) {
+ fis.setDropBehind(dropBehind);
+ }
+ byte buf[] = new byte[8196];
+ while (length > 0) {
+ int amt = (length > buf.length) ? buf.length : (int)length;
+ int ret = fis.read(buf, 0, amt);
+ if (ret == -1) {
+ return totalRead;
+ }
+ totalRead += ret;
+ length -= ret;
+ }
+ return totalRead;
+ } catch (IOException e) {
+ LOG.error("ioexception", e);
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ throw new RuntimeException("read failed; see IOException logged above");
+ }
+
+ @Test(timeout=120000)
+ public void testFadviseAfterWriteThenRead() throws Exception {
+ // start a cluster
+ LOG.info("testFadviseAfterWriteThenRead");
+ tracker.clear();
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ String TEST_PATH = "/test";
+ int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ // create new file
+ createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
+ // verify that we dropped everything from the cache during file creation.
+ ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+ TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+ String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+ Stats stats = tracker.getStats(fadvisedFileName);
+ stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ stats.clear();
+
+ // read file
+ readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
+ // verify that we dropped everything from the cache.
+ Assert.assertNotNull(stats);
+ stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test the scenario where the DataNode defaults to not dropping the cache,
+ * but our client defaults are set.
+ */
+ @Test(timeout=120000)
+ public void testClientDefaults() throws Exception {
+ // start a cluster
+ LOG.info("testClientDefaults");
+ tracker.clear();
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
+ MiniDFSCluster cluster = null;
+ String TEST_PATH = "/test";
+ int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ // create new file
+ createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+ // verify that we dropped everything from the cache during file creation.
+ ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+ TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+ String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+ Stats stats = tracker.getStats(fadvisedFileName);
+ stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ stats.clear();
+
+ // read file
+ readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, null);
+ // verify that we dropped everything from the cache.
+ Assert.assertNotNull(stats);
+ stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testFadviseSkippedForSmallReads() throws Exception {
+ // start a cluster
+ LOG.info("testFadviseSkippedForSmallReads");
+ tracker.clear();
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
+ MiniDFSCluster cluster = null;
+ String TEST_PATH = "/test";
+ int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+ FSDataInputStream fis = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ // create new file
+ createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+ // Since the DataNode was configured with drop-behind, and we didn't
+ // specify any policy, we should have done drop-behind.
+ ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+ TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+ String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+ Stats stats = tracker.getStats(fadvisedFileName);
+ stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ stats.clear();
+ stats.assertNotDroppedInRange(0, TEST_PATH_LEN);
+
+ // read file
+ fis = fs.open(new Path(TEST_PATH));
+ byte buf[] = new byte[17];
+ fis.readFully(4096, buf, 0, buf.length);
+
+ // we should not have dropped anything because of the small read.
+ stats = tracker.getStats(fadvisedFileName);
+ stats.assertNotDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+ } finally {
+ IOUtils.cleanup(null, fis);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testNoFadviseAfterWriteThenRead() throws Exception {
+ // start a cluster
+ LOG.info("testNoFadviseAfterWriteThenRead");
+ tracker.clear();
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = null;
+ String TEST_PATH = "/test";
+ int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ // create new file
+ createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
+ // verify that we did not drop anything from the cache during file creation.
+ ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+ TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+ String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+ Stats stats = tracker.getStats(fadvisedFileName);
+ Assert.assertNull(stats);
+
+ // read file
+ readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
+ // verify that we did not drop anything from the cache.
+ Assert.assertNull(stats);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+}
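The hook the test installs is just an implementation of the single-method NativeIO.POSIX.CacheTracker interface. A minimal alternative implementation, assuming the same interface and static cacheTracker field the test uses above (the class name is illustrative):

    import org.apache.hadoop.io.nativeio.NativeIO;
    import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;

    public class LoggingCacheTracker implements CacheTracker {
      // Print every posix_fadvise the DataNode issues; handy for
      // eyeballing drop-behind behavior rather than asserting on it.
      @Override
      public void fadvise(String name, long offset, long len, int flags) {
        System.out.println("fadvise(name=" + name + ", offset=" + offset +
            ", len=" + len + ", flags=" + flags + ")");
      }

      public static void install() {
        NativeIO.POSIX.cacheTracker = new LoggingCacheTracker();
      }
    }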
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Sep 27 15:47:11 2013
@@ -287,7 +287,8 @@ public class TestDataNodeVolumeFailure {
BlockReader blockReader =
BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
- TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
+ TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
+ CachingStrategy.newDefaultStrategy());
blockReader.close();
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1526961&r1=1526960&r2=1526961&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Fri Sep 27 15:47:11 2013
@@ -148,7 +148,7 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum);
+ checksum, CachingStrategy.newDefaultStrategy());
out.flush();
// close the connection before sending the content of the block