You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/30 23:13:41 UTC

tez git commit: TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. Contributed by Jason Lowe.

Repository: tez
Updated Branches:
  refs/heads/master ac9cfb9c9 -> 71bb2defe


TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory
merge that never starts. Contributed by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71bb2def
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71bb2def
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71bb2def

Branch: refs/heads/master
Commit: 71bb2defe97e55e3bf7dbb299fe33ab8a667e7a1
Parents: ac9cfb9
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 30 16:13:23 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 30 16:13:23 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../FetchedInputAllocatorOrderedGrouped.java    |  1 +
 .../shuffle/orderedgrouped/InMemoryReader.java  |  2 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  7 +++++-
 .../orderedgrouped/TestMergeManager.java        | 26 ++++++++++++++++++++
 5 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 29bede2..ebc4e79 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
@@ -69,6 +70,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
@@ -526,6 +528,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
   TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements

http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
index 7276f74..e145632 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -36,4 +36,5 @@ public interface FetchedInputAllocatorOrderedGrouped {
 
   void unreserve(long bytes);
 
+  void releaseCommittedMemory(long bytes);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 7860377..12fe057 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -258,7 +258,7 @@ public class InMemoryReader extends Reader {
     buffer = null;
     // Inform the MergeManager
     if (merger != null) {
-      merger.unreserve(bufferSize);
+      merger.releaseCommittedMemory(bufferSize);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9e2fbd4..26bdca7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -451,7 +451,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
   @Override
   public synchronized void unreserve(long size) {
-    commitMemory -= size;
     usedMemory -= size;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory
@@ -461,6 +460,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   }
 
   @Override
+  public synchronized void releaseCommittedMemory(long size) {
+    commitMemory -= size;
+    unreserve(size);
+  }
+
+  @Override
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()

http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9eb4cae..9209ff4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -178,6 +178,32 @@ public class TestMergeManager {
     Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
   }
 
+  @Test(timeout = 10000)
+  public void testReservationAccounting() throws IOException {
+    Configuration conf = new TezConfiguration(defaultConf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, null, inputContext, null, null, null, null,
+        mock(ExceptionReporter.class), 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    MapOutput mapOutput = mergeManager.reserve(null, 1, 1, 0);
+    assertEquals(1, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    mapOutput.abort();
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    mapOutput = mergeManager.reserve(null, 2, 2, 0);
+    mergeManager.closeInMemoryFile(mapOutput);
+    assertEquals(2, mergeManager.getUsedMemory());
+    assertEquals(2, mergeManager.getCommitMemory());
+    mergeManager.releaseCommittedMemory(2);
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+  }
+
   @Test(timeout=20000)
   public void testIntermediateMemoryMergeAccounting() throws Exception {
     Configuration conf = new TezConfiguration(defaultConf);