You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 02:06:20 UTC
tez git commit: TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth)
Repository: tez
Updated Branches:
refs/heads/master f9942ccf6 -> c17d4a0a4
TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c17d4a0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c17d4a0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c17d4a0a
Branch: refs/heads/master
Commit: c17d4a0a441477caed06d61d2ebe868946ff71a6
Parents: f9942cc
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 21 17:05:58 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 17:05:58 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../orderedgrouped/TestMergeManager.java | 140 ++++++++++++++++++-
2 files changed, 143 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c17d4a0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc61bbf..89c7cb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
@@ -76,6 +77,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
@@ -300,6 +302,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -509,6 +512,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2687. ATS History shutdown happens before the min-held containers are released
TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2630. TezChild receives IP address instead of FQDN.
http://git-wip-us.apache.org/repos/asf/tez/blob/c17d4a0a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 0faa22a..f3b8e99 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -18,6 +18,9 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -43,7 +46,6 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -190,6 +192,141 @@ public class TestMergeManager {
testLocalDiskMergeMultipleTasks(true);
}
+ @Test(timeout = 10000)
+ public void testOnDiskMergerFilenames() throws IOException, InterruptedException {
+ Configuration conf = new TezConfiguration(defaultConf);
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+
+ Path localDir = new Path(workDir, "local");
+ Path srcDir = new Path(workDir, "srcData");
+ localFs.mkdirs(localDir);
+ localFs.mkdirs(srcDir);
+
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+ ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);
+
+ MergeManager mergeManagerReal =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ exceptionReporter, 1 * 1024l * 1024l, null, false, -1);
+ MergeManager mergeManager = spy(mergeManagerReal);
+
+ // Partition 0 Keys 0-2, Partition 1 Keys 3-5
+ SrcFileInfo file1Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"),
+ 2, 3, 6);
+
+ SrcFileInfo file2Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"),
+ 2, 3, 0);
+
+ InputAttemptIdentifier iIdentifier1 =
+ new InputAttemptIdentifier(0, 0, file1Info.path.getName());
+ InputAttemptIdentifier iIdentifier2 =
+ new InputAttemptIdentifier(1, 0, file2Info.path.getName());
+
+ MapOutput mapOutput1 =
+ getMapOutputForDirectDiskFetch(iIdentifier1, file1Info.path, file1Info.indexedRecords[0],
+ mergeManager);
+ MapOutput mapOutput2 =
+ getMapOutputForDirectDiskFetch(iIdentifier2, file2Info.path, file2Info.indexedRecords[0],
+ mergeManager);
+
+ mapOutput1.commit();
+ mapOutput2.commit();
+ verify(mergeManager).closeOnDiskFile(mapOutput1.getOutputPath());
+ verify(mergeManager).closeOnDiskFile(mapOutput2.getOutputPath());
+
+ List<FileChunk> mergeFiles = new LinkedList<FileChunk>();
+ mergeFiles.addAll(mergeManager.onDiskMapOutputs);
+ mergeManager.onDiskMapOutputs.clear();
+
+ mergeManager.onDiskMerger.merge(mergeFiles);
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+ FileChunk fcMerged1 = mergeManager.onDiskMapOutputs.iterator().next();
+ Path m1Path = fcMerged1.getPath();
+ assertTrue(m1Path.toString().endsWith("merged0"));
+
+ // Add another file. Make sure the filename is different, and does not get clobbered.
+
+ SrcFileInfo file3Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src3.out"),
+ 2, 22, 5);
+ InputAttemptIdentifier iIdentifier3 =
+ new InputAttemptIdentifier(2, 0, file1Info.path.getName());
+ MapOutput mapOutput3 =
+ getMapOutputForDirectDiskFetch(iIdentifier3, file3Info.path, file3Info.indexedRecords[0],
+ mergeManager);
+ mapOutput3.commit();
+ verify(mergeManager).closeOnDiskFile(mapOutput3.getOutputPath());
+
+ mergeFiles = new LinkedList<FileChunk>();
+ mergeFiles.addAll(mergeManager.onDiskMapOutputs);
+ mergeManager.onDiskMapOutputs.clear();
+
+ mergeManager.onDiskMerger.merge(mergeFiles);
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+ FileChunk fcMerged2 = mergeManager.onDiskMapOutputs.iterator().next();
+ Path m2Path = fcMerged2.getPath();
+ assertTrue(m2Path.toString().endsWith("merged1"));
+ assertNotEquals(m1Path, m2Path);
+
+ // Add another file. This time add it to the head of the list.
+ SrcFileInfo file4Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src4.out"),
+ 2, 45, 35);
+ InputAttemptIdentifier iIdentifier4 =
+ new InputAttemptIdentifier(3, 0, file4Info.path.getName());
+ MapOutput mapOutput4 =
+ getMapOutputForDirectDiskFetch(iIdentifier4, file4Info.path, file4Info.indexedRecords[0],
+ mergeManager);
+ mapOutput4.commit();
+ verify(mergeManager).closeOnDiskFile(mapOutput4.getOutputPath());
+
+ // Add in reverse order this time.
+ List<FileChunk> tmpList = new LinkedList<>();
+ mergeFiles = new LinkedList<>();
+ assertEquals(2, mergeManager.onDiskMapOutputs.size());
+ tmpList.addAll(mergeManager.onDiskMapOutputs);
+ mergeFiles.add(tmpList.get(1));
+ mergeFiles.add(tmpList.get(0));
+
+ mergeManager.onDiskMapOutputs.clear();
+
+ mergeManager.onDiskMerger.merge(mergeFiles);
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+ FileChunk fcMerged3 = mergeManager.onDiskMapOutputs.iterator().next();
+ Path m3Path = fcMerged3.getPath();
+
+ assertTrue(m3Path.toString().endsWith("merged2"));
+ assertNotEquals(m2Path, m3Path);
+
+ // Ensure the lengths are the same - since the source file names are the same. No append happening.
+ assertEquals(m1Path.toString().length(), m2Path.toString().length());
+ assertEquals(m2Path.toString().length(), m3Path.toString().length());
+
+ // Ensure the filenames are used correctly - based on the first file given to the merger.
+ String m1Prefix = m1Path.toString().substring(0, m1Path.toString().indexOf("."));
+ String m2Prefix = m2Path.toString().substring(0, m2Path.toString().indexOf("."));
+ String m3Prefix = m3Path.toString().substring(0, m3Path.toString().indexOf("."));
+
+ assertEquals(m1Prefix, m2Prefix);
+ assertNotEquals(m1Prefix, m3Prefix);
+ assertNotEquals(m2Prefix, m3Prefix);
+
+ }
+
void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle)
throws IOException, InterruptedException {
@@ -367,4 +504,5 @@ public class TestMergeManager {
return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
}
+
}