You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/02/29 17:12:19 UTC
[1/3] tez git commit: TEZ-1911. MergeManager's unconditionalReserve()
should check for memory limits before allocating memory to
IntermediateMemoryToMemoryMerger (rbalamohan) (cherry picked from commit
15d7339e9fb64cdd0d995da9832dff721c14eacf)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 1832554c7 -> 0c4d0cbfc
TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan)
(cherry picked from commit 15d7339e9fb64cdd0d995da9832dff721c14eacf)
Conflicts:
CHANGES.txt
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2238443d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2238443d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2238443d
Branch: refs/heads/branch-0.7
Commit: 2238443d03612090d0dcf27ec941fbb5ac9d6747
Parents: 1832554
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Feb 29 15:56:35 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Feb 29 15:56:35 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../shuffle/orderedgrouped/MergeManager.java | 63 +++++-
.../orderedgrouped/TestMergeManager.java | 216 +++++++++++++++++++
3 files changed, 269 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2238443d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 169b1f4..92d9de8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating.
TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout
TEZ-3128. Avoid stopping containers on the AM shutdown thread.
TEZ-3129. Tez task and task attempt UI needs application fails with NotFoundException
http://git-wip-us.apache.org/repos/asf/tez/blob/2238443d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index def8175..4a31d06 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -63,7 +64,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
/**
* Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart()
@@ -90,11 +90,13 @@ public class MergeManager {
};
private final Combiner combiner;
- private final Set<MapOutput> inMemoryMergedMapOutputs =
+ @VisibleForTesting
+ final Set<MapOutput> inMemoryMergedMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
private final IntermediateMemoryToMemoryMerger memToMemMerger;
- private final Set<MapOutput> inMemoryMapOutputs =
+ @VisibleForTesting
+ final Set<MapOutput> inMemoryMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
private final InMemoryMerger inMemoryMerger;
@@ -586,19 +588,58 @@ public class MergeManager {
InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
- long mergeOutputSize =
- createInMemorySegments(inputs, inMemorySegments, 0);
+
+ MapOutput mergedMapOutputs = null;
+
+ long mergeOutputSize = 0l;
+ //Lock manager so that fetcher threads can not change the mem size
+ synchronized (manager) {
+
+ Iterator<MapOutput> it = inputs.iterator();
+ while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
+ MapOutput mo = it.next();
+ if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) {
+ //Search for smaller segments that can fit into existing mem
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Size is greater than usedMemory. "
+ + "mergeOutputSize=" + mergeOutputSize
+ + ", moSize=" + mo.getSize()
+ + ", usedMemory=" + usedMemory
+ + ", memoryLimit=" + memoryLimit);
+ }
+ continue;
+ } else {
+ mergeOutputSize += mo.getSize();
+ IFile.Reader reader = new InMemoryReader(MergeManager.this,
+ mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length);
+ inMemorySegments.add(new Segment(reader, true,
+ (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
+ it.remove();
+ LOG.debug("Added segment for merging. mergeOutputSize=" + mergeOutputSize);
+ }
+ }
+
+ //Add any unused MapOutput back
+ inMemoryMapOutputs.addAll(inputs);
+
+ if (inMemorySegments.size() <= 1) {
+ return; //no need to proceed further.
+ }
+
+ mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false);
+ }
+
int noInMemorySegments = inMemorySegments.size();
- MapOutput mergedMapOutputs =
- unconditionalReserve(dummyMapId, mergeOutputSize, false);
-
- Writer writer =
- new InMemoryWriter(mergedMapOutputs.getArrayStream());
+ Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
+ if (Thread.currentThread().isInterrupted()) {
+ return; // early exit
+ }
+
// Nothing will be materialized to disk because the sort factor is being
// set to the number of in memory segments.
// TODO Is this doing any combination ?
@@ -615,7 +656,7 @@ public class MergeManager {
LOG.info(inputContext.getSourceVertexName() +
" Memory-to-Memory merge of the " + noInMemorySegments +
- " files in-memory complete.");
+ " files in-memory complete with mergeOutputSize=" + mergeOutputSize);
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
http://git-wip-us.apache.org/repos/asf/tez/blob/2238443d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 72273e0..1a8cb7a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -228,6 +228,222 @@ public class TestMergeManager {
assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
}
+ @Test(timeout = 60000l)
+ public void testIntermediateMemoryMerge() throws Throwable {
+ Configuration conf = new TezConfiguration(defaultConf);
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true);
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 3);
+
+ Path localDir = new Path(workDir, "local");
+ Path srcDir = new Path(workDir, "srcData");
+ localFs.mkdirs(localDir);
+ localFs.mkdirs(srcDir);
+
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+ ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+ MergeManager mergeManager =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ exceptionReporter, 2000000, null, false, -1);
+ mergeManager.configureAndStart();
+
+ assertEquals(0, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ /**
+ * Test #1
+ * - Have 4 segments where all of them can fit into memory.
+ * - After 3 segment commits, it would trigger mem-to-mem merge.
+ * - All of them can be merged in memory.
+ */
+ byte[] data1 = generateDataBySize(conf, 10);
+ byte[] data2 = generateDataBySize(conf, 20);
+ byte[] data3 = generateDataBySize(conf, 200);
+ byte[] data4 = generateDataBySize(conf, 20000);
+
+ MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
+ MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
+ MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
+ MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+
+ assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ //size should be ~20230.
+ assertEquals(data1.length + data2.length + data3.length + data4.length,
+ mergeManager.getUsedMemory());
+
+
+ System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+ System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+ System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+ System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+ //Committing 3 segments should trigger mem-to-mem merge
+ mo1.commit();
+ mo2.commit();
+ mo3.commit();
+ mo4.commit();
+
+ //Wait for mem-to-mem to complete
+ mergeManager.waitForMemToMemMerge();
+
+ assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
+ assertEquals(1, mergeManager.inMemoryMapOutputs.size());
+
+ mergeManager.close();
+
+
+ /**
+ * Test #2
+ * - Have 4 segments where all of them can fit into memory, but one of
+ * them would be big enough that it can not be fit in memory during
+ * mem-to-mem merging.
+ *
+ * - After 3 segment commits, it would trigger mem-to-mem merge.
+ * - Smaller segments which can be fit in additional memory allocated gets
+ * merged.
+ */
+ mergeManager =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ exceptionReporter, 2000000, null, false, -1);
+ mergeManager.configureAndStart();
+
+ //Single shuffle limit is 25% of 2000000
+ data1 = generateDataBySize(conf, 10);
+ data2 = generateDataBySize(conf, 400000);
+ data3 = generateDataBySize(conf, 400000);
+ data4 = generateDataBySize(conf, 400000);
+
+ mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+
+ assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ assertEquals(data1.length + data2.length + data3.length + data4.length,
+ mergeManager.getUsedMemory());
+
+ System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+ System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+ System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+ System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+ //Committing 3 segments should trigger mem-to-mem merge
+ mo1.commit();
+ mo2.commit();
+ mo3.commit();
+ mo4.commit();
+
+ //Wait for mem-to-mem to complete
+ mergeManager.waitForMemToMemMerge();
+
+ /**
+ * Already all segments are in memory which is around 1200000. It
+ * would not be able to allocate more than 800000 for mem-to-mem. So it
+ * would pick up only 2 small segments which can be accommodated within
+ * 800000.
+ */
+ assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
+ assertEquals(2, mergeManager.inMemoryMapOutputs.size());
+
+ mergeManager.close();
+
+ /**
+ * Test #3
+ * - Set number of segments for merging to 4.
+ * - Have 4 in-memory segments of size 400000 each
+ * - Committing 4 segments would trigger mem-to-mem
+ * - But none of them can be merged as there is not enough headroom for
+ * merging in memory.
+ */
+ mergeManager =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ exceptionReporter, 2000000, null, false, -1);
+ mergeManager.configureAndStart();
+
+ //Single shuffle limit is 25% of 2000000
+ data1 = generateDataBySize(conf, 400000);
+ data2 = generateDataBySize(conf, 400000);
+ data3 = generateDataBySize(conf, 400000);
+ data4 = generateDataBySize(conf, 400000);
+
+ mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+
+ assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ assertEquals(data1.length + data2.length + data3.length + data4.length,
+ mergeManager.getUsedMemory());
+
+ System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+ System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+ System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+ System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+ //Committing 3 segments should trigger mem-to-mem merge
+ mo1.commit();
+ mo2.commit();
+ mo3.commit();
+ mo4.commit();
+
+ //Wait for mem-to-mem to complete
+ mergeManager.waitForMemToMemMerge();
+
+ // None of them can be merged as new mem needed for mem-to-mem can't
+ // accommodate any segments
+ assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
+ assertEquals(4, mergeManager.inMemoryMapOutputs.size());
+
+ mergeManager.close();
+
+ }
+
+ private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+ IFile.Writer writer =
+ new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+ int i = 0;
+ while(true) {
+ writer.append(new IntWritable(i), new IntWritable(i));
+ i++;
+ if (writer.getRawLength() > rawLen) {
+ break;
+ }
+ }
+ writer.close();
+ int compressedLength = (int)writer.getCompressedLength();
+ int rawLength = (int)writer.getRawLength();
+ byte[] data = new byte[rawLength];
+ ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
+ rawLength, compressedLength, null, false, 0, LOG, "sometask");
+ return data;
+ }
+
private byte[] generateData(Configuration conf, int numEntries) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
[2/3] tez git commit: TEZ-1911. addendum patch. MergeManager's
unconditionalReserve() should check for memory limits before allocating
memory to IntermediateMemoryToMemoryMerger (rbalamohan) (cherry picked from
commit 12695f3d030b287d10d221fe28d69b6fa5bbc09a)
Posted by jl...@apache.org.
TEZ-1911. addendum patch. MergeManager's unconditionalReserve() should check for memory limits before allocating memory to IntermediateMemoryToMemoryMerger (rbalamohan)
(cherry picked from commit 12695f3d030b287d10d221fe28d69b6fa5bbc09a)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ef61eb28
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ef61eb28
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ef61eb28
Branch: refs/heads/branch-0.7
Commit: ef61eb28daa517ec3f870e15581f5b72a6733352
Parents: 2238443
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Sat Feb 27 07:03:18 2016 +0530
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Feb 29 15:57:02 2016 +0000
----------------------------------------------------------------------
.../library/common/shuffle/orderedgrouped/MergeManager.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ef61eb28/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 4a31d06..b716c01 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -598,13 +598,13 @@ public class MergeManager {
Iterator<MapOutput> it = inputs.iterator();
while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
MapOutput mo = it.next();
- if ((mergeOutputSize + mo.getSize() + usedMemory) > memoryLimit) {
+ if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > memoryLimit) {
//Search for smaller segments that can fit into existing mem
if (LOG.isDebugEnabled()) {
LOG.debug("Size is greater than usedMemory. "
+ "mergeOutputSize=" + mergeOutputSize
+ ", moSize=" + mo.getSize()
- + ", usedMemory=" + usedMemory
+ + ", usedMemory=" + manager.getUsedMemory()
+ ", memoryLimit=" + memoryLimit);
}
continue;
@@ -1105,7 +1105,7 @@ public class MergeManager {
}
@VisibleForTesting
- long getUsedMemory() {
+ synchronized long getUsedMemory() {
return usedMemory;
}
[3/3] tez git commit: TEZ-3147. Intermediate mem-to-mem: Fix early
exit when only one segment can fit into memory (rbalamohan) (cherry picked
from commit 3e409ae0ee7233b4cf631cac1bc366679a08b7d1)
Posted by jl...@apache.org.
TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory (rbalamohan)
(cherry picked from commit 3e409ae0ee7233b4cf631cac1bc366679a08b7d1)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0c4d0cbf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0c4d0cbf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0c4d0cbf
Branch: refs/heads/branch-0.7
Commit: 0c4d0cbfc13c5a57e0e4c974480ca848ed46c7ec
Parents: ef61eb2
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Feb 29 16:11:48 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Feb 29 16:11:48 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../shuffle/orderedgrouped/MergeManager.java | 8 ++-
.../orderedgrouped/TestMergeManager.java | 59 ++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0c4d0cbf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 92d9de8..df8cf59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory
TEZ-1911. MergeManager's unconditionalReserve() should check for memory limits before allocating.
TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout
TEZ-3128. Avoid stopping containers on the AM shutdown thread.
http://git-wip-us.apache.org/repos/asf/tez/blob/0c4d0cbf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index b716c01..2d519d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -596,6 +596,7 @@ public class MergeManager {
synchronized (manager) {
Iterator<MapOutput> it = inputs.iterator();
+ MapOutput lastAddedMapOutput = null;
while(it.hasNext() && !Thread.currentThread().isInterrupted()) {
MapOutput mo = it.next();
if ((mergeOutputSize + mo.getSize() + manager.getUsedMemory()) > memoryLimit) {
@@ -614,6 +615,7 @@ public class MergeManager {
mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length);
inMemorySegments.add(new Segment(reader, true,
(mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
+ lastAddedMapOutput = mo;
it.remove();
LOG.debug("Added segment for merging. mergeOutputSize=" + mergeOutputSize);
}
@@ -622,8 +624,12 @@ public class MergeManager {
//Add any unused MapOutput back
inMemoryMapOutputs.addAll(inputs);
+ //Exit early, if 0 or 1 segment is available
if (inMemorySegments.size() <= 1) {
- return; //no need to proceed further.
+ if (lastAddedMapOutput != null) {
+ inMemoryMapOutputs.add(lastAddedMapOutput);
+ }
+ return;
}
mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false);
http://git-wip-us.apache.org/repos/asf/tez/blob/0c4d0cbf/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 1a8cb7a..398e94e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -420,6 +420,65 @@ public class TestMergeManager {
mergeManager.close();
+ /**
+ * Test #4
+ * - Set number of segments for merging to 4.
+ * - Have 4 in-memory segments of size {490000,490000,490000,230000}
+ * - Committing 4 segments would trigger mem-to-mem
+ * - But only 300000 can fit into memory. This should not be
+ * merged as there is no point in merging a single segment. It should be
+ * added back to inMemoryMapOutputs
+ */
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 4);
+ mergeManager =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ exceptionReporter, 2000000, null, false, -1);
+ mergeManager.configureAndStart();
+
+ //Single shuffle limit is 25% of 2000000
+ data1 = generateDataBySize(conf, 490000);
+ data2 = generateDataBySize(conf, 490000);
+ data3 = generateDataBySize(conf, 490000);
+ data4 = generateDataBySize(conf, 230000);
+
+ mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+
+ assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
+
+ assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+ assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ assertEquals(data1.length + data2.length + data3.length + data4.length,
+ mergeManager.getUsedMemory());
+
+ System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+ System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+ System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+ System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+ //Committing 4 segments should trigger mem-to-mem merge
+ mo1.commit();
+ mo2.commit();
+ mo3.commit();
+ mo4.commit();
+
+ //4 segments were there originally in inMemoryMapOutput.
+ int numberOfMapOutputs = 4;
+
+ //Wait for mem-to-mem to complete. Since only 1 segment (230000) can fit
+ //into memory, it should return early
+ mergeManager.waitForMemToMemMerge();
+
+ //Check that inMemoryMapOutputs has got the MapOutput back for merging later
+ assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size());
+
+ mergeManager.close();
}
private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {