You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/03 16:16:28 UTC
[25/29] hadoop git commit: HDFS-8703. Merge refactor of
DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff5999d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff5999d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff5999d
Branch: refs/heads/HADOOP-12111
Commit: bff5999d07e9416a22846c849487e509ede55040
Parents: 37d7395
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jul 2 16:11:50 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Jul 2 16:11:50 2015 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 233 +++++++++++++------
.../hadoop/hdfs/TestDFSClientRetries.java | 2 +-
4 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ec37542..7b96c56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
+ HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
+ (vinayakumarb)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f4ceab3..4923a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1181,7 +1181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
// Get block info from namenode
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
try {
- return new DFSInputStream(this, src, verifyChecksum);
+ return new DFSInputStream(this, src, verifyChecksum, null);
} finally {
scope.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6563d7b..7f3722f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
- private final DFSClient dfsClient;
- private AtomicBoolean closed = new AtomicBoolean(false);
- private final String src;
- private final boolean verifyChecksum;
+ protected final DFSClient dfsClient;
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+ protected final String src;
+ protected final boolean verifyChecksum;
// state by stateful read only:
// (protected by lock on this)
/////
private DatanodeInfo currentNode = null;
- private LocatedBlock currentLocatedBlock = null;
- private long pos = 0;
- private long blockEnd = -1;
+ protected LocatedBlock currentLocatedBlock = null;
+ protected long pos = 0;
+ protected long blockEnd = -1;
private BlockReader blockReader = null;
////
// state shared by stateful and positional read:
// (protected by lock on infoLock)
////
- private LocatedBlocks locatedBlocks = null;
+ protected LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
- private CachingStrategy cachingStrategy;
+ protected CachingStrategy cachingStrategy;
////
- private final ReadStatistics readStatistics = new ReadStatistics();
+ protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
- private final Object infoLock = new Object();
+ protected final Object infoLock = new Object();
/**
* Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
- private int failures = 0;
+ protected int failures = 0;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@@ -252,24 +253,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
deadNodes.put(dnInfo, dnInfo);
}
- DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
- ) throws IOException, UnresolvedLinkException {
+ DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+ LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
- openInfo();
+ this.locatedBlocks = locatedBlocks;
+ openInfo(false);
}
/**
* Grab the open-file info from namenode
+ * @param refreshLocatedBlocks whether to re-fetch locatedblocks
*/
- void openInfo() throws IOException, UnresolvedLinkException {
+ void openInfo(boolean refreshLocatedBlocks) throws IOException,
+ UnresolvedLinkException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
- lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
@@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
- lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
}
@@ -302,8 +308,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
- final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+ private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+ throws IOException {
+ LocatedBlocks newInfo = locatedBlocks;
+ if (locatedBlocks == null || refresh) {
+ newInfo = dfsClient.getLocatedBlocks(src, 0);
+ }
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
@@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Fetch a block from namenode and cache it */
- private void fetchBlockAt(long offset) throws IOException {
+ protected void fetchBlockAt(long offset) throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
//
// Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
@@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
fetchBlockAt(target);
} else {
connectFailedOnce = true;
- DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
- +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
+ DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ + ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
@@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
super.close();
}
@@ -713,12 +723,22 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
- private interface ReaderStrategy {
+ interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
+
+ /**
+ * Copy data from the src ByteBuffer into the read buffer.
+ * @param src The src buffer where the data is copied from
+ * @param offset Useful only when the ReaderStrategy is based on a byte array.
+ * Indicates the offset in the byte array for the copy.
+ * @param length Useful only when the ReaderStrategy is based on a byte array.
+ * Indicates the length of the data to copy.
+ */
+ public int copyFrom(ByteBuffer src, int offset, int length);
}
- private void updateReadStatistics(ReadStatistics readStatistics,
+ protected void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
synchronized(infoLock) {
@@ -749,12 +769,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ writeSlice.get(buf, offset, length);
+ return length;
+ }
}
/**
* Used to read bytes into a user-supplied ByteBuffer
*/
- private class ByteBufferStrategy implements ReaderStrategy {
+ protected class ByteBufferStrategy implements ReaderStrategy {
final ByteBuffer buf;
ByteBufferStrategy(ByteBuffer buf) {
this.buf = buf;
@@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
+ if (ret == 0) {
+ DFSClient.LOG.warn("zero");
+ }
return ret;
} finally {
if (!success) {
@@ -779,6 +809,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+ writeSlice.limit(writeSlice.position() + remaining);
+ buf.put(writeSlice);
+ return remaining;
+ }
}
/* This is a used by regular read() and handles ChecksumExceptions.
@@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Add corrupted block replica into map.
*/
- private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if((corruptedBlockMap.containsKey(blk))) {
@@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo();
- block = getBlockAt(block.getStartOffset());
+ openInfo(true);
+ block = refreshLocatedBlock(block);
failures++;
}
}
@@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
- private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
@@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
- private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
- actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+ actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlockMap);
return;
} catch (IOException e) {
@@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
- final long blockStartOffset, final long start, final long end,
+ final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
@@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
- actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlockMap);
return bb;
} finally {
@@ -1100,31 +1139,60 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
+ /**
+ * Used when reading contiguous blocks
+ */
private void actualGetFromOneDataNode(final DNAddrPair datanode,
- long blockStartOffset, final long start, final long end, byte[] buf,
+ LocatedBlock block, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
+ final int length = (int) (end - start + 1);
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
+ new int[]{offset}, new int[]{length}, corruptedBlockMap);
+ }
+
+ /**
+ * Read data from one DataNode.
+ * @param datanode the datanode from which to read data
+ * @param block the located block containing the requested data
+ * @param startInBlk the start offset of the read within the block
+ * @param endInBlk the end offset (inclusive) of the read within the block
+ * @param buf the given byte array into which the data is read
+ * @param offsets the data may be read into multiple segments of the buf
+ * (when reading a striped block). This array indicates the
+ * offset of each buf segment.
+ * @param lengths the length of each buf segment
+ * @param corruptedBlockMap map recording list of datanodes with corrupted
+ * block replica
+ */
+ void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long startInBlk, final long endInBlk,
+ byte[] buf, int[] offsets, int[] lengths,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
+ final int len = (int) (endInBlk - startInBlk + 1);
+ checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
- int len = (int) (end - start + 1);
- reader = getBlockReader(block, start, len, datanode.addr,
+ reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
- int nread = reader.readAll(buf, offset, len);
- updateReadStatistics(readStatistics, nread, reader);
-
- if (nread != len) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + len + ", got " + nread);
+ for (int i = 0; i < offsets.length; i++) {
+ int nread = reader.readAll(buf, offsets[i], lengths[i]);
+ updateReadStatistics(readStatistics, nread, reader);
+ if (nread != lengths[i]) {
+ throw new IOException("truncated return from reader.read(): " +
+ "excpected " + lengths[i] + ", got " + nread);
+ }
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@@ -1169,11 +1237,40 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+ * Refresh cached block locations.
+ * @param block The currently cached block locations
+ * @return Refreshed block locations
+ * @throws IOException
+ */
+ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+ throws IOException {
+ return getBlockAt(block.getStartOffset());
+ }
+
+ /**
+ * This method verifies that the read portions are valid and do not overlap
+ * with each other.
+ */
+ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+ Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+ int sum = 0;
+ for (int i = 0; i < lengths.length; i++) {
+ if (i > 0) {
+ int gap = offsets[i] - offsets[i - 1];
+ // make sure read portions do not overlap with each other
+ Preconditions.checkArgument(gap >= lengths[i - 1]);
+ }
+ sum += lengths[i];
+ }
+ Preconditions.checkArgument(sum == totalLen);
+ }
+
+ /**
+ * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
- private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ByteBuffer bb = null;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
- LocatedBlock block = getBlockAt(blockStartOffset);
+ block = refreshLocatedBlock(block);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block.getStartOffset(), start, end, bb,
+ chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
@@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block.getStartOffset(), start, end, bb,
+ chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
@@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return true if block access token has expired or invalid and it should be
* refetched
*/
- private static boolean tokenRefetchNeeded(IOException ex,
+ protected static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
- hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ hedgedFetchBlockByteRange(blk, targetStart,
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
- fetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
@@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
- private void reportCheckSumFailure(
+ protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
@@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
*/
@Override
- public synchronized long getPos() throws IOException {
+ public synchronized long getPos() {
return pos;
}
@@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
- private static final class DNAddrPair {
+ static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
@@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
- private void closeCurrentBlockReader() {
+ protected void closeCurrentBlockReaders() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
@@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
@Override
@@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
}
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
/**
@@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized void unbuffer() {
- closeCurrentBlockReader();
+ closeCurrentBlockReaders();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 68cc155..43e0eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -343,7 +343,7 @@ public class TestDFSClientRetries {
// we're starting a new operation on the user level.
doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
.when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
- is.openInfo();
+ is.openInfo(true);
// Seek to beginning forces a reopen of the BlockReader - otherwise it'll
// just keep reading on the existing stream and the fact that we've poisoned
// the block info won't do anything.