You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/02 21:56:56 UTC
tez git commit: TEZ-3599. Unordered Fetcher can hang if empty partitions are present (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 bc3c17a57 -> 0cf1ce26c
TEZ-3599. Unordered Fetcher can hang if empty partitions are present (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0cf1ce26
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0cf1ce26
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0cf1ce26
Branch: refs/heads/TEZ-3334
Commit: 0cf1ce26c96139abd726e8e00cfcfacfafd2b4f8
Parents: bc3c17a
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Feb 2 15:56:43 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Feb 2 15:56:43 2017 -0600
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../runtime/library/common/shuffle/Fetcher.java | 114 +++++++++++--------
.../common/shuffle/impl/ShuffleManager.java | 28 +++--
.../library/common/shuffle/TestFetcher.java | 46 +++++---
.../common/shuffle/impl/TestShuffleManager.java | 2 +-
.../shuffle/orderedgrouped/TestFetcher.java | 4 +-
6 files changed, 122 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 6c9a858..8416cc9 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3599. Unordered Fetcher can hang if empty partitions are present
TEZ-3596. Number of Empty DME logged for Composite fetch is too high
TEZ-3597. Composite Fetch hangs on certain DME empty events.
TEZ-3595. Composite Fetch account error for disk direct
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 7b5ca17..a083daa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -584,6 +584,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedInputs = null;
while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ srcAttemptsRemaining.entrySet().iterator().next().getValue();
if (isShutDown.get()) {
shutdownInternal(true);
if (isDebugEnabled) {
@@ -595,6 +597,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
try {
failedInputs = fetchInputs(input, callback);
+ if(failedInputs == null || failedInputs.length == 0) {
+ srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
+ }
} catch (FetcherReadTimeoutException e) {
//clean up connection
shutdownInternal(true);
@@ -635,6 +640,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator();
while (iterator.hasNext()) {
+ boolean hasFailures = false;
if (isShutDown.get()) {
if (isDebugEnabled) {
LOG.debug(
@@ -643,54 +649,67 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
break;
}
InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
- long startTime = System.currentTimeMillis();
+ for (int curPartition = 0; curPartition < partitionCount; curPartition++) {
+ int reduceId = curPartition + partition;
+ srcAttemptId = pathToAttemptMap.get(new PathPartition(srcAttemptId.getPathComponent(), reduceId));
+ long startTime = System.currentTimeMillis();
- FetchedInput fetchedInput = null;
- try {
- TezIndexRecord idxRecord;
- // for missing files, this will throw an exception
- idxRecord = getTezIndexRecord(srcAttemptId);
-
- fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
- idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
- getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf,
- new FetchedInputCallback() {
- @Override
- public void fetchComplete(FetchedInput fetchedInput) {}
-
- @Override
- public void fetchFailed(FetchedInput fetchedInput) {}
-
- @Override
- public void freeResources(FetchedInput fetchedInput) {}
- });
- if (isDebugEnabled) {
- LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
- + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
- + " to " + fetchedInput.getType());
- }
-
- long endTime = System.currentTimeMillis();
- fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
- idxRecord.getRawLength(), (endTime - startTime));
- iterator.remove();
- } catch (IOException e) {
- cleanupFetchedInput(fetchedInput);
- if (isShutDown.get()) {
+ FetchedInput fetchedInput = null;
+ try {
+ TezIndexRecord idxRecord;
+ // for missing files, this will throw an exception
+ idxRecord = getTezIndexRecord(srcAttemptId, reduceId);
+
+ fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
+ idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
+ getShuffleInputFileName(srcAttemptId.getPathComponent(), null),
+ conf,
+ new FetchedInputCallback() {
+ @Override
+ public void fetchComplete(FetchedInput fetchedInput) {
+ }
+
+ @Override
+ public void fetchFailed(FetchedInput fetchedInput) {
+ }
+
+ @Override
+ public void freeResources(FetchedInput fetchedInput) {
+ }
+ });
if (isDebugEnabled) {
- LOG.debug(
- "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId +
- " from host " +
- host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+ LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+ + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+ + " to " + fetchedInput.getType());
+ }
+
+ long endTime = System.currentTimeMillis();
+ fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
+ idxRecord.getRawLength(), (endTime - startTime));
+ } catch (IOException e) {
+ hasFailures = true;
+ cleanupFetchedInput(fetchedInput);
+ if (isShutDown.get()) {
+ if (isDebugEnabled) {
+ LOG.debug(
+ "Already shutdown. Ignoring Local Fetch Failure for " +
+ srcAttemptId +
+ " from host " +
+ host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+ }
+ break;
+ }
+ if (failMissing) {
+ LOG.warn(
+ "Failed to shuffle output of " + srcAttemptId + " from " +
+ host + "(local fetch)",
+ e);
}
- break;
- }
- if (failMissing) {
- LOG.warn(
- "Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
- e);
}
}
+ if(!hasFailures) {
+ iterator.remove();
+ }
}
InputAttemptIdentifier[] failedFetches = null;
@@ -713,7 +732,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
@VisibleForTesting
- protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws
+ protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId, int partition) throws
IOException {
TezIndexRecord idxRecord;
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
@@ -745,6 +764,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
}
+ @VisibleForTesting
+ public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() {
+ return pathToAttemptMap;
+ }
+
static class HostFetchResult {
private final FetchResult fetchResult;
private final InputAttemptIdentifier[] failedInputs;
@@ -946,8 +970,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
compressedLength, decompressedLength, (endTime - startTime));
// Note successful shuffle
- srcAttemptsRemaining.remove(srcAttemptId.toString());
-
// metrics.successFetch();
}
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 3964431..a23ce72 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -117,7 +117,7 @@ public class ShuffleManager implements FetcherCallback {
private final BlockingQueue<FetchedInput> completedInputs;
private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
@VisibleForTesting
- final Set<Integer> completedInputSet;
+ final BitSet completedInputSet;
private final ConcurrentMap<HostPort, InputHost> knownSrcHosts;
private final BlockingQueue<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -217,7 +217,7 @@ public class ShuffleManager implements FetcherCallback {
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
- completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs));
+ completedInputSet = new BitSet(numInputs);
/**
* In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
* We do not know upfront the number of spills from source.
@@ -445,9 +445,17 @@ public class ShuffleManager implements FetcherCallback {
}
// Avoid adding attempts which have already completed.
- if (completedInputSet.contains(input.getInputIdentifier())) {
- inputIter.remove();
- continue;
+ if(input instanceof CompositeInputAttemptIdentifier) {
+ if (completedInputSet.nextClearBit(input.getInputIdentifier()) >=
+ input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) {
+ inputIter.remove();
+ continue;
+ }
+ } else {
+ if (completedInputSet.get(input.getInputIdentifier())) {
+ inputIter.remove();
+ continue;
+ }
}
// Avoid adding attempts which have been marked as OBSOLETE
if (obsoletedInputs.contains(input)) {
@@ -531,9 +539,9 @@ public class ShuffleManager implements FetcherCallback {
LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
}
- if (!completedInputSet.contains(inputIdentifier)) {
+ if (!completedInputSet.get(inputIdentifier)) {
synchronized (completedInputSet) {
- if (!completedInputSet.contains(inputIdentifier)) {
+ if (!completedInputSet.get(inputIdentifier)) {
NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
registerCompletedInput(fetchedInput);
@@ -631,9 +639,9 @@ public class ShuffleManager implements FetcherCallback {
inputContext.notifyProgress();
boolean committed = false;
- if (!completedInputSet.contains(inputIdentifier)) {
+ if (!completedInputSet.get(inputIdentifier)) {
synchronized (completedInputSet) {
- if (!completedInputSet.contains(inputIdentifier)) {
+ if (!completedInputSet.get(inputIdentifier)) {
fetchedInput.commit();
committed = true;
ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
@@ -710,7 +718,7 @@ public class ShuffleManager implements FetcherCallback {
private void adjustCompletedInputs(FetchedInput fetchedInput) {
lock.lock();
try {
- completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+ completedInputSet.set(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
int numComplete = numCompletedInputs.incrementAndGet();
if (numComplete == numInputs) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 17a065c..b031154 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -18,6 +18,8 @@
package org.apache.tez.runtime.library.common.shuffle;
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
@@ -138,13 +141,13 @@ public class TestFetcher {
@Test(timeout = 3000)
public void testSetupLocalDiskFetch() throws Exception {
-
- InputAttemptIdentifier[] srcAttempts = {
- new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
- new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
- new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
- new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
- new InputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
+
+ CompositeInputAttemptIdentifier[] srcAttempts = {
+ new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
+ new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
+ new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
+ new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
+ new CompositeInputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
};
final int FIRST_FAILED_ATTEMPT_IDX = 2;
final int SECOND_FAILED_ATTEMPT_IDX = 4;
@@ -156,10 +159,25 @@ public class TestFetcher {
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
- false, true, false);
- builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts));
+ false, true, true);
+ ArrayList<InputAttemptIdentifier> inputAttemptIdentifiers = new ArrayList<>();
+ for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
+ for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
+ inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
+ }
+ }
+ ArrayList<InputAttemptIdentifier> list = new ArrayList<InputAttemptIdentifier>();
+ list.addAll(Arrays.asList(srcAttempts));
+ builder.assignWork(HOST, PORT, partition, 1, list);
Fetcher fetcher = spy(builder.build());
-
+ for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
+ for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
+ inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
+ Fetcher.PathPartition pathPartition =
+ new Fetcher.PathPartition(compositeInputAttemptIdentifier.getPathComponent(),partition + i);
+ fetcher.getPathToAttemptMap().put(pathPartition, compositeInputAttemptIdentifier.expand(i));
+ }
+ }
doAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock invocation) throws Throwable {
@@ -183,7 +201,7 @@ public class TestFetcher {
// match with params for copySucceeded below.
return new TezIndexRecord(p * 10, p * 1000, p * 100);
}
- }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class));
+ }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class), anyInt());
doNothing().when(fetcher).shutdown();
doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
@@ -214,14 +232,14 @@ public class TestFetcher {
srcAttempts[SECOND_FAILED_ATTEMPT_IDX]);
}
- protected void verifyFetchSucceeded(FetcherCallback callback, InputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
+ protected void verifyFetchSucceeded(FetcherCallback callback, CompositeInputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
String pathComponent = srcAttempId.getPathComponent();
int len = pathComponent.length();
long p = Long.valueOf(pathComponent.substring(len - 1, len));
ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput =
ArgumentCaptor.forClass(LocalDiskFetchedInput.class);
verify(callback)
- .fetchSucceeded(eq(HOST), eq(srcAttempId), capturedFetchedInput.capture(), eq(p * 100),
+ .fetchSucceeded(eq(HOST), eq(srcAttempId.expand(0)), capturedFetchedInput.capture(), eq(p * 100),
eq(p * 1000), anyLong());
LocalDiskFetchedInput f = capturedFetchedInput.getValue();
Assert.assertEquals("success callback filename", f.getInputFile().toString(),
@@ -230,7 +248,7 @@ public class TestFetcher {
Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 10);
Assert.assertEquals("success callback raw size", f.getActualSize(), p * 1000);
Assert.assertEquals("success callback compressed size", f.getCompressedSize(), p * 100);
- Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId);
+ Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId.expand(0));
Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index f026cb2..b3b8688 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -220,7 +220,7 @@ public class TestShuffleManager {
}
public int getNumOfCompletedInputs() {
- return completedInputSet.size();
+ return completedInputSet.cardinality();
}
boolean isFetcherExecutorShutdown() {
http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 3686d17..54b0279 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -107,7 +107,7 @@ public class TestFetcher {
static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
- @Test (timeout = 5000)
+ @Test(timeout = 5000)
public void testInputsReturnedOnConnectionException() throws Exception {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
@@ -333,7 +333,7 @@ public class TestFetcher {
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
}
- @Test(timeout = 5000000)
+ @Test(timeout = 5000)
public void testSetupLocalDiskFetchAutoReduce() throws Exception {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);