Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:58:16 UTC
[42/50] hadoop git commit: HDFS-8033. Erasure coding: stateful (non-positional) read from files in striped layout. Contributed by Zhe Zhang.
HDFS-8033. Erasure coding: stateful (non-positional) read from files in striped layout. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e51018a1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e51018a1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e51018a1
Branch: refs/heads/HDFS-7285
Commit: e51018a1b0c645071dbf1eb6ba0354b5593e8290
Parents: 395e29b
Author: Zhe Zhang <zh...@apache.org>
Authored: Fri Apr 24 22:36:15 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:30 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 55 ++--
.../hadoop/hdfs/DFSStripedInputStream.java | 311 ++++++++++++++++++-
.../hadoop/hdfs/TestDFSStripedInputStream.java | 43 +++
.../apache/hadoop/hdfs/TestReadStripedFile.java | 110 ++++++-
5 files changed, 465 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
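A note for readers following the patch: the new stateful read path repeatedly maps
the stream's current file offset to a cell, and to the internal block serving that
cell, before handing the byte range to that block's reader. Below is a minimal,
runnable sketch of that offset arithmetic; the names and constants are illustrative
only, not the patch's API (in the real code cellSize and dataBlkNum come from the
file's erasure coding schema):

    public class CellMathDemo {
      // Index of the internal data block serving the cell that contains pos.
      // Cells rotate round-robin over the dataBlkNum data blocks of a group.
      static int indexInGroup(long pos, int cellSize, int dataBlkNum) {
        return (int) ((pos / cellSize) % dataBlkNum);
      }

      // Last byte of the cell containing pos; one read on a single
      // blockReader never crosses this boundary.
      static long cellEnd(long pos, int cellSize) {
        return (pos / cellSize) * cellSize + cellSize - 1;
      }

      public static void main(String[] args) {
        long pos = 10; int cellSize = 4; int dataBlkNum = 3;
        System.out.println(indexInGroup(pos, cellSize, dataBlkNum)); // prints 2
        System.out.println(cellEnd(pos, cellSize));                  // prints 11
      }
    }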
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e51018a1/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index cf41a9b..e8db485 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -131,3 +131,6 @@
HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause
block id conflicts (Jing Zhao via Zhe Zhang)
+
+ HDFS-8033. Erasure coding: stateful (non-positional) read from files in
+ striped layout (Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e51018a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 16250dd..6eb25d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -95,34 +95,34 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
protected final DFSClient dfsClient;
- private AtomicBoolean closed = new AtomicBoolean(false);
- private final String src;
- private final boolean verifyChecksum;
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+ protected final String src;
+ protected final boolean verifyChecksum;
// state by stateful read only:
// (protected by lock on this)
/////
private DatanodeInfo currentNode = null;
- private LocatedBlock currentLocatedBlock = null;
- private long pos = 0;
- private long blockEnd = -1;
+ protected LocatedBlock currentLocatedBlock = null;
+ protected long pos = 0;
+ protected long blockEnd = -1;
private BlockReader blockReader = null;
////
// state shared by stateful and positional read:
// (protected by lock on infoLock)
////
- private LocatedBlocks locatedBlocks = null;
+ protected LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
- private CachingStrategy cachingStrategy;
+ protected CachingStrategy cachingStrategy;
////
- private final ReadStatistics readStatistics = new ReadStatistics();
+ protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
- private final Object infoLock = new Object();
+ protected final Object infoLock = new Object();
/**
* Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +239,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
- private int failures = 0;
+ protected int failures = 0;
/* XXX Use of ConcurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through preads) properly */
@@ -476,7 +476,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Fetch a block from namenode and cache it */
- private void fetchBlockAt(long offset) throws IOException {
+ protected void fetchBlockAt(long offset) throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +579,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
//
// Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +620,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
@@ -696,7 +696,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
super.close();
}
@@ -718,7 +718,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throws ChecksumException, IOException;
}
- private void updateReadStatistics(ReadStatistics readStatistics,
+ protected void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
synchronized(infoLock) {
@@ -754,7 +754,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Used to read bytes into a user-supplied ByteBuffer
*/
- private class ByteBufferStrategy implements ReaderStrategy {
+ protected class ByteBufferStrategy implements ReaderStrategy {
final ByteBuffer buf;
ByteBufferStrategy(ByteBuffer buf) {
this.buf = buf;
@@ -770,6 +770,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
+ if (ret == 0) {
+ DFSClient.LOG.warn("blockReader.read(ByteBuffer) returned zero bytes");
+ }
return ret;
} finally {
if (!success) {
@@ -837,7 +840,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@@ -926,7 +929,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Add corrupted block replica into map.
*/
- private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if((corruptedBlockMap.containsKey(blk))) {
@@ -996,7 +999,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
- private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
@@ -1365,7 +1368,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return true if block access token has expired or invalid and it should be
* refetched
*/
- private static boolean tokenRefetchNeeded(IOException ex,
+ protected static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1472,7 +1475,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
- private void reportCheckSumFailure(
+ protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
@@ -1669,7 +1672,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private void closeCurrentBlockReader() {
+ protected void closeCurrentBlockReaders() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
@@ -1689,7 +1692,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
@Override
@@ -1699,7 +1702,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
/**
@@ -1857,6 +1860,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized void unbuffer() {
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
}
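
The visibility relaxations above (private fields and helpers widened to protected,
plus the closeCurrentBlockReader -> closeCurrentBlockReaders rename) exist so that
the striped subclass in the next file can drive the superclass's positional state
directly. A toy, self-contained illustration of the pattern; the names are
illustrative, not the HDFS classes:

    public class VisibilityDemo {
      static abstract class BaseStream {
        protected long pos = 0; // private before a change like this patch's
        protected abstract int readInternal(byte[] b, int off, int len);
      }

      static class StripedToy extends BaseStream {
        @Override
        protected int readInternal(byte[] b, int off, int len) {
          int n = Math.min(len, b.length - off);
          pos += n; // the subclass updates inherited state directly
          return n;
        }
      }

      public static void main(String[] args) {
        StripedToy s = new StripedToy();
        s.readInternal(new byte[16], 0, 10);
        System.out.println(s.pos); // prints 10
      }
    }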
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e51018a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d0e2b68..fe9e101 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -18,20 +18,21 @@
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
-import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -125,6 +126,9 @@ public class DFSStripedInputStream extends DFSInputStream {
return results;
}
+ private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+ private BlockReader[] blockReaders = null;
+ private DatanodeInfo[] currentNodes = null;
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
@@ -143,13 +147,285 @@ public class DFSStripedInputStream extends DFSInputStream {
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
- throw new UnsupportedActionException("Stateful read is not supported");
+ ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+ try {
+ return readWithStrategy(byteBufferReader, 0, buf.remaining());
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * When seeking into a new block group, create a blockReader for each
+ * internal block in the group.
+ */
+ @VisibleForTesting
+ private synchronized DatanodeInfo[] blockSeekTo(long target)
+ throws IOException {
+ if (target >= getFileLength()) {
+ throw new IOException("Attempted to read past end of file");
+ }
+
+ // Will be getting a new BlockReader.
+ closeCurrentBlockReaders();
+
+ // Connect to best DataNode for desired Block, with potential offset
+ DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
+ int refetchToken = 1; // only need to get a new access token once
+ int refetchEncryptionKey = 1; // only need to get a new encryption key once
+
+ // Compute desired striped block group
+ LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
+
+ // Update current position
+ this.pos = target;
+ this.blockEnd = targetBlockGroup.getStartOffset() +
+ targetBlockGroup.getBlockSize() - 1;
+
+ long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+ LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
+ targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
+ // The purpose is to get start offset into each block
+ ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
+ offsetIntoBlockGroup, 0, 0);
+ while (true) {
+ int i = 0;
+ InetSocketAddress targetAddr = null;
+ try {
+ blockReaders = new BlockReader[groupSize];
+ for (i = 0; i < groupSize; i++) {
+ LocatedBlock targetBlock = targetBlocks[i];
+ if (targetBlock == null) {
+ continue;
+ }
+ long offsetIntoBlock = readPortions[i].startOffsetInBlock;
+ DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+ chosenNodes[i] = retval.info;
+ targetAddr = retval.addr;
+ StorageType storageType = retval.storageType;
+
+ ExtendedBlock blk = targetBlock.getBlock();
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+ CachingStrategy curCachingStrategy;
+ boolean shortCircuitForbidden;
+ synchronized(infoLock) {
+ curCachingStrategy = cachingStrategy;
+ shortCircuitForbidden = shortCircuitForbidden();
+ }
+ blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
+ setInetSocketAddress(targetAddr).
+ setRemotePeerFactory(dfsClient).
+ setDatanodeInfo(chosenNodes[i]).
+ setStorageType(storageType).
+ setFileName(src).
+ setBlock(blk).
+ setBlockToken(accessToken).
+ setStartOffset(offsetIntoBlock).
+ setVerifyChecksum(verifyChecksum).
+ setClientName(dfsClient.clientName).
+ setLength(blk.getNumBytes() - offsetIntoBlock).
+ setCachingStrategy(curCachingStrategy).
+ setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+ setClientCacheContext(dfsClient.getClientContext()).
+ setUserGroupInformation(dfsClient.ugi).
+ setConfiguration(dfsClient.getConfiguration()).
+ build();
+ }
+ currentLocatedBlock = targetBlockGroup;
+ return chosenNodes;
+ } catch (IOException ex) {
+ // Retry in case of encryption key or token exceptions. Otherwise rethrow
+ // the IOException: each internal block is singly replicated, so there is
+ // no other replica to try.
+ if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to " + targetAddr
+ + " : " + ex);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+ refetchToken--;
+ fetchBlockAt(target);
+ } else {
+ DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ + ", add to deadNodes and continue. " + ex, ex);
+ // Put chosen node into dead list and throw exception
+ addToDeadNodes(chosenNodes[i]);
+ throw ex;
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the blockReaders opened for each internal block in the current
+ * block group.
+ */
+ @Override
+ protected void closeCurrentBlockReaders() {
+ if (blockReaders == null || blockReaders.length == 0) {
+ return;
+ }
+ for (int i = 0; i < groupSize; i++) {
+ if (blockReaders[i] == null) {
+ continue;
+ }
+ try {
+ blockReaders[i].close();
+ } catch (IOException e) {
+ DFSClient.LOG.error("error closing blockReader", e);
+ }
+ blockReaders[i] = null;
+ }
+ blockEnd = -1;
}
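+
+ /**
+ * Extend the super method with the logic of switching between cells.
+ * When reaching the end of a cell, proceed to the next cell and read it
+ * with the next blockReader.
+ */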
@Override
- public synchronized int read(final byte buf[], int off, int len)
+ protected synchronized int readWithStrategy(ReaderStrategy strategy,
+ int off, int len) throws IOException {
+ dfsClient.checkOpen();
+ if (closed.get()) {
+ throw new IOException("Stream closed");
+ }
+ Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
+ = new HashMap<>();
+ failures = 0;
+ if (pos < getFileLength()) {
+ int retries = 2;
+ /** Index of the target block in a stripe to read from */
+ int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
+ while (retries > 0) {
+ try {
+ // currentNode can be left as null if previous read had a checksum
+ // error on the same block. See HDFS-3067
+ if (pos > blockEnd || currentNodes == null) {
+ currentNodes = blockSeekTo(pos);
+ }
+ int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+ synchronized(infoLock) {
+ if (locatedBlocks.isLastBlockComplete()) {
+ realLen = (int) Math.min(realLen,
+ locatedBlocks.getFileLength() - pos);
+ }
+ }
+
+ /** Number of bytes already read into buffer */
+ int result = 0;
+ while (result < realLen) {
+ /**
+ * Temporary position into the file; {@link pos} might not proceed
+ * to this temporary position in case of exceptions.
+ */
+ long tmpPos = pos + result;
+ /** Start and end offsets of a cell in the file */
+ long cellStart = (tmpPos / cellSize) * cellSize;
+ long cellEnd = cellStart + cellSize - 1;
+
+ /** Number of bytes to read from the current cell */
+ int realLenInCell = (int) Math.min(realLen - result,
+ cellEnd - tmpPos + 1L);
+ assert realLenInCell > 0 : "Temporary position shouldn't be " +
+ "after cellEnd";
+ // Read from one blockReader up to cell boundary
+ int cellRet = readBuffer(blockReaders[idxInGroup],
+ currentNodes[idxInGroup], strategy, off + result,
+ realLenInCell);
+ if (cellRet >= 0) {
+ result += cellRet;
+ if (cellRet < realLenInCell) {
+ // A short read indicates the current blockReader's buffer is
+ // already drained; return from this read call rather than
+ // proceeding to the next cell.
+ break;
+ }
+ } else {
+ // Got an EOS from the reader even though we expect more data from it.
+ throw new IOException("Unexpected EOS from the reader");
+ }
+ idxInGroup = (idxInGroup + 1) % dataBlkNum;
+ }
+
+ pos += result;
+
+ if (dfsClient.stats != null) {
+ dfsClient.stats.incrementBytesRead(result);
+ }
+ return result;
+ } catch (ChecksumException ce) {
+ throw ce;
+ } catch (IOException e) {
+ if (retries == 1) {
+ DFSClient.LOG.warn("DFS Read", e);
+ }
+ blockEnd = -1;
+ if (currentNodes[idxInGroup] != null) {
+ addToDeadNodes(currentNodes[idxInGroup]);
+ }
+ if (--retries == 0) {
+ throw e;
+ }
+ } finally {
+ // Check whether block replica corruption needs to be reported, both
+ // when the read was successful and when a ChecksumException occurred.
+ reportCheckSumFailure(corruptedBlockMap,
+ currentLocatedBlock.getLocations().length);
+ }
+ }
+ }
+ return -1;
+ }
+
+ private synchronized int readBuffer(BlockReader blockReader,
+ DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
+ throws IOException {
+ IOException ioe;
+ while (true) {
+ try {
+ return readerStrategy.doRead(blockReader, off, len);
+ } catch ( ChecksumException ce ) {
+ DFSClient.LOG.warn("Found Checksum error for "
+ + getCurrentBlock() + " from " + currentNode
+ + " at " + ce.getPos());
+ // If current block group is corrupt, it's meaningless to retry.
+ // TODO: this should trigger decoding logic (HDFS-7678)
+ throw ce;
+ } catch ( IOException e ) {
+ ioe = e;
+ }
+
+ boolean sourceFound = seekToBlockSource(pos);
+ if (!sourceFound) {
+ throw ioe;
+ }
+ }
+ }
+
+ private boolean seekToBlockSource(long targetPos)
throws IOException {
- throw new UnsupportedActionException("Stateful read is not supported");
+ currentNodes = blockSeekTo(targetPos);
+ return true;
+ }
+
+ protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
+ ByteBufferStrategy(ByteBuffer buf) {
+ super(buf);
+ }
+
+ @Override
+ public int doRead(BlockReader blockReader, int off, int len)
+ throws ChecksumException, IOException {
+ int oldLimit = buf.limit();
+ if (buf.remaining() > len) {
+ buf.limit(buf.position() + len);
+ }
+ int ret = super.doRead(blockReader, off, len);
+ buf.limit(oldLimit);
+ return ret;
+ }
}
/**
@@ -188,8 +464,11 @@ public class DFSStripedInputStream extends DFSInputStream {
dataBlkNum, idx);
}
- private LocatedBlock getBlockGroupAt(long offset) throws IOException {
- return super.getBlockAt(offset);
+ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
+ LocatedBlock lb = super.getBlockAt(offset);
+ assert lb instanceof LocatedStripedBlock : "NameNode" +
+ " should return a LocatedStripedBlock for a striped file";
+ return (LocatedStripedBlock)lb;
}
/**
@@ -206,10 +485,8 @@ public class DFSStripedInputStream extends DFSInputStream {
int len = (int) (end - start + 1);
// Refresh the striped block group
- LocatedBlock block = getBlockGroupAt(blockStartOffset);
- assert block instanceof LocatedStripedBlock : "NameNode" +
- " should return a LocatedStripedBlock for a striped file";
- LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+ LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+
// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
@@ -308,7 +585,7 @@ public class DFSStripedInputStream extends DFSInputStream {
* +------------------------------------------------------+
*/
private long startOffsetInBlock = 0;
- private long readLength = 0;
+ private int readLength = 0;
private final List<Integer> offsetsInBuf = new ArrayList<>();
private final List<Integer> lengths = new ArrayList<>();
@@ -328,7 +605,7 @@ public class DFSStripedInputStream extends DFSInputStream {
return lens;
}
- long getReadLength() {
+ int getReadLength() {
return readLength;
}
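
The overridden ByteBufferStrategy above caps each read at a cell boundary by
temporarily shrinking the buffer's limit and restoring it afterwards. A standalone
sketch of that idiom; illustrative only, with a byte[] source standing in for the
BlockReader:

    import java.nio.ByteBuffer;

    public class LimitCapDemo {
      // Let at most len bytes be written into buf, then restore the limit.
      static int readAtMost(ByteBuffer buf, byte[] src, int len) {
        int oldLimit = buf.limit();
        if (buf.remaining() > len) {
          buf.limit(buf.position() + len); // expose at most len bytes
        }
        int n = Math.min(buf.remaining(), src.length);
        buf.put(src, 0, n);                // stands in for blockReader.read(buf)
        buf.limit(oldLimit);               // hand the caller's limit back
        return n;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        int n = readAtMost(buf, new byte[]{1, 2, 3, 4, 5}, 3);
        System.out.println(n + " bytes, position=" + buf.position()); // 3 bytes, position=3
      }
    }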
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e51018a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 73c7350..cf10981 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -28,6 +28,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
public class TestDFSStripedInputStream {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
@@ -165,6 +166,7 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("File length should be the same",
writeBytes, fileLength);
+ // pread
try (DFSStripedInputStream dis =
new DFSStripedInputStream(fs.getClient(), src, true)) {
byte[] buf = new byte[writeBytes + 100];
@@ -176,5 +178,46 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
}
}
+
+ // stateful read with byte array
+ try (DFSStripedInputStream dis =
+ new DFSStripedInputStream(fs.getClient(), src, true)) {
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = 0;
+ int ret;
+ do {
+ ret = dis.read(buf, readLen, buf.length - readLen);
+ if (ret > 0) {
+ readLen += ret;
+ }
+ } while (ret >= 0);
+
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+ }
+ }
+
+ // stateful read with ByteBuffer
+ try (DFSStripedInputStream dis =
+ new DFSStripedInputStream(fs.getClient(), src, true)) {
+ ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
+ int readLen = 0;
+ int ret;
+ do {
+ ret = dis.read(buf);
+ if (ret > 0) {
+ readLen += ret;
+ }
+ } while (ret >= 0);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e51018a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index b0631ce..d980bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -38,6 +39,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -52,19 +54,21 @@ public class TestReadStripedFile {
private Path filePath = new Path(dirPath, "file");
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
- private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int NUM_STRIPE_PER_BLOCK = 2;
- private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
+ private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+ private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
@Before
public void setup() throws IOException {
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
SimulatedFSDataset.setFactory(conf);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
- .build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ DATA_BLK_NUM + PARITY_BLK_NUM).build();
cluster.waitActive();
fs = cluster.getFileSystem();
+ fs.mkdirs(dirPath);
+ fs.getClient().createErasureCodingZone(dirPath.toString(), null);
}
@After
@@ -80,10 +84,10 @@ public class TestReadStripedFile {
@Test
public void testGetBlock() throws Exception {
final int numBlocks = 4;
- DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
- NUM_STRIPE_PER_BLOCK, true);
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
- filePath.toString(), 0, BLOCKSIZE * numBlocks);
+ filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
final DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
@@ -103,11 +107,11 @@ public class TestReadStripedFile {
@Test
public void testPread() throws Exception {
- final int numBlocks = 4;
- DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
- NUM_STRIPE_PER_BLOCK, true);
+ final int numBlocks = 2;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
- filePath.toString(), 0, BLOCKSIZE);
+ filePath.toString(), 0, BLOCK_GROUP_SIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
@@ -121,11 +125,89 @@ public class TestReadStripedFile {
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
- int readSize = BLOCKSIZE;
+ int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);
assertEquals(readSize, ret);
// TODO: verify read results with patterned data from HDFS-8117
}
+
+ @Test
+ public void testStatefulRead() throws Exception {
+ testStatefulRead(false, false);
+ testStatefulRead(true, false);
+ testStatefulRead(true, true);
+ }
+
+ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
+ throws Exception {
+ final int numBlocks = 2;
+ final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+ if (cellMisalignPacket) {
+ conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+ tearDown();
+ setup();
+ }
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, fileSize);
+
+ assert lbs.getLocatedBlocks().size() == numBlocks;
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ assert lb instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+ for (int i = 0; i < DATA_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ }
+
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+ false);
+
+ byte[] expected = new byte[fileSize];
+
+ for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = (int) bg.getStartOffset() +
+ i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ }
+ }
+
+ if (useByteBuffer) {
+ ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+ int done = 0;
+ while (done < fileSize) {
+ int ret = in.read(readBuffer);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+ assertArrayEquals(expected, readBuffer.array());
+ } else {
+ byte[] readBuffer = new byte[fileSize];
+ int done = 0;
+ while (done < fileSize) {
+ int ret = in.read(readBuffer, done, fileSize - done);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+ assertArrayEquals(expected, readBuffer);
+ }
+ fs.delete(filePath, true);
+ }
}
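
The expected-buffer loop in testStatefulRead encodes the striped layout: byte k of
stripe i in internal block j of a group lands at groupStart + i * CELLSIZE *
DATA_BLK_NUM + j * CELLSIZE + k in the file. A runnable sketch of that math with
illustrative constants (not the HdfsConstants values):

    public class StripeOffsetDemo {
      static final int CELLSIZE = 4;     // bytes per cell (illustrative)
      static final int DATA_BLK_NUM = 3; // data blocks per group (illustrative)

      // File offset of byte k of stripe i in internal block j of a group.
      static long posInFile(long groupStart, int i, int j, int k) {
        return groupStart + (long) i * CELLSIZE * DATA_BLK_NUM
            + (long) j * CELLSIZE + k;
      }

      public static void main(String[] args) {
        // Stripe 1, block 2, byte 0: one full stripe (12 bytes) plus two
        // cells (8 bytes) into the group.
        System.out.println(posInFile(0, 1, 2, 0)); // prints 20
      }
    }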