You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on "here") is not preserved in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/04/25 20:35:06 UTC
tez git commit: TEZ-3685. ShuffleHandler completedInputSet off-by-one error (jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 e5d01a60a -> d283063c5
TEZ-3685. ShuffleHandler completedInputSet off-by-one error (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d283063c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d283063c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d283063c
Branch: refs/heads/TEZ-3334
Commit: d283063c5778bdb1db0046a4096143cf0518c667
Parents: e5d01a6
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Apr 25 15:34:59 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Apr 25 15:34:59 2017 -0500
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../common/shuffle/impl/ShuffleManager.java | 127 +++++++------------
.../orderedgrouped/ShuffleScheduler.java | 2 +-
3 files changed, 51 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 61473e0..e7f0211 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3685. ShuffleHandler completedInputSet off-by-one error
TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback
TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used
TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler
http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 3436fc7..57cf4d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -444,20 +444,17 @@ public class ShuffleManager implements FetcherCallback {
}
// Avoid adding attempts which have already completed.
- if(input instanceof CompositeInputAttemptIdentifier) {
- if (completedInputSet.nextClearBit(input.getInputIdentifier()) >=
- input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) {
- inputIter.remove();
- continue;
- }
+ boolean alreadyCompleted;
+ if (input instanceof CompositeInputAttemptIdentifier) {
+ CompositeInputAttemptIdentifier compositeInput = (CompositeInputAttemptIdentifier)input;
+ int nextClearBit = completedInputSet.nextClearBit(compositeInput.getInputIdentifier());
+ int maxClearBit = compositeInput.getInputIdentifier() + compositeInput.getInputIdentifierCount();
+ alreadyCompleted = nextClearBit > maxClearBit;
} else {
- if (completedInputSet.get(input.getInputIdentifier())) {
- inputIter.remove();
- continue;
- }
+ alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
}
- // Avoid adding attempts which have been marked as OBSOLETE
- if (obsoletedInputs.contains(input)) {
+ // Avoid adding attempts which have already completed or have been marked as OBSOLETE
+ if (alreadyCompleted || obsoletedInputs.contains(input)) {
inputIter.remove();
continue;
}
@@ -537,23 +534,17 @@ public class ShuffleManager implements FetcherCallback {
if (LOG.isDebugEnabled()) {
LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
}
-
- if (!completedInputSet.get(inputIdentifier)) {
- synchronized (completedInputSet) {
- if (!completedInputSet.get(inputIdentifier)) {
- NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
- if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
- registerCompletedInput(fetchedInput);
- } else {
- registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
- }
- }
- }
- }
-
- // Awake the loop to check for termination.
lock.lock();
try {
+ if (!completedInputSet.get(inputIdentifier)) {
+ NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
+ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+ registerCompletedInput(fetchedInput);
+ } else {
+ registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
+ }
+ }
+ // Awake the loop to check for termination.
wakeLoop.signal();
} finally {
lock.unlock();
@@ -570,7 +561,7 @@ public class ShuffleManager implements FetcherCallback {
lastEventReceived.setValue(relativeTime);
}
- public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+ void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
obsoletedInputs.add(srcAttemptIdentifier);
// TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
}
@@ -632,60 +623,40 @@ public class ShuffleManager implements FetcherCallback {
lock.lock();
try {
lastProgressTime = System.currentTimeMillis();
- } finally {
- lock.unlock();
- }
-
- inputContext.notifyProgress();
- boolean committed = false;
- if (!completedInputSet.get(inputIdentifier)) {
- synchronized (completedInputSet) {
- if (!completedInputSet.get(inputIdentifier)) {
- fetchedInput.commit();
- committed = true;
- ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
- fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
-
- // Processing counters for completed and commit fetches only. Need
- // additional counters for excessive fetches - which primarily comes
- // in after speculation or retries.
- shuffledInputsCounter.increment(1);
- bytesShuffledCounter.increment(fetchedBytes);
- if (fetchedInput.getType() == Type.MEMORY) {
- bytesShuffledToMemCounter.increment(fetchedBytes);
- } else if (fetchedInput.getType() == Type.DISK) {
- bytesShuffledToDiskCounter.increment(fetchedBytes);
- } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
- bytesShuffledDirectDiskCounter.increment(fetchedBytes);
- }
- decompressedDataSizeCounter.increment(decompressedLength);
-
- if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
- registerCompletedInput(fetchedInput);
- } else {
- registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
- }
+ inputContext.notifyProgress();
+ if (!completedInputSet.get(inputIdentifier)) {
+ fetchedInput.commit();
+ ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+ fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+
+ // Processing counters for completed and commit fetches only. Need
+ // additional counters for excessive fetches - which primarily comes
+ // in after speculation or retries.
+ shuffledInputsCounter.increment(1);
+ bytesShuffledCounter.increment(fetchedBytes);
+ if (fetchedInput.getType() == Type.MEMORY) {
+ bytesShuffledToMemCounter.increment(fetchedBytes);
+ } else if (fetchedInput.getType() == Type.DISK) {
+ bytesShuffledToDiskCounter.increment(fetchedBytes);
+ } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
+ bytesShuffledDirectDiskCounter.increment(fetchedBytes);
+ }
+ decompressedDataSizeCounter.increment(decompressedLength);
- lock.lock();
- try {
- totalBytesShuffledTillNow += fetchedBytes;
- logProgress();
- } finally {
- lock.unlock();
- }
+ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+ registerCompletedInput(fetchedInput);
+ } else {
+ registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
}
- }
- }
- if (!committed) {
- fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
- } else {
- lock.lock();
- try {
- // Signal the wakeLoop to check for termination.
+
+ totalBytesShuffledTillNow += fetchedBytes;
+ logProgress();
wakeLoop.signal();
- } finally {
- lock.unlock();
+ } else {
+ fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
}
+ } finally {
+ lock.unlock();
}
// TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 6e42bca..b6171a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1301,7 +1301,7 @@ class ShuffleScheduler {
boolean isInputFinished(int inputIndex, int inputEnd) {
synchronized (finishedMaps) {
- return finishedMaps.nextClearBit(inputIndex) >= inputEnd;
+ return finishedMaps.nextClearBit(inputIndex) > inputEnd;
}
}