You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text version.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/04 02:25:44 UTC
tez git commit: TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 4d381d778 -> 73da831e8
TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73da831e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73da831e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73da831e
Branch: refs/heads/master
Commit: 73da831e8e5acab5e2e044c74a29384bcda4cbc6
Parents: 4d381d7
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Aug 4 05:59:12 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Aug 4 05:59:12 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../orderedgrouped/FetcherOrderedGrouped.java | 52 +++++++-------
.../shuffle/orderedgrouped/TestFetcher.java | 74 ++++++++++++++++++--
3 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 303171a..59307b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
TEZ-2646. Add scheduling casual dependency for attempts
ALL CHANGES:
+ TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
TEZ-2645. Provide standard analyzers for job analysis.
TEZ-2627. Support for Tez Job Priorities.
http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index a4d38ce..d8be8dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tez.http.BaseHttpConnection;
@@ -87,8 +89,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
@VisibleForTesting
- LinkedList<InputAttemptIdentifier> remaining;
-
+ Map<String, InputAttemptIdentifier> remaining;
volatile DataInputStream input;
volatile BaseHttpConnection httpConnection;
@@ -228,25 +229,19 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
retryStartTime = 0;
// Get completed maps on 'host'
List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (srcAttempts.size() == 0) {
return;
}
-
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ srcAttempts + ", partitionId: " + currentPartition);
}
-
- // List of maps to be fetched yet
- remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
-
+ populateRemainingMap(srcAttempts);
// Construct the url and connect
-
try {
- if (!setupConnection(host, srcAttempts)) {
+ if (!setupConnection(host, remaining.values())) {
if (stopped) {
cleanupCurrentConnection(true);
}
@@ -273,7 +268,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
return;
}
// Connect with retry
- if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) {
+ if (!setupConnection(host, remaining.values())) {
if (stopped) {
cleanupCurrentConnection(true);
LOG.info("Not reporting connection re-establishment failure since fetcher is stopped");
@@ -310,7 +305,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
}
@VisibleForTesting
- boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts)
+ boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempts)
throws IOException {
boolean connectSucceeded = false;
try {
@@ -347,7 +342,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// At this point, either the connection failed, or the initial header verification failed.
// The error does not relate to any specific Input. Report all of them as failed.
// This ends up indirectly penalizing the host (multiple failures reported on the single host)
- for(InputAttemptIdentifier left: remaining) {
+ for (InputAttemptIdentifier left : remaining.values()) {
// Need to be handling temporary glitches ..
// Report read error to the AM to trigger source failure heuristics
scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
@@ -361,7 +356,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// Cycle through remaining MapOutputs
boolean isFirst = true;
InputAttemptIdentifier first = null;
- for (InputAttemptIdentifier left : remaining) {
+ for (InputAttemptIdentifier left : remaining.values()) {
if (isFirst) {
first = left;
isFirst = false;
@@ -487,7 +482,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
endTime - startTime, mapOutput);
// Note successful shuffle
- remaining.remove(srcAttemptId);
+ remaining.remove(srcAttemptId.toString());
metrics.successFetch();
return null;
} catch (IOException ioe) {
@@ -514,12 +509,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
srcAttemptId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
if(srcAttemptId == null) {
- return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+ return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]);
} else {
return new InputAttemptIdentifier[] {srcAttemptId};
}
}
-
LOG.warn("Failed to shuffle output of " + srcAttemptId +
" from " + host.getHostIdentifier(), ioe);
@@ -528,7 +522,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
metrics.failedFetch();
return new InputAttemptIdentifier[] {srcAttemptId};
}
-
}
/**
@@ -572,7 +565,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
- int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
+ int forReduce, Map<String, InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
LOG.warn(logIdentifier + " invalid lengths in map output header: id: " +
@@ -592,7 +585,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
}
// Sanity check
- if (!remaining.contains(srcAttemptId)) {
+ if (remaining.get(srcAttemptId.toString()) == null) {
wrongMapErrs.increment(1);
LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
return false;
@@ -603,7 +596,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private InputAttemptIdentifier getNextRemainingAttempt() {
if (remaining.size() > 0) {
- return remaining.iterator().next();
+ return remaining.values().iterator().next();
} else {
return null;
}
@@ -626,10 +619,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
}
// List of maps to be fetched yet
- remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
+ populateRemainingMap(srcAttempts);
try {
- final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
+ final Iterator<InputAttemptIdentifier> iter = remaining.values().iterator();
while (iter.hasNext()) {
// Avoid fetching more if already stopped
if (stopped) {
@@ -701,5 +694,16 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
return MapOutput.createLocalDiskMapOutput(srcAttemptId, allocator, filename,
indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
}
+
+ /** Replaces {@code remaining} with origlist keyed by toString(), in list order; a duplicate key keeps its first position. */
+ @VisibleForTesting
+ void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
+ // Always start fresh (as the replaced LinkedList code did) so attempts from an earlier call are never re-requested.
+ remaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size());
+ for (InputAttemptIdentifier id : origlist) {
+ remaining.put(id.toString(), id);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 385b7b0..7415570 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -42,9 +41,15 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map.Entry;
import com.google.common.collect.Lists;
+
import org.apache.tez.http.HttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.counters.TezCounter;
@@ -373,7 +378,7 @@ public class TestFetcher {
new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
);
doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
- doReturn(true).when(fetcher).setupConnection(host, srcAttempts);
+ doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
@@ -394,8 +399,7 @@ public class TestFetcher {
@Override public Void answer(InvocationOnMock invocation) throws Throwable {
// Emulate host down for 4 seconds.
Thread.sleep(4000);
- doReturn(false).when(fetcher).setupConnection(host, srcAttempts);
-
+ doReturn(false).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
// Throw IOException when fetcher tries to connect again to the same node
throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
}
@@ -407,7 +411,7 @@ public class TestFetcher {
//ignore
}
//setup connection should be called twice (1 for connect and another for retry)
- verify(fetcher, times(2)).setupConnection(any(MapHost.class), anyList());
+ verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
//since copyMapOutput consistently fails, it should call copyFailed once
verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
anyBoolean(), anyBoolean());
@@ -458,8 +462,7 @@ public class TestFetcher {
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
- fetcher.remaining = Lists.newLinkedList();
-
+ fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -477,4 +480,61 @@ public class TestFetcher {
fail();
}
}
+
+ @Test(timeout = 1000)
+ public void testInputAttemptIdentifierMap() {
+ InputAttemptIdentifier[] srcAttempts = {
+ new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ // duplicate of the previous entry (same toString() key): must collapse into a single map entry
+ new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ // pipeline shuffle based identifiers, with multiple attempts
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+ new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+ };
+ InputAttemptIdentifier[] expectedSrcAttempts = {
+ new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ // pipeline shuffle based identifiers
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+ new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+ new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+ };
+ // populateRemainingMap should drop the duplicate and keep first-insertion order of the remaining keys.
+ Configuration conf = new TezConfiguration();
+ ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+ MergeManager merger = mock(MergeManager.class);
+ ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+ Shuffle shuffle = mock(Shuffle.class);
+ MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ FetcherOrderedGrouped fetcher =
+ new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
+ Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
+ Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();
+ int count = 0;
+ while (iterator.hasNext()) {
+ Entry<String, InputAttemptIdentifier> entry = iterator.next();
+ Assert.assertEquals(expectedSrcAttempts[count++].toString(), entry.getKey());
+ }
+ }
}