Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/12/17 21:57:01 UTC
svn commit: r1551701 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/resources/
src/test/java/org/apache/hadoop/hdfs/
Author: cmccabe
Date: Tue Dec 17 20:57:00 2013
New Revision: 1551701
URL: http://svn.apache.org/r1551701
Log:
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not (cmccabe)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Dec 17 20:57:00 2013
@@ -256,6 +256,9 @@ Trunk (Unreleased)
HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe)
+ HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+ (cmccabe)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Dec 17 20:57:00 2013
@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.util.EnumSet;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -89,10 +91,10 @@ public interface BlockReader extends Byt
/**
* Get a ClientMmap object for this BlockReader.
*
- * @param curBlock The current block.
+ * @param opts The read options to use.
* @return The ClientMmap object, or null if mmap is not
* supported.
*/
- ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmap getClientMmap(EnumSet<ReadOption> opts,
ClientMmapManager mmapManager);
}
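
For illustration, a minimal sketch of a caller using the revised interface; the reader and mmapManager variables are assumed to exist already, and the types come from the imports added above:

    // Ask for an mmap, indicating that checksums may be skipped.
    EnumSet<ReadOption> opts = EnumSet.of(ReadOption.SKIP_CHECKSUMS);
    ClientMmap mmap = reader.getClientMmap(opts, mmapManager);
    if (mmap == null) {
      // mmap is not supported for this reader; fall back to ordinary read().
    }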
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Dec 17 20:57:00 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
@@ -98,7 +99,7 @@ public class BlockReaderFactory {
// enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file,
block, blockToken, startOffset, len, peer, datanodeID,
- domSockFactory, verifyChecksum, fisCache);
+ domSockFactory, verifyChecksum, fisCache, cachingStrategy);
if (reader != null) {
// Once we've constructed the short-circuit block reader, we don't
// need the socket any more. So let's return it to the cache.
@@ -160,7 +161,8 @@ public class BlockReaderFactory {
* @param verifyChecksum True if we should verify the checksums.
* Note: even if this is true, when
* DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
- * set, we will skip checksums.
+ * set or the block is mlocked, we will skip
+ * checksums.
*
* @return The BlockReaderLocal, or null if the
* DataNode declined to provide short-circuit
@@ -172,7 +174,8 @@ public class BlockReaderFactory {
Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum,
- FileInputStreamCache fisCache) throws IOException {
+ FileInputStreamCache fisCache,
+ CachingStrategy cachingStrategy) throws IOException {
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
@@ -189,9 +192,18 @@ public class BlockReaderFactory {
FileInputStream fis[] = new FileInputStream[2];
sock.recvFileInputStreams(fis, buf, 0, buf.length);
try {
- reader = new BlockReaderLocal(conf, file, block,
- startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum,
- fisCache);
+ reader = new BlockReaderLocal.Builder(conf).
+ setFilename(file).
+ setBlock(block).
+ setStartOffset(startOffset).
+ setStreams(fis).
+ setDatanodeID(datanodeID).
+ setVerifyChecksum(verifyChecksum).
+ setBlockMetadataHeader(
+ BlockMetadataHeader.preadHeader(fis[1].getChannel())).
+ setFileInputStreamCache(fisCache).
+ setCachingStrategy(cachingStrategy).
+ build();
} finally {
if (reader == null) {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
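
For context, a hedged sketch of the construction pattern this hunk moves to, written as a hypothetical helper; the parameter names follow newShortCircuitBlockReader above, and setMlocked is the new knob that lets the reader skip checksums while the replica is mlocked:

    // Assumed to live in package org.apache.hadoop.hdfs alongside the classes above.
    static BlockReader buildLocalReader(DFSClient.Conf conf, String file,
        ExtendedBlock block, long startOffset, FileInputStream[] fis,
        DatanodeID datanodeID, boolean verifyChecksum, boolean mlocked,
        FileInputStreamCache fisCache, CachingStrategy cachingStrategy)
        throws IOException {
      return new BlockReaderLocal.Builder(conf).
          setFilename(file).
          setBlock(block).
          setStartOffset(startOffset).
          setStreams(fis).                  // fis[0] = block data, fis[1] = metadata
          setDatanodeID(datanodeID).
          setVerifyChecksum(verifyChecksum).
          setMlocked(mlocked).              // skip checksums while the block is mlocked
          setBlockMetadataHeader(
              BlockMetadataHeader.preadHeader(fis[1].getChannel())).
          setFileInputStreamCache(fisCache).
          setCachingStrategy(cachingStrategy).
          build();
    }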
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Dec 17 20:57:00 2013
@@ -17,25 +17,30 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly
@@ -55,446 +60,581 @@ import org.apache.hadoop.util.DataChecks
class BlockReaderLocal implements BlockReader {
static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
- private final FileInputStream dataIn; // reader for the data file
- private final FileInputStream checksumIn; // reader for the checksum file
- private final boolean verifyChecksum;
+ private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+ public static class Builder {
+ private int bufferSize;
+ private boolean verifyChecksum;
+ private int maxReadahead;
+ private String filename;
+ private FileInputStream streams[];
+ private long dataPos;
+ private DatanodeID datanodeID;
+ private FileInputStreamCache fisCache;
+ private boolean mlocked;
+ private BlockMetadataHeader header;
+ private ExtendedBlock block;
+
+ public Builder(Conf conf) {
+ this.maxReadahead = Integer.MAX_VALUE;
+ this.verifyChecksum = !conf.skipShortCircuitChecksums;
+ this.bufferSize = conf.shortCircuitBufferSize;
+ }
+
+ public Builder setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ return this;
+ }
+
+ public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+ long readahead = cachingStrategy.getReadahead() != null ?
+ cachingStrategy.getReadahead() :
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+ this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+ return this;
+ }
+
+ public Builder setFilename(String filename) {
+ this.filename = filename;
+ return this;
+ }
+
+ public Builder setStreams(FileInputStream streams[]) {
+ this.streams = streams;
+ return this;
+ }
+
+ public Builder setStartOffset(long startOffset) {
+ this.dataPos = Math.max(0, startOffset);
+ return this;
+ }
+
+ public Builder setDatanodeID(DatanodeID datanodeID) {
+ this.datanodeID = datanodeID;
+ return this;
+ }
+
+ public Builder setFileInputStreamCache(FileInputStreamCache fisCache) {
+ this.fisCache = fisCache;
+ return this;
+ }
+
+ public Builder setMlocked(boolean mlocked) {
+ this.mlocked = mlocked;
+ return this;
+ }
+
+ public Builder setBlockMetadataHeader(BlockMetadataHeader header) {
+ this.header = header;
+ return this;
+ }
+
+ public Builder setBlock(ExtendedBlock block) {
+ this.block = block;
+ return this;
+ }
+
+ public BlockReaderLocal build() {
+ Preconditions.checkNotNull(streams);
+ Preconditions.checkArgument(streams.length == 2);
+ Preconditions.checkNotNull(header);
+ return new BlockReaderLocal(this);
+ }
+ }
+
+ private boolean closed = false;
/**
- * Offset from the most recent chunk boundary at which the next read should
- * take place. Is only set to non-zero at construction time, and is
- * decremented (usually to 0) by subsequent reads. This avoids having to do a
- * checksum read at construction to position the read cursor correctly.
+ * Pair of streams for this block.
*/
- private int offsetFromChunkBoundary;
-
- private byte[] skipBuf = null;
+ private final FileInputStream streams[];
/**
- * Used for checksummed reads that need to be staged before copying to their
- * output buffer because they are either a) smaller than the checksum chunk
- * size or b) issued by the slower read(byte[]...) path
+ * The data FileChannel.
*/
- private ByteBuffer slowReadBuff = null;
- private ByteBuffer checksumBuff = null;
- private DataChecksum checksum;
+ private final FileChannel dataIn;
- private static DirectBufferPool bufferPool = new DirectBufferPool();
+ /**
+ * The next place we'll read from in the block data FileChannel.
+ *
+ * If data is buffered in dataBuf, this offset will be larger than the
+ * offset of the next byte which a read() operation will give us.
+ */
+ private long dataPos;
+
+ /**
+ * The Checksum FileChannel.
+ */
+ private final FileChannel checksumIn;
+
+ /**
+ * Checksum type and size.
+ */
+ private final DataChecksum checksum;
- private final int bytesPerChecksum;
- private final int checksumSize;
+ /**
+ * If false, we will always skip the checksum.
+ */
+ private final boolean verifyChecksum;
+
+ /**
+ * If true, this block is mlocked on the DataNode.
+ */
+ private final AtomicBoolean mlocked;
- /** offset in block where reader wants to actually read */
- private long startOffset;
+ /**
+ * Name of the block, for logging purposes.
+ */
private final String filename;
+ /**
+ * DataNode which contained this block.
+ */
private final DatanodeID datanodeID;
+
+ /**
+ * Block ID and Block Pool ID.
+ */
private final ExtendedBlock block;
+ /**
+ * Cache of Checksum#bytesPerChecksum.
+ */
+ private int bytesPerChecksum;
+
+ /**
+ * Cache of Checksum#checksumSize.
+ */
+ private int checksumSize;
+
+ /**
+ * FileInputStream cache to return the streams to upon closing,
+ * or null if we should just close them unconditionally.
+ */
private final FileInputStreamCache fisCache;
+
+ /**
+ * Maximum number of chunks to allocate.
+ *
+ * This is used to allocate dataBuf and checksumBuf, in the event that
+ * we need them.
+ */
+ private final int maxAllocatedChunks;
+
+ /**
+ * True if zero readahead was requested.
+ */
+ private final boolean zeroReadaheadRequested;
+
+ /**
+ * Maximum amount of readahead we'll do. This will always be at least the
+ * size of a single chunk, even if {@link zeroReadaheadRequested} is true.
+ * The reason is because we need to do a certain amount of buffering in order
+ * to do checksumming.
+ *
+ * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+ * Why do we allocate buffers, and then (potentially) only use part of them?
+ * The rationale is that allocating a lot of buffers of different sizes would
+ * make it very difficult for the DirectBufferPool to re-use buffers.
+ */
+ private int maxReadaheadLength;
+
private ClientMmap clientMmap;
- private boolean mmapDisabled;
-
- private static int getSlowReadBufferNumChunks(int bufSize,
- int bytesPerChecksum) {
- if (bufSize < bytesPerChecksum) {
- throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
- bufSize + ") is not large enough to hold a single chunk (" +
- bytesPerChecksum + "). Please configure " +
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
- }
-
- // Round down to nearest chunk size
- return bufSize / bytesPerChecksum;
- }
-
- public BlockReaderLocal(DFSClient.Conf conf, String filename,
- ExtendedBlock block, long startOffset, long length,
- FileInputStream dataIn, FileInputStream checksumIn,
- DatanodeID datanodeID, boolean verifyChecksum,
- FileInputStreamCache fisCache) throws IOException {
- this.dataIn = dataIn;
- this.checksumIn = checksumIn;
- this.startOffset = Math.max(startOffset, 0);
- this.filename = filename;
- this.datanodeID = datanodeID;
- this.block = block;
- this.fisCache = fisCache;
- this.clientMmap = null;
- this.mmapDisabled = false;
-
- // read and handle the common header here. For now just a version
- checksumIn.getChannel().position(0);
- BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(new DataInputStream(
- new BufferedInputStream(checksumIn,
- BlockMetadataHeader.getHeaderSize())));
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- throw new IOException("Wrong version (" + version + ") of the " +
- "metadata file for " + filename + ".");
- }
- this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
- long firstChunkOffset;
- if (this.verifyChecksum) {
- this.checksum = header.getChecksum();
- this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
- this.checksumSize = this.checksum.getChecksumSize();
- firstChunkOffset = startOffset
- - (startOffset % checksum.getBytesPerChecksum());
- this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
-
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(
- conf.shortCircuitBufferSize, bytesPerChecksum);
- slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
- checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
- // Initially the buffers have nothing to read.
- slowReadBuff.flip();
- checksumBuff.flip();
- long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
- IOUtils.skipFully(checksumIn, checkSumOffset);
+
+ /**
+ * Buffers data starting at the current dataPos and extending on
+ * for dataBuf.limit().
+ *
+ * This may be null if we don't need it.
+ */
+ private ByteBuffer dataBuf;
+
+ /**
+ * Buffers checksums starting at the current checksumPos and extending on
+ * for checksumBuf.limit().
+ *
+ * This may be null if we don't need it.
+ */
+ private ByteBuffer checksumBuf;
+
+ private boolean mmapDisabled = false;
+
+ private BlockReaderLocal(Builder builder) {
+ this.streams = builder.streams;
+ this.dataIn = builder.streams[0].getChannel();
+ this.dataPos = builder.dataPos;
+ this.checksumIn = builder.streams[1].getChannel();
+ this.checksum = builder.header.getChecksum();
+ this.verifyChecksum = builder.verifyChecksum &&
+ (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+ this.mlocked = new AtomicBoolean(builder.mlocked);
+ this.filename = builder.filename;
+ this.datanodeID = builder.datanodeID;
+ this.fisCache = builder.fisCache;
+ this.block = builder.block;
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ this.checksumSize = checksum.getChecksumSize();
+
+ this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+ ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+ // Calculate the effective maximum readahead.
+ // We can't do more readahead than there is space in the buffer.
+ int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+ ((Math.min(builder.bufferSize, builder.maxReadahead) +
+ bytesPerChecksum - 1) / bytesPerChecksum);
+ if (maxReadaheadChunks == 0) {
+ this.zeroReadaheadRequested = true;
+ maxReadaheadChunks = 1;
} else {
- firstChunkOffset = startOffset;
- this.checksum = null;
- this.bytesPerChecksum = 0;
- this.checksumSize = 0;
- this.offsetFromChunkBoundary = 0;
+ this.zeroReadaheadRequested = false;
}
-
- boolean success = false;
+ this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+ }
+
+ private synchronized void createDataBufIfNeeded() {
+ if (dataBuf == null) {
+ dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+ dataBuf.position(0);
+ dataBuf.limit(0);
+ }
+ }
+
+ private synchronized void freeDataBufIfExists() {
+ if (dataBuf != null) {
+ // When disposing of a dataBuf, we have to move our stored file index
+ // backwards.
+ dataPos -= dataBuf.remaining();
+ dataBuf.clear();
+ bufferPool.returnBuffer(dataBuf);
+ dataBuf = null;
+ }
+ }
+
+ private synchronized void createChecksumBufIfNeeded() {
+ if (checksumBuf == null) {
+ checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+ checksumBuf.position(0);
+ checksumBuf.limit(0);
+ }
+ }
+
+ private synchronized void freeChecksumBufIfExists() {
+ if (checksumBuf != null) {
+ checksumBuf.clear();
+ bufferPool.returnBuffer(checksumBuf);
+ checksumBuf = null;
+ }
+ }
+
+ private synchronized int drainDataBuf(ByteBuffer buf)
+ throws IOException {
+ if (dataBuf == null) return 0;
+ int oldLimit = dataBuf.limit();
+ int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+ if (nRead == 0) return 0;
try {
- // Reposition both input streams to the beginning of the chunk
- // containing startOffset
- this.dataIn.getChannel().position(firstChunkOffset);
- success = true;
+ dataBuf.limit(dataBuf.position() + nRead);
+ buf.put(dataBuf);
} finally {
- if (success) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created BlockReaderLocal for file " + filename
- + " block " + block + " in datanode " + datanodeID);
- }
- } else {
- if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
- if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
- }
+ dataBuf.limit(oldLimit);
}
+ return nRead;
}
/**
- * Reads bytes into a buffer until EOF or the buffer's limit is reached
+ * Read from the block file into a buffer.
+ *
+ * This function overwrites checksumBuf. It will increment dataPos.
+ *
+ * @param buf The buffer to read into. May be dataBuf.
+ * The position and limit of this buffer should be set to
+ * multiples of the checksum size.
+ * @param canSkipChecksum True if we can skip checksumming.
+ *
+ * @return Total bytes read. 0 on EOF.
*/
- private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+ private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
throws IOException {
- int bytesRead = stream.getChannel().read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = stream.getChannel().read(buf);
- if (n < 0) {
- //EOF
- return bytesRead;
- }
- bytesRead += n;
+ int total = 0;
+ long startDataPos = dataPos;
+ int startBufPos = buf.position();
+ while (buf.hasRemaining()) {
+ int nRead = dataIn.read(buf, dataPos);
+ if (nRead < 0) {
+ break;
+ }
+ dataPos += nRead;
+ total += nRead;
+ }
+ if (canSkipChecksum) {
+ freeChecksumBufIfExists();
+ return total;
}
- return bytesRead;
- }
+ if (total > 0) {
+ try {
+ buf.limit(buf.position());
+ buf.position(startBufPos);
+ createChecksumBufIfNeeded();
+ int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.clear();
+ checksumBuf.limit(checksumsNeeded * checksumSize);
+ long checksumPos =
+ 7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+ while (checksumBuf.hasRemaining()) {
+ int nRead = checksumIn.read(checksumBuf, checksumPos);
+ if (nRead < 0) {
+ throw new IOException("Got unexpected checksum file EOF at " +
+ checksumPos + ", block file position " + startDataPos + " for " +
+ "block " + block + " of file " + filename);
+ }
+ checksumPos += nRead;
+ }
+ checksumBuf.flip();
- /**
- * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
- * another.
- */
- private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
- int oldLimit = from.limit();
- from.limit(from.position() + length);
- try {
- to.put(from);
- } finally {
- from.limit(oldLimit);
+ checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+ } finally {
+ buf.position(buf.limit());
+ }
}
+ return total;
}
+ private boolean getCanSkipChecksum() {
+ return (!verifyChecksum) || mlocked.get();
+ }
+
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
- int nRead = 0;
- if (verifyChecksum) {
- // A 'direct' read actually has three phases. The first drains any
- // remaining bytes from the slow read buffer. After this the read is
- // guaranteed to be on a checksum chunk boundary. If there are still bytes
- // to read, the fast direct path is used for as many remaining bytes as
- // possible, up to a multiple of the checksum chunk size. Finally, any
- // 'odd' bytes remaining at the end of the read cause another slow read to
- // be issued, which involves an extra copy.
-
- // Every 'slow' read tries to fill the slow read buffer in one go for
- // efficiency's sake. As described above, all non-checksum-chunk-aligned
- // reads will be served from the slower read path.
-
- if (slowReadBuff.hasRemaining()) {
- // There are remaining bytes from a small read available. This usually
- // means this read is unaligned, which falls back to the slow path.
- int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
- writeSlice(slowReadBuff, buf, fromSlowReadBuff);
- nRead += fromSlowReadBuff;
- }
-
- if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
- // Since we have drained the 'small read' buffer, we are guaranteed to
- // be chunk-aligned
- int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
-
- // There's only enough checksum buffer space available to checksum one
- // entire slow read buffer. This saves keeping the number of checksum
- // chunks around.
- len = Math.min(len, slowReadBuff.capacity());
- int oldlimit = buf.limit();
- buf.limit(buf.position() + len);
- int readResult = 0;
- try {
- readResult = doByteBufferRead(buf);
- } finally {
- buf.limit(oldlimit);
- }
- if (readResult == -1) {
- return nRead;
- } else {
- nRead += readResult;
- buf.position(buf.position() + readResult);
- }
- }
-
- // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
- // until chunk boundary
- if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
- int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
- int readResult = fillSlowReadBuffer(toRead);
- if (readResult == -1) {
- return nRead;
- } else {
- int fromSlowReadBuff = Math.min(readResult, buf.remaining());
- writeSlice(slowReadBuff, buf, fromSlowReadBuff);
- nRead += fromSlowReadBuff;
- }
+ boolean canSkipChecksum = getCanSkipChecksum();
+
+ String traceString = null;
+ if (LOG.isTraceEnabled()) {
+ traceString = new StringBuilder().
+ append("read(").
+ append("buf.remaining=").append(buf.remaining()).
+ append(", block=").append(block).
+ append(", filename=").append(filename).
+ append(", canSkipChecksum=").append(canSkipChecksum).
+ append(")").toString();
+ LOG.info(traceString + ": starting");
+ }
+ int nRead;
+ try {
+ if (canSkipChecksum && zeroReadaheadRequested) {
+ nRead = readWithoutBounceBuffer(buf);
+ } else {
+ nRead = readWithBounceBuffer(buf, canSkipChecksum);
}
- } else {
- // Non-checksummed reads are much easier; we can just fill the buffer directly.
- nRead = doByteBufferRead(buf);
- if (nRead > 0) {
- buf.position(buf.position() + nRead);
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.info(traceString + ": I/O error", e);
}
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.info(traceString + ": returning " + nRead);
}
return nRead;
}
+ private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+ throws IOException {
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
+ int total = 0;
+ while (buf.hasRemaining()) {
+ int nRead = dataIn.read(buf, dataPos);
+ if (nRead < 0) {
+ break;
+ }
+ dataPos += nRead;
+ total += nRead;
+ }
+ return (total == 0) ? -1 : total;
+ }
+
/**
- * Tries to read as many bytes as possible into supplied buffer, checksumming
- * each chunk if needed.
- *
- * <b>Preconditions:</b>
- * <ul>
- * <li>
- * If checksumming is enabled, buf.remaining must be a multiple of
- * bytesPerChecksum. Note that this is not a requirement for clients of
- * read(ByteBuffer) - in the case of non-checksum-sized read requests,
- * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
- * method.
- * </li>
- * </ul>
- * <b>Postconditions:</b>
- * <ul>
- * <li>buf.limit and buf.mark are unchanged.</li>
- * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
- * requested bytes can be read straight from the buffer</li>
- * </ul>
- *
- * @param buf
- * byte buffer to write bytes to. If checksums are not required, buf
- * can have any number of bytes remaining, otherwise there must be a
- * multiple of the checksum chunk size remaining.
- * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
- * that is, the number of useful bytes (up to the amount
- * requested) readable from the buffer by the client.
- */
- private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
- if (verifyChecksum) {
- assert buf.remaining() % bytesPerChecksum == 0;
- }
- int dataRead = -1;
-
- int oldpos = buf.position();
- // Read as much as we can into the buffer.
- dataRead = fillBuffer(dataIn, buf);
-
- if (dataRead == -1) {
- return -1;
- }
-
- if (verifyChecksum) {
- ByteBuffer toChecksum = buf.duplicate();
- toChecksum.position(oldpos);
- toChecksum.limit(oldpos + dataRead);
-
- checksumBuff.clear();
- // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
- int numChunks =
- (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
- checksumBuff.limit(checksumSize * numChunks);
-
- fillBuffer(checksumIn, checksumBuff);
- checksumBuff.flip();
-
- checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
- this.startOffset);
- }
-
- if (dataRead >= 0) {
- buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
- }
-
- if (dataRead < offsetFromChunkBoundary) {
- // yikes, didn't even get enough bytes to honour offset. This can happen
- // even if we are verifying checksums if we are at EOF.
- offsetFromChunkBoundary -= dataRead;
- dataRead = 0;
+ * Fill the data buffer. If necessary, validate the data against the
+ * checksums.
+ *
+ * We always want the offsets of the data contained in dataBuf to be
+ * aligned to the chunk boundary. If we are validating checksums, we
+ * accomplish this by seeking backwards in the file until we're on a
+ * chunk boundary. (This is necessary because we can't checksum a
+ * partial chunk.) If we are not validating checksums, we simply only
+ * fill the latter part of dataBuf.
+ *
+ * @param canSkipChecksum true if we can skip checksumming.
+ * @return true if we hit EOF.
+ * @throws IOException
+ */
+ private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+ throws IOException {
+ createDataBufIfNeeded();
+ final int slop = (int)(dataPos % bytesPerChecksum);
+ final long oldDataPos = dataPos;
+ dataBuf.limit(maxReadaheadLength);
+ if (canSkipChecksum) {
+ dataBuf.position(slop);
+ fillBuffer(dataBuf, canSkipChecksum);
} else {
- dataRead -= offsetFromChunkBoundary;
- offsetFromChunkBoundary = 0;
+ dataPos -= slop;
+ dataBuf.position(0);
+ fillBuffer(dataBuf, canSkipChecksum);
}
-
- return dataRead;
+ dataBuf.limit(dataBuf.position());
+ dataBuf.position(Math.min(dataBuf.position(), slop));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+ "buffer from offset " + oldDataPos + " of " + block);
+ }
+ return dataBuf.limit() != maxReadaheadLength;
}
/**
- * Ensures that up to len bytes are available and checksummed in the slow read
- * buffer. The number of bytes available to read is returned. If the buffer is
- * not already empty, the number of remaining bytes is returned and no actual
- * read happens.
+ * Read using the bounce buffer.
+ *
+ * A 'direct' read actually has three phases. The first drains any
+ * remaining bytes from the slow read buffer. After this the read is
+ * guaranteed to be on a checksum chunk boundary. If there are still bytes
+ * to read, the fast direct path is used for as many remaining bytes as
+ * possible, up to a multiple of the checksum chunk size. Finally, any
+ * 'odd' bytes remaining at the end of the read cause another slow read to
+ * be issued, which involves an extra copy.
+ *
+ * Every 'slow' read tries to fill the slow read buffer in one go for
+ * efficiency's sake. As described above, all non-checksum-chunk-aligned
+ * reads will be served from the slower read path.
*
- * @param len
- * the maximum number of bytes to make available. After len bytes
- * are read, the underlying bytestream <b>must</b> be at a checksum
- * boundary, or EOF. That is, (len + currentPosition) %
- * bytesPerChecksum == 0.
- * @return the number of bytes available to read, or -1 if EOF.
+ * @param buf The buffer to read into.
+ * @param canSkipChecksum True if we can skip checksums.
*/
- private synchronized int fillSlowReadBuffer(int len) throws IOException {
- int nRead = -1;
- if (slowReadBuff.hasRemaining()) {
- // Already got data, good to go.
- nRead = Math.min(len, slowReadBuff.remaining());
- } else {
- // Round a complete read of len bytes (plus any implicit offset) to the
- // next chunk boundary, since we try and read in multiples of a chunk
- int nextChunk = len + offsetFromChunkBoundary +
- (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
- int limit = Math.min(nextChunk, slowReadBuff.capacity());
- assert limit % bytesPerChecksum == 0;
-
- slowReadBuff.clear();
- slowReadBuff.limit(limit);
-
- nRead = doByteBufferRead(slowReadBuff);
-
- if (nRead > 0) {
- // So that next time we call slowReadBuff.hasRemaining(), we don't get a
- // false positive.
- slowReadBuff.limit(nRead + slowReadBuff.position());
+ private synchronized int readWithBounceBuffer(ByteBuffer buf,
+ boolean canSkipChecksum) throws IOException {
+ int total = 0;
+ boolean eof = false;
+ while (true) {
+ int bb = drainDataBuf(buf); // drain bounce buffer if possible
+ total += bb;
+ int needed = buf.remaining();
+ if (eof || (needed == 0)) {
+ break;
+ } else if (buf.isDirect() && (needed >= maxReadaheadLength)
+ && ((dataPos % bytesPerChecksum) == 0)) {
+ // Fast lane: try to read directly into user-supplied buffer, bypassing
+ // bounce buffer.
+ int oldLimit = buf.limit();
+ int nRead;
+ try {
+ buf.limit(buf.position() + maxReadaheadLength);
+ nRead = fillBuffer(buf, canSkipChecksum);
+ } finally {
+ buf.limit(oldLimit);
+ }
+ if (nRead < maxReadaheadLength) {
+ eof = true;
+ }
+ total += nRead;
+ } else {
+ // Slow lane: refill bounce buffer.
+ if (fillDataBuf(canSkipChecksum)) {
+ eof = true;
+ }
}
}
- return nRead;
+ return total == 0 ? -1 : total;
}
@Override
- public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ public synchronized int read(byte[] arr, int off, int len)
+ throws IOException {
+ boolean canSkipChecksum = getCanSkipChecksum();
+ String traceString = null;
if (LOG.isTraceEnabled()) {
- LOG.trace("read off " + off + " len " + len);
+ traceString = new StringBuilder().
+ append("read(arr.length=").append(arr.length).
+ append(", off=").append(off).
+ append(", len=").append(len).
+ append(", filename=").append(filename).
+ append(", block=").append(block).
+ append(", canSkipChecksum=").append(canSkipChecksum).
+ append(")").toString();
+ LOG.trace(traceString + ": starting");
}
- if (!verifyChecksum) {
- return dataIn.read(buf, off, len);
+ int nRead;
+ try {
+ if (canSkipChecksum && zeroReadaheadRequested) {
+ nRead = readWithoutBounceBuffer(arr, off, len);
+ } else {
+ nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+ }
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(traceString + ": I/O error", e);
+ }
+ throw e;
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(traceString + ": returning " + nRead);
+ }
+ return nRead;
+ }
- int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
-
+ private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+ int len) throws IOException {
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
+ int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
if (nRead > 0) {
- // Possible that buffer is filled with a larger read than we need, since
- // we tried to read as much as possible at once
- nRead = Math.min(len, nRead);
- slowReadBuff.get(buf, off, nRead);
+ dataPos += nRead;
}
+ return nRead == 0 ? -1 : nRead;
+ }
- return nRead;
+ private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+ boolean canSkipChecksum) throws IOException {
+ createDataBufIfNeeded();
+ if (!dataBuf.hasRemaining()) {
+ dataBuf.position(0);
+ dataBuf.limit(maxReadaheadLength);
+ fillDataBuf(canSkipChecksum);
+ }
+ int toRead = Math.min(dataBuf.remaining(), len);
+ dataBuf.get(arr, off, toRead);
+ return toRead == 0 ? -1 : toRead;
}
@Override
public synchronized long skip(long n) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("skip " + n);
- }
- if (n <= 0) {
- return 0;
- }
- if (!verifyChecksum) {
- return dataIn.skip(n);
- }
-
- // caller made sure newPosition is not beyond EOF.
- int remaining = slowReadBuff.remaining();
- int position = slowReadBuff.position();
- int newPosition = position + (int)n;
-
- // if the new offset is already read into dataBuff, just reposition
- if (n <= remaining) {
- assert offsetFromChunkBoundary == 0;
- slowReadBuff.position(newPosition);
- return n;
- }
-
- // for small gap, read through to keep the data/checksum in sync
- if (n - remaining <= bytesPerChecksum) {
- slowReadBuff.position(position + remaining);
- if (skipBuf == null) {
- skipBuf = new byte[bytesPerChecksum];
- }
- int ret = read(skipBuf, 0, (int)(n - remaining));
- return ret;
+ int discardedFromBuf = 0;
+ long remaining = n;
+ if ((dataBuf != null) && dataBuf.hasRemaining()) {
+ discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+ dataBuf.position(dataBuf.position() + discardedFromBuf);
+ remaining -= discardedFromBuf;
}
-
- // optimize for big gap: discard the current buffer, skip to
- // the beginning of the appropriate checksum chunk and then
- // read to the middle of that chunk to be in sync with checksums.
-
- // We can't use this.offsetFromChunkBoundary because we need to know how
- // many bytes of the offset were really read. Calling read(..) with a
- // positive this.offsetFromChunkBoundary causes that many bytes to get
- // silently skipped.
- int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
- long toskip = n - remaining - myOffsetFromChunkBoundary;
-
- slowReadBuff.position(slowReadBuff.limit());
- checksumBuff.position(checksumBuff.limit());
-
- IOUtils.skipFully(dataIn, toskip);
- long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
- IOUtils.skipFully(checksumIn, checkSumOffset);
-
- // read into the middle of the chunk
- if (skipBuf == null) {
- skipBuf = new byte[bytesPerChecksum];
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
+ filename + "): discarded " + discardedFromBuf + " bytes from " +
+ "dataBuf and advanced dataPos by " + remaining);
}
- assert skipBuf.length == bytesPerChecksum;
- assert myOffsetFromChunkBoundary < bytesPerChecksum;
-
- int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+ dataPos += remaining;
+ return n;
+ }
- if (ret == -1) { // EOS
- return toskip;
- } else {
- return (toskip + ret);
- }
+ @Override
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocal.
+ return Integer.MAX_VALUE;
}
@Override
public synchronized void close() throws IOException {
+ if (closed) return;
+ closed = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+ }
if (clientMmap != null) {
clientMmap.unref();
clientMmap = null;
@@ -504,58 +644,55 @@ class BlockReaderLocal implements BlockR
LOG.debug("putting FileInputStream for " + filename +
" back into FileInputStreamCache");
}
- fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+ fisCache.put(datanodeID, block, streams);
} else {
LOG.debug("closing FileInputStream for " + filename);
IOUtils.cleanup(LOG, dataIn, checksumIn);
}
- if (slowReadBuff != null) {
- bufferPool.returnBuffer(slowReadBuff);
- slowReadBuff = null;
- }
- if (checksumBuff != null) {
- bufferPool.returnBuffer(checksumBuff);
- checksumBuff = null;
- }
- startOffset = -1;
- checksum = null;
+ freeDataBufIfExists();
+ freeChecksumBufIfExists();
}
@Override
- public int readAll(byte[] buf, int offset, int len) throws IOException {
- return BlockReaderUtil.readAll(this, buf, offset, len);
- }
-
- @Override
- public void readFully(byte[] buf, int off, int len) throws IOException {
- BlockReaderUtil.readFully(this, buf, off, len);
+ public synchronized void readFully(byte[] arr, int off, int len)
+ throws IOException {
+ BlockReaderUtil.readFully(this, arr, off, len);
}
@Override
- public int available() throws IOException {
- // We never do network I/O in BlockReaderLocal.
- return Integer.MAX_VALUE;
+ public synchronized int readAll(byte[] buf, int off, int len)
+ throws IOException {
+ return BlockReaderUtil.readAll(this, buf, off, len);
}
@Override
public boolean isLocal() {
return true;
}
-
+
@Override
public boolean isShortCircuit() {
return true;
}
@Override
- public ClientMmap getClientMmap(LocatedBlock curBlock,
- ClientMmapManager mmapManager) {
+ public synchronized ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+ ClientMmapManager mmapManager) {
+ if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
+ verifyChecksum && (!mlocked.get())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("can't get an mmap for " + block + " of " + filename +
+ " since SKIP_CHECKSUMS was not given, " +
+ "we aren't skipping checksums, and the block is not mlocked.");
+ }
+ return null;
+ }
if (clientMmap == null) {
if (mmapDisabled) {
return null;
}
try {
- clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+ clientMmap = mmapManager.fetch(datanodeID, block, streams[0]);
if (clientMmap == null) {
mmapDisabled = true;
return null;
@@ -572,4 +709,24 @@ class BlockReaderLocal implements BlockR
}
return clientMmap;
}
+
+ /**
+ * Set the mlocked state of the BlockReader.
+ * This method does NOT need to be synchronized because mlocked is atomic.
+ *
+ * @param mlocked the new mlocked state of the BlockReader.
+ */
+ public void setMlocked(boolean mlocked) {
+ this.mlocked.set(mlocked);
+ }
+
+ @VisibleForTesting
+ boolean getVerifyChecksum() {
+ return this.verifyChecksum;
+ }
+
+ @VisibleForTesting
+ int getMaxReadaheadLength() {
+ return this.maxReadaheadLength;
+ }
}
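
To make the alignment arithmetic used by fillBuffer and fillDataBuf concrete, here is a small self-contained sketch, not part of the commit, assuming the usual 7-byte metadata header (2-byte version plus 5-byte DataChecksum header), 512-byte chunks, and 4-byte CRC32C checksums:

    public class ChecksumOffsetSketch {
      public static void main(String[] args) {
        final int bytesPerChecksum = 512;  // chunk size
        final int checksumSize = 4;        // CRC32C
        final int headerLen = 7;           // matches the literal 7 in fillBuffer()

        long dataPos = 1300;                            // requested offset in the block
        int slop = (int) (dataPos % bytesPerChecksum);  // 276 bytes past the chunk boundary
        long alignedPos = dataPos - slop;               // 1024: rewind to the chunk boundary
        long checksumPos = headerLen
            + (alignedPos / bytesPerChecksum) * checksumSize;  // 7 + 2 * 4 = 15

        // When verifying, BlockReaderLocal reads whole chunks starting at alignedPos,
        // checks them against the checksums starting at checksumPos, and then
        // positions the bounce buffer at 'slop' so the caller only sees its bytes.
        System.out.println("alignedPos=" + alignedPos + ", checksumPos=" + checksumPos
            + ", slop=" + slop);
      }
    }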
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Tue Dec 17 20:57:00 2013
@@ -24,10 +24,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
@@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements
}
@Override
- public ClientMmap getClientMmap(LocatedBlock curBlock,
- ClientMmapManager mmapManager) {
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+ ClientMmapManager mmapManager) {
return null;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Dec 17 20:57:00 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.ByteBufferPool;
@@ -1073,9 +1074,18 @@ implements ByteBufferReadable, CanSetDro
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
"the FileInputStreamCache.");
}
- return new BlockReaderLocal(dfsClient.getConf(), file,
- block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
- fileInputStreamCache);
+ return new BlockReaderLocal.Builder(dfsClient.getConf()).
+ setFilename(file).
+ setBlock(block).
+ setStartOffset(startOffset).
+ setStreams(fis).
+ setDatanodeID(chosenNode).
+ setVerifyChecksum(verifyChecksum).
+ setBlockMetadataHeader(BlockMetadataHeader.
+ preadHeader(fis[1].getChannel())).
+ setFileInputStreamCache(fileInputStreamCache).
+ setCachingStrategy(cachingStrategy).
+ build();
}
// If the legacy local block reader is enabled and we are reading a local
@@ -1479,23 +1489,19 @@ implements ByteBufferReadable, CanSetDro
"at position " + pos);
}
}
- boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
- if (canSkipChecksums) {
- ByteBuffer buffer = tryReadZeroCopy(maxLength);
- if (buffer != null) {
- return buffer;
- }
+ ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
+ if (buffer != null) {
+ return buffer;
}
- ByteBuffer buffer = ByteBufferUtil.
- fallbackRead(this, bufferPool, maxLength);
+ buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool);
}
return buffer;
}
- private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
- throws IOException {
+ private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
+ EnumSet<ReadOption> opts) throws IOException {
// Java ByteBuffers can't be longer than 2 GB, because they use
// 4-byte signed integers to represent capacity, etc.
// So we can't mmap the parts of the block higher than the 2 GB offset.
@@ -1518,8 +1524,7 @@ implements ByteBufferReadable, CanSetDro
long blockPos = curPos - blockStartInFile;
long limit = blockPos + length;
ClientMmap clientMmap =
- blockReader.getClientMmap(currentLocatedBlock,
- dfsClient.getMmapManager());
+ blockReader.getClientMmap(opts, dfsClient.getMmapManager());
if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
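
For context, a rough sketch of the client-side zero-copy call that ultimately reaches tryReadZeroCopy; the FSDataInputStream and ByteBufferPool API used here is assumed from the Hadoop code of the same era rather than shown in this diff:

    // fs is an open DistributedFileSystem, path an HDFS file (both assumed).
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    FSDataInputStream in = fs.open(path);
    try {
      // After this change, SKIP_CHECKSUMS is no longer a precondition for even
      // attempting zero-copy; the options are forwarded to BlockReader#getClientMmap.
      ByteBuffer buf = in.read(pool, 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      if (buf != null) {
        // ... use buf ...
        in.releaseBuffer(buf);
      }
    } finally {
      in.close();
    }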
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Dec 17 20:57:00 2013
@@ -23,10 +23,12 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
@@ -490,8 +492,8 @@ public class RemoteBlockReader extends F
}
@Override
- public ClientMmap getClientMmap(LocatedBlock curBlock,
- ClientMmapManager mmapManager) {
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+ ClientMmapManager mmapManager) {
return null;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Dec 17 20:57:00 2013
@@ -25,10 +25,12 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
@@ -455,8 +457,8 @@ public class RemoteBlockReader2 impleme
}
@Override
- public ClientMmap getClientMmap(LocatedBlock curBlock,
- ClientMmapManager manager) {
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+ ClientMmapManager mmapManager) {
return null;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Dec 17 20:57:00 2013
@@ -21,10 +21,13 @@ import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -67,7 +70,29 @@ public class BlockMetadataHeader {
return checksum;
}
-
+ /**
+ * Read the header without changing the position of the FileChannel.
+ *
+ * @param fc The FileChannel to read.
+ * @return the Metadata Header.
+ * @throws IOException on error.
+ */
+ public static BlockMetadataHeader preadHeader(FileChannel fc)
+ throws IOException {
+ byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+ ByteBuffer buf = ByteBuffer.wrap(arr);
+
+ while (buf.hasRemaining()) {
+ if (fc.read(buf, 0) <= 0) {
+ throw new EOFException("unexpected EOF while reading " +
+ "metadata file header");
+ }
+ }
+ short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
+ DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+ return new BlockMetadataHeader(version, dataChecksum);
+ }
+
/**
* This reads all the fields till the beginning of checksum.
* @param in
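
A minimal usage sketch for the new preadHeader; the hypothetical helper below simply opens the metadata file and pulls out the DataChecksum, relying on the fact that positional reads leave the channel position untouched:

    static DataChecksum readChecksumFromMeta(File metaFile) throws IOException {
      FileInputStream in = new FileInputStream(metaFile);
      try {
        // Positional reads: the channel position stays at 0, so the stream can
        // still be handed to a BlockReaderLocal afterwards.
        BlockMetadataHeader header =
            BlockMetadataHeader.preadHeader(in.getChannel());
        return header.getChecksum();
      } finally {
        in.close();
      }
    }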
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Dec 17 20:57:00 2013
@@ -1394,12 +1394,15 @@
<name>dfs.client.cache.readahead</name>
<value></value>
<description>
- Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
+ When using remote reads, this setting causes the datanode to
read ahead in the block file using posix_fadvise, potentially decreasing
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
setting rather than a setting for the entire datanode. If present, this
setting will override the DataNode default.
+ When using local reads, this setting determines how much readahead we do in
+ BlockReaderLocal.
+
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
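
A small sketch of setting this from client code; DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD is the constant for dfs.client.cache.readahead (it also appears in the test changes below), and a value of 0 requests zero readahead, though BlockReaderLocal still buffers one chunk whenever it has to checksum:

    Configuration conf = new HdfsConfiguration();
    // Remote reads: per-client override of dfs.datanode.readahead.bytes.
    // Short-circuit local reads: bounds BlockReaderLocal's bounce-buffer readahead.
    conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 4L * 1024 * 1024);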
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Dec 17 20:57:00 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.VersionInf
import java.io.*;
import java.net.*;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.TimeoutException;
@@ -1059,4 +1060,10 @@ public class DFSTestUtil {
public static void abortStream(DFSOutputStream out) throws IOException {
out.abort();
}
+
+ public static byte[] asArray(ByteBuffer buf) {
+ byte arr[] = new byte[buf.remaining()];
+ buf.duplicate().get(arr);
+ return arr;
+ }
}
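
A brief usage sketch for the new test helper, which copies a ByteBuffer's remaining bytes without disturbing its position; resultBuffer and expectedBytes are assumed to come from the surrounding test:

    byte[] actual = DFSTestUtil.asArray(resultBuffer);
    Assert.assertArrayEquals(expectedBytes, actual);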
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Tue Dec 17 20:57:00 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -92,22 +94,35 @@ public class TestBlockReaderLocal {
}
}
- private static interface BlockReaderLocalTest {
- final int TEST_LENGTH = 12345;
+ private static class BlockReaderLocalTest {
+ final static int TEST_LENGTH = 12345;
+ final static int BYTES_PER_CHECKSUM = 512;
+
+ public void setConfiguration(HdfsConfiguration conf) {
+ // default: no-op
+ }
public void setup(File blockFile, boolean usingChecksums)
- throws IOException;
+ throws IOException {
+ // default: no-op
+ }
public void doTest(BlockReaderLocal reader, byte original[])
- throws IOException;
+ throws IOException {
+ // default: no-op
+ }
}
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
- boolean checksum) throws IOException {
+ boolean checksum, long readahead) throws IOException {
MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+ conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
- FileInputStream dataIn = null, checkIn = null;
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+ test.setConfiguration(conf);
+ FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L;
BlockReaderLocal blockReaderLocal = null;
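The hunk above turns BlockReaderLocalTest from an interface into a base class whose hooks default to no-ops, so each case overrides only what it needs, and adds a setConfiguration hook that runs before the mini-cluster starts. A minimal sketch of the pattern outside HDFS (class names and the property key are illustrative, not from the patch):

    import java.io.File;
    import java.io.IOException;
    import java.util.Properties;

    // Base class with no-op hooks: implementors no longer stub out every method.
    class HookedTest {
      public void setConfiguration(Properties conf) { /* default: no-op */ }
      public void setup(File blockFile, boolean usingChecksums)
          throws IOException { /* default: no-op */ }
      public void doTest(byte[] original) throws IOException { /* default: no-op */ }
    }

    // A case overrides only the hook it cares about.
    class NullChecksumCase extends HookedTest {
      @Override
      public void setConfiguration(Properties conf) {
        conf.setProperty("checksum.type", "NULL");   // illustrative key
      }
    }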
@@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
cluster.shutdown();
cluster = null;
test.setup(dataFile, checksum);
- dataIn = new FileInputStream(dataFile);
- checkIn = new FileInputStream(metaFile);
- blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
- TEST_PATH.getName(), block, 0, -1,
- dataIn, checkIn, datanodeID, checksum, null);
+ FileInputStream streams[] = {
+ new FileInputStream(dataFile),
+ new FileInputStream(metaFile)
+ };
+ dataIn = streams[0];
+ metaIn = streams[1];
+ blockReaderLocal = new BlockReaderLocal.Builder(
+ new DFSClient.Conf(conf)).
+ setFilename(TEST_PATH.getName()).
+ setBlock(block).
+ setStreams(streams).
+ setDatanodeID(datanodeID).
+ setCachingStrategy(new CachingStrategy(false, readahead)).
+ setVerifyChecksum(checksum).
+ setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
+ metaIn.getChannel())).
+ build();
dataIn = null;
- checkIn = null;
+ metaIn = null;
test.doTest(blockReaderLocal, original);
+ // BlockReaderLocal should not alter the file position.
+ Assert.assertEquals(0, streams[0].getChannel().position());
+ Assert.assertEquals(0, streams[1].getChannel().position());
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
- if (checkIn != null) checkIn.close();
+ if (metaIn != null) metaIn.close();
if (blockReaderLocal != null) blockReaderLocal.close();
}
}
private static class TestBlockReaderLocalImmediateClose
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
- @Override
- public void doTest(BlockReaderLocal reader, byte original[])
- throws IOException { }
+ extends BlockReaderLocalTest {
}
@Test
public void testBlockReaderLocalImmediateClose() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
- runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
}
private static class TestBlockReaderSimpleReads
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
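The new assertions above pin an invariant: after the reader is built with BlockMetadataHeader.preadHeader(metaIn.getChannel()) and the test body runs, both FileChannel positions are still 0, i.e. everything goes through positional reads. A minimal, self-contained sketch of a positional read with plain java.nio (the file name and the 7-byte header size are illustrative assumptions here):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class PreadDemo {
      public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(
            Paths.get("blk_12345.meta"), StandardOpenOption.READ)) {
          ByteBuffer header = ByteBuffer.allocate(7);
          // read(dst, position) is a pread: it never moves the channel position.
          while (header.hasRemaining()) {
            if (ch.read(header, header.position()) < 0) {
              throw new IOException("unexpected EOF while reading header");
            }
          }
          System.out.println("channel position is still " + ch.position()); // 0
        }
      }
    }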
@@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
reader.readFully(buf, 1537, 514);
assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+ // Readahead is always at least the size of one chunk in this test.
+ Assert.assertTrue(reader.getMaxReadaheadLength() >=
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
}
@Test
public void testBlockReaderSimpleReads() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+ BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
}
@Test
public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
}
private static class TestBlockReaderLocalArrayReads2
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
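The assertion added to TestBlockReaderSimpleReads above (and the new ShortReadahead variant, which configures BYTES_PER_CHECKSUM - 1) expects getMaxReadaheadLength() to be at least one checksum chunk. One plausible way to satisfy that is to round the configured readahead up to whole chunks with a floor of one chunk; this is an illustrative assumption, not necessarily the exact formula inside BlockReaderLocal:

    // Assumed rounding, for illustration only.
    static int effectiveReadahead(long configured, int bytesPerChecksum) {
      long chunks = (configured + bytesPerChecksum - 1) / bytesPerChecksum; // ceil
      return (int) (Math.max(chunks, 1) * bytesPerChecksum);                // >= 1 chunk
    }
    // effectiveReadahead(511, 512) == 512, which satisfies the assertion above.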
@@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalArrayReads2() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
- true);
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalArrayReads2NoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
- false);
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2NoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
}
private static class TestBlockReaderLocalByteBufferReads
- implements BlockReaderLocalTest {
- @Override
- public void setup(File blockFile, boolean usingChecksums)
- throws IOException { }
+ extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
@@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalByteBufferReads()
throws IOException {
- runBlockReaderLocalTest(
- new TestBlockReaderLocalByteBufferReads(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalByteBufferReadsNoChecksum()
throws IOException {
runBlockReaderLocalTest(
- new TestBlockReaderLocalByteBufferReads(), false);
+ new TestBlockReaderLocalByteBufferReads(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+ false, 0);
+ }
+
+ /**
+ * Test reads that bypass the bounce buffer (because they are aligned
+ * and bigger than the readahead).
+ */
+ private static class TestBlockReaderLocalByteBufferFastLaneReads
+ extends BlockReaderLocalTest {
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+ readFully(reader, buf, 0, 5120);
+ buf.flip();
+ assertArrayRegionsEqual(original, 0,
+ DFSTestUtil.asArray(buf), 0,
+ 5120);
+ reader.skip(1537);
+ readFully(reader, buf, 0, 1);
+ buf.flip();
+ assertArrayRegionsEqual(original, 6657,
+ DFSTestUtil.asArray(buf), 0,
+ 1);
+ reader.setMlocked(true);
+ readFully(reader, buf, 0, 5120);
+ buf.flip();
+ assertArrayRegionsEqual(original, 6658,
+ DFSTestUtil.asArray(buf), 0,
+ 5120);
+ reader.setMlocked(false);
+ readFully(reader, buf, 0, 513);
+ buf.flip();
+ assertArrayRegionsEqual(original, 11778,
+ DFSTestUtil.asArray(buf), 0,
+ 513);
+ reader.skip(3);
+ readFully(reader, buf, 0, 50);
+ buf.flip();
+ assertArrayRegionsEqual(original, 12294,
+ DFSTestUtil.asArray(buf), 0,
+ 50);
+ }
}
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReads()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferFastLaneReads(),
+ false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+ false, 0);
+ }
+
private static class TestBlockReaderLocalReadCorruptStart
- implements BlockReaderLocalTest {
+ extends BlockReaderLocalTest {
boolean usingChecksums = false;
@Override
public void setup(File blockFile, boolean usingChecksums)
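The fast-lane test introduced in the hunk above targets reads that can skip the bounce buffer; per its Javadoc, those are reads that are chunk-aligned and bigger than the readahead. A hedged sketch of that criterion (an illustration of the condition the test aims at, not the actual check in BlockReaderLocal):

    // Illustration only: when might a read bypass the bounce buffer?
    static boolean canBypassBounceBuffer(long offsetInBlock, int length,
        int bytesPerChecksum, long maxReadahead, boolean verifyChecksum) {
      if (!verifyChecksum) {
        return true;                                   // no alignment constraint
      }
      boolean chunkAligned = (offsetInBlock % bytesPerChecksum) == 0;
      boolean bigEnough = length >= Math.max(maxReadahead, bytesPerChecksum);
      return chunkAligned && bigEnough;
    }
    // In the test above: bytesPerChecksum = 512 and readahead = 2 * 512, so the
    // 5120-byte read at offset 0 qualifies, while the 1-byte and 513-byte reads
    // fall back to the bounce buffer.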
@@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalReadCorruptStart()
throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
private static class TestBlockReaderLocalReadCorrupt
- implements BlockReaderLocalTest {
+ extends BlockReaderLocalTest {
boolean usingChecksums = false;
@Override
public void setup(File blockFile, boolean usingChecksums)
@@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
@Test
public void testBlockReaderLocalReadCorrupt()
throws IOException {
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
- runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+ }
+
+ private static class TestBlockReaderLocalWithMlockChanges
+ extends BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ }
+
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ reader.skip(1);
+ readFully(reader, buf, 1, 9);
+ assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ reader.setMlocked(true);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.setMlocked(false);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChanges()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+ false, 0);
+ }
+
+ private static class TestBlockReaderLocalOnFileWithoutChecksum
+ extends BlockReaderLocalTest {
+ @Override
+ public void setConfiguration(HdfsConfiguration conf) {
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+ }
+
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ Assert.assertTrue(!reader.getVerifyChecksum());
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ reader.skip(1);
+ readFully(reader, buf, 1, 9);
+ assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ reader.setMlocked(true);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.setMlocked(false);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ true, 0);
+ }
+
+ @Test
+ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+ false, 0);
}
@Test(timeout=60000)
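The TestBlockReaderLocalOnFileWithoutChecksum cases above write the block with checksum type NULL, so there is nothing on disk to verify and the reader is expected to report getVerifyChecksum() == false even when verification was requested. A hedged recap of the knobs those tests sweep, using only the DFSConfigKeys constants that appear in the patch (the particular combination below is just one point in the matrix):

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class LocalReadConfSketch {
      public static HdfsConfiguration nullChecksumNoReadahead() {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Ask the client to verify checksums on short-circuit reads...
        conf.setBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
        // ...with no client-side readahead...
        conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 0);
        // ...against a block whose metadata declares the NULL checksum type,
        // so the reader should come back with getVerifyChecksum() == false.
        conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
        return conf;
      }
    }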
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1551701&r1=1551700&r2=1551701&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Tue Dec 17 20:57:00 2013
@@ -259,7 +259,6 @@ public class TestShortCircuitLocalRead {
assertTrue("/ should be a directory", fs.getFileStatus(path)
.isDirectory() == true);
- // create a new file in home directory. Do not close it.
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
FSDataOutputStream stm = createFile(fs, file1, 1);