You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/08/14 14:01:07 UTC
[23/26] hadoop git commit: HDFS-11303. Hedged read might hang
infinitely if read data from all DN failed. Contributed by Chen Zhang,
Wei-chiu Chuang, and John Zhuge.
HDFS-11303. Hedged read might hang infinitely if read data from all DN failed. Contributed by Chen Zhang, Wei-chiu Chuang, and John Zhuge.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b242f09
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b242f09
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b242f09
Branch: refs/heads/HADOOP-13345
Commit: 8b242f09a61a7536d2422546bfa6c2aaf1d57ed6
Parents: 28d97b7
Author: John Zhuge <jz...@cloudera.com>
Authored: Thu Aug 10 14:04:36 2017 -0700
Committer: John Zhuge <jz...@apache.org>
Committed: Fri Aug 11 19:42:07 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 11 ++--
.../java/org/apache/hadoop/hdfs/TestPread.java | 63 ++++++++++++++++++++
2 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dcc997c..6bff172 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1131,8 +1131,9 @@ public class DFSInputStream extends FSInputStream
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(firstRequest);
+ Future<ByteBuffer> future = null;
try {
- Future<ByteBuffer> future = hedgedService.poll(
+ future = hedgedService.poll(
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
@@ -1142,16 +1143,18 @@ public class DFSInputStream extends FSInputStream
}
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
+ "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
- // Ignore this node on next go around.
- ignored.add(chosenNode.info);
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
// continue; no need to refresh block locations
} catch (ExecutionException e) {
- // Ignore
+ futures.remove(future);
} catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted while waiting for reading task");
}
+ // Ignore this node on next go around.
+ // If poll timeout and the request still ongoing, don't consider it
+ // again. If read data failed, don't consider it either.
+ ignored.add(chosenNode.info);
} else {
// We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 85fc97b..bcb02b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -59,6 +59,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
/**
* This class tests the DFS positional read functionality in a single node
@@ -72,6 +74,9 @@ public class TestPread {
boolean simulatedStorage;
boolean isHedgedRead;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestPread.class.getName());
+
@Before
public void setup() {
simulatedStorage = false;
@@ -551,6 +556,64 @@ public class TestPread {
}
}
+ @Test(timeout=30000)
+ public void testHedgedReadFromAllDNFailed() throws IOException {
+ Configuration conf = new Configuration();
+ int numHedgedReadPoolThreads = 5;
+ final int hedgedReadTimeoutMillis = 50;
+
+ conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+ numHedgedReadPoolThreads);
+ conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
+ hedgedReadTimeoutMillis);
+ conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
+ // Set up the InjectionHandler
+ DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+ DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (true) {
+ LOG.info("-------------- throw Checksum Exception");
+ throw new ChecksumException("ChecksumException test", 100);
+ }
+ return null;
+ }
+ }).when(injector).fetchFromDatanodeException();
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .format(true).build();
+ DistributedFileSystem fileSys = cluster.getFileSystem();
+ DFSClient dfsClient = fileSys.getClient();
+ FSDataOutputStream output = null;
+ DFSInputStream input = null;
+ String filename = "/hedgedReadMaxOut.dat";
+ DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+ // Metrics instance is static, so we need to reset counts from prior tests.
+ metrics.hedgedReadOps.set(0);
+ try {
+ Path file = new Path(filename);
+ output = fileSys.create(file, (short) 2);
+ byte[] data = new byte[64 * 1024];
+ output.write(data);
+ output.flush();
+ output.close();
+ byte[] buffer = new byte[64 * 1024];
+ input = dfsClient.open(filename);
+ input.read(0, buffer, 0, 1024);
+ Assert.fail("Reading the block should have thrown BlockMissingException");
+ } catch (BlockMissingException e) {
+ assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+ assertTrue(metrics.getHedgedReadOps() == 0);
+ } finally {
+ Mockito.reset(injector);
+ IOUtils.cleanupWithLogger(LOG, input);
+ IOUtils.cleanupWithLogger(LOG, output);
+ fileSys.close();
+ cluster.shutdown();
+ }
+ }
+
/**
* Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
* 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org