You are viewing a plain-text version of this content; the canonical link is not included in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/04/25 20:35:06 UTC

tez git commit: TEZ-3685. ShuffleHandler completedInputSet off-by-one error (jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 e5d01a60a -> d283063c5


TEZ-3685. ShuffleHandler completedInputSet off-by-one error (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d283063c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d283063c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d283063c

Branch: refs/heads/TEZ-3334
Commit: d283063c5778bdb1db0046a4096143cf0518c667
Parents: e5d01a6
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Apr 25 15:34:59 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Apr 25 15:34:59 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |   1 +
 .../common/shuffle/impl/ShuffleManager.java     | 127 +++++++------------
 .../orderedgrouped/ShuffleScheduler.java        |   2 +-
 3 files changed, 51 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 61473e0..e7f0211 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3685. ShuffleHandler completedInputSet off-by-one error
   TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback
   TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used
   TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler

http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 3436fc7..57cf4d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -444,20 +444,17 @@ public class ShuffleManager implements FetcherCallback {
       }
 
       // Avoid adding attempts which have already completed.
-      if(input instanceof CompositeInputAttemptIdentifier) {
-        if (completedInputSet.nextClearBit(input.getInputIdentifier()) >=
-            input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) {
-          inputIter.remove();
-          continue;
-        }
+      boolean alreadyCompleted;
+      if (input instanceof CompositeInputAttemptIdentifier) {
+        CompositeInputAttemptIdentifier compositeInput = (CompositeInputAttemptIdentifier)input;
+        int nextClearBit = completedInputSet.nextClearBit(compositeInput.getInputIdentifier());
+        int maxClearBit = compositeInput.getInputIdentifier() + compositeInput.getInputIdentifierCount();
+        alreadyCompleted = nextClearBit > maxClearBit;
       } else {
-        if (completedInputSet.get(input.getInputIdentifier())) {
-          inputIter.remove();
-          continue;
-        }
+          alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
       }
-      // Avoid adding attempts which have been marked as OBSOLETE 
-      if (obsoletedInputs.contains(input)) {
+      // Avoid adding attempts which have already completed or have been marked as OBSOLETE
+      if (alreadyCompleted || obsoletedInputs.contains(input)) {
         inputIter.remove();
         continue;
       }
@@ -537,23 +534,17 @@ public class ShuffleManager implements FetcherCallback {
     if (LOG.isDebugEnabled()) {
       LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
     }
-    
-    if (!completedInputSet.get(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.get(inputIdentifier)) {
-          NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
-          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
-            registerCompletedInput(fetchedInput);
-          } else {
-            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
-          }
-        }
-      }
-    }
-
-    // Awake the loop to check for termination.
     lock.lock();
     try {
+      if (!completedInputSet.get(inputIdentifier)) {
+        NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
+        if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+          registerCompletedInput(fetchedInput);
+        } else {
+          registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
+        }
+      }
+      // Awake the loop to check for termination.
       wakeLoop.signal();
     } finally {
       lock.unlock();
@@ -570,7 +561,7 @@ public class ShuffleManager implements FetcherCallback {
     lastEventReceived.setValue(relativeTime);
   }
 
-  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+  void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
     obsoletedInputs.add(srcAttemptIdentifier);
     // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
   }
@@ -632,60 +623,40 @@ public class ShuffleManager implements FetcherCallback {
     lock.lock();
     try {
       lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-    
-    inputContext.notifyProgress();
-    boolean committed = false;
-    if (!completedInputSet.get(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.get(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
-              fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
-
-          // Processing counters for completed and commit fetches only. Need
-          // additional counters for excessive fetches - which primarily comes
-          // in after speculation or retries.
-          shuffledInputsCounter.increment(1);
-          bytesShuffledCounter.increment(fetchedBytes);
-          if (fetchedInput.getType() == Type.MEMORY) {
-            bytesShuffledToMemCounter.increment(fetchedBytes);
-          } else if (fetchedInput.getType() == Type.DISK) {
-            bytesShuffledToDiskCounter.increment(fetchedBytes);
-          } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
-            bytesShuffledDirectDiskCounter.increment(fetchedBytes);
-          }
-          decompressedDataSizeCounter.increment(decompressedLength);
-
-          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
-            registerCompletedInput(fetchedInput);
-          } else {
-            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
-          }
+      inputContext.notifyProgress();
+      if (!completedInputSet.get(inputIdentifier)) {
+        fetchedInput.commit();
+        ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+            fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+
+        // Processing counters for completed and commit fetches only. Need
+        // additional counters for excessive fetches - which primarily comes
+        // in after speculation or retries.
+        shuffledInputsCounter.increment(1);
+        bytesShuffledCounter.increment(fetchedBytes);
+        if (fetchedInput.getType() == Type.MEMORY) {
+          bytesShuffledToMemCounter.increment(fetchedBytes);
+        } else if (fetchedInput.getType() == Type.DISK) {
+          bytesShuffledToDiskCounter.increment(fetchedBytes);
+        } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
+          bytesShuffledDirectDiskCounter.increment(fetchedBytes);
+        }
+        decompressedDataSizeCounter.increment(decompressedLength);
 
-          lock.lock();
-          try {
-            totalBytesShuffledTillNow += fetchedBytes;
-            logProgress();
-          } finally {
-            lock.unlock();
-          }
+        if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+          registerCompletedInput(fetchedInput);
+        } else {
+          registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
         }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
+
+        totalBytesShuffledTillNow += fetchedBytes;
+        logProgress();
         wakeLoop.signal();
-      } finally {
-        lock.unlock();
+      } else {
+        fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
       }
+    } finally {
+      lock.unlock();
     }
     // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d283063c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 6e42bca..b6171a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -1301,7 +1301,7 @@ class ShuffleScheduler {
 
   boolean isInputFinished(int inputIndex, int inputEnd) {
     synchronized (finishedMaps) {
-      return finishedMaps.nextClearBit(inputIndex) >= inputEnd;
+      return finishedMaps.nextClearBit(inputIndex) > inputEnd;
     }
   }