You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by je...@apache.org on 2017/08/01 21:58:48 UTC
tez git commit: TEZ-3804. FetcherOrderedGrouped#setupLocalDiskFetch should ignore empty partition records (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/master 2358521fa -> 614937c5d
TEZ-3804. FetcherOrderedGrouped#setupLocalDiskFetch should ignore empty partition records (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/614937c5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/614937c5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/614937c5
Branch: refs/heads/master
Commit: 614937c5df88b79c85ae9fc6394652fb65d98081
Parents: 2358521
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Aug 1 16:58:42 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Aug 1 16:58:42 2017 -0500
----------------------------------------------------------------------
.../orderedgrouped/FetcherOrderedGrouped.java | 3 +
.../shuffle/orderedgrouped/TestFetcher.java | 83 +++++++++++++++++++-
2 files changed, 82 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 2c3aac3..68a54e9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -715,6 +715,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), reduceId);
Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), reduceId);
+ if(!indexRecord.hasData()) {
+ continue;
+ }
mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
long endTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/tez/blob/614937c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index ef371c2..6d30448 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -299,7 +299,7 @@ public class TestFetcher {
throw new IOException("failing to simulate failure case");
}
// match with params for copySucceeded below.
- return new TezIndexRecord(p * 10, p * 1000, p * 100);
+ return new TezIndexRecord(p * 10, (p+1) * 1000, (p+2) * 100);
}
}).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
}
@@ -327,6 +327,81 @@ public class TestFetcher {
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
}
+ @Test (timeout = 5000)
+ public void testSetupLocalDiskFetchEmptyPartitions() throws Exception {
+ Configuration conf = new TezConfiguration();
+ ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+ MergeManager merger = mock(MergeManager.class);
+ Shuffle shuffle = mock(Shuffle.class);
+ InputContext inputContext = mock(InputContext.class);
+ when(inputContext.getCounters()).thenReturn(new TezCounters());
+ when(inputContext.getSourceVertexName()).thenReturn("");
+
+ MapHost host = new MapHost(HOST, PORT, 1, 1);
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
+ null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true, false);
+ FetcherOrderedGrouped spyFetcher = spy(fetcher);
+
+ final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
+ new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
+ new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
+ new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
+ new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
+ new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
+ );
+
+ doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
+
+ final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
+ for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
+ for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
+ ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i);
+ pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
+ }
+ }
+ doAnswer(new Answer<InputAttemptIdentifier>() {
+ @Override
+ public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ String path = (String) args[0];
+ int reduceId = (int) args[1];
+ return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
+ }
+ }).when(scheduler)
+ .getIdentifierForFetchedOutput(any(String.class), any(int.class));
+
+ doAnswer(new Answer<Path>() {
+ @Override
+ public Path answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
+ }
+ }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
+
+ for (int i = 0; i < host.getPartitionCount(); i++) {
+ doAnswer(new Answer<TezIndexRecord>() {
+ @Override
+ public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ String pathComponent = (String) args[0];
+ int len = pathComponent.length();
+ long p = Long.valueOf(pathComponent.substring(len - 1, len));
+ // match with params for copySucceeded below.
+ return new TezIndexRecord(p * 10, 0, 0);
+ }
+ }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
+ }
+
+ doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
+ anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
+ spyFetcher.setupLocalDiskFetch(host);
+ verify(scheduler, times(0)).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
+ anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
+ verify(spyFetcher).putBackRemainingMapOutputs(host);
+ }
+
@Test(timeout = 5000)
public void testSetupLocalDiskFetchAutoReduce() throws Exception {
Configuration conf = new TezConfiguration();
@@ -412,7 +487,7 @@ public class TestFetcher {
throw new IOException("Thowing exception to simulate failure case");
}
// match with params for copySucceeded below.
- return new TezIndexRecord(p * 10, p * 1000, p * 100);
+ return new TezIndexRecord(p * 10, (p + 1) * 1000, (p + 2) * 100);
}
}).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
}
@@ -455,8 +530,8 @@ public class TestFetcher {
InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j);
String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
- verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
- eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
+ verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq((p+2) * 100),
+ eq((p+1) * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
// cannot use the equals of MapOutput as it compares id which is private. so doing it manually
MapOutput m = captureMapOutput.getAllValues().get(0);