You are viewing a plain text version of this content. The canonical link for it was not carried over into this plain text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/30 23:13:41 UTC
tez git commit: TEZ-3293. Fetch failures can cause a shuffle hang
waiting for memory merge that never starts. Contributed by Jason Lowe.
Repository: tez
Updated Branches:
refs/heads/master ac9cfb9c9 -> 71bb2defe
TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory
merge that never starts. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71bb2def
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71bb2def
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71bb2def
Branch: refs/heads/master
Commit: 71bb2defe97e55e3bf7dbb299fe33ab8a667e7a1
Parents: ac9cfb9
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 30 16:13:23 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 30 16:13:23 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../FetchedInputAllocatorOrderedGrouped.java | 1 +
.../shuffle/orderedgrouped/InMemoryReader.java | 2 +-
.../shuffle/orderedgrouped/MergeManager.java | 7 +++++-
.../orderedgrouped/TestMergeManager.java | 26 ++++++++++++++++++++
5 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 29bede2..ebc4e79 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3314. Double counting input bytes in MultiMRInput.
TEZ-3308. Add counters to capture input split length.
TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
@@ -69,6 +70,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3314. Double counting input bytes in MultiMRInput.
TEZ-3308. Add counters to capture input split length.
TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
@@ -526,6 +528,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
index 7276f74..e145632 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -36,4 +36,5 @@ public interface FetchedInputAllocatorOrderedGrouped {
void unreserve(long bytes);
+ void releaseCommittedMemory(long bytes);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 7860377..12fe057 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -258,7 +258,7 @@ public class InMemoryReader extends Reader {
buffer = null;
// Inform the MergeManager
if (merger != null) {
- merger.unreserve(bufferSize);
+ merger.releaseCommittedMemory(bufferSize);
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9e2fbd4..26bdca7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -451,7 +451,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
@Override
public synchronized void unreserve(long size) {
- commitMemory -= size;
usedMemory -= size;
if (LOG.isDebugEnabled()) {
LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory
@@ -461,6 +460,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
@Override
+ public synchronized void releaseCommittedMemory(long size) {
+ commitMemory -= size;
+ unreserve(size);
+ }
+
+ @Override
public synchronized void closeInMemoryFile(MapOutput mapOutput) {
inMemoryMapOutputs.add(mapOutput);
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
http://git-wip-us.apache.org/repos/asf/tez/blob/71bb2def/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9eb4cae..9209ff4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -178,6 +178,32 @@ public class TestMergeManager {
Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
}
+ @Test(timeout = 10000)
+ public void testReservationAccounting() throws IOException {
+ Configuration conf = new TezConfiguration(defaultConf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+ MergeManager mergeManager =
+ new MergeManager(conf, localFs, null, inputContext, null, null, null, null,
+ mock(ExceptionReporter.class), 2000000, null, false, -1);
+ mergeManager.configureAndStart();
+ assertEquals(0, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+ MapOutput mapOutput = mergeManager.reserve(null, 1, 1, 0);
+ assertEquals(1, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+ mapOutput.abort();
+ assertEquals(0, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+ mapOutput = mergeManager.reserve(null, 2, 2, 0);
+ mergeManager.closeInMemoryFile(mapOutput);
+ assertEquals(2, mergeManager.getUsedMemory());
+ assertEquals(2, mergeManager.getCommitMemory());
+ mergeManager.releaseCommittedMemory(2);
+ assertEquals(0, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+ }
+
@Test(timeout=20000)
public void testIntermediateMemoryMergeAccounting() throws Exception {
Configuration conf = new TezConfiguration(defaultConf);