You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/11/03 22:40:15 UTC
git commit: TEZ-1731. OnDiskMerger can end up clobbering files across
tasks with LocalDiskFetch. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 8433c6119 -> 57827160a
TEZ-1731. OnDiskMerger can end up clobbering files across tasks with
LocalDiskFetch. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57827160
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57827160
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57827160
Branch: refs/heads/master
Commit: 57827160a8772858f667aacba65c6589234c641b
Parents: 8433c61
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 3 13:39:49 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 3 13:39:49 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/hadoop/io/FileChunk.java | 30 ++-
.../orderedgrouped/FetcherOrderedGrouped.java | 4 +-
.../shuffle/orderedgrouped/MapOutput.java | 7 +-
.../shuffle/orderedgrouped/MergeManager.java | 45 ++--
.../local/output/TezLocalTaskOutputFiles.java | 10 +-
.../common/task/local/output/TezTaskOutput.java | 7 +
.../task/local/output/TezTaskOutputFiles.java | 7 +-
.../orderedgrouped/TestMergeManager.java | 237 +++++++++++++++++++
9 files changed, 318 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce5a0d2..f741109 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,7 @@ ALL CHANGES:
TEZ-1725. Fix nanosecond to millis conversion in TezMxBeanResourceCalculator.
TEZ-1726. Build broken against Hadoop-2.6.0 due to change in NodeReport.
TEZ-1579. MR examples should be setting mapreduce.framework.name to yarn-tez.
+ TEZ-1731. OnDiskMerger can end up clobbering files across tasks with LocalDiskFetch enabled.
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
index a7eb90a..0ba39e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
@@ -18,22 +18,34 @@
package org.apache.hadoop.io;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@Private
public class FileChunk implements Comparable<FileChunk> {
private final long offset;
private final long length;
- private final boolean preserveAfterUse;
+ private final boolean isLocalFile;
private final Path path;
+ private final InputAttemptIdentifier identifier;
- public FileChunk(Path path, long offset, long length, boolean preserveAfterUse) {
+ public FileChunk(Path path, long offset, long length, boolean isLocalFile,
+ InputAttemptIdentifier identifier) {
this.path = path;
this.offset = offset;
this.length = length;
- this.preserveAfterUse = preserveAfterUse;
+ this.isLocalFile = isLocalFile;
+ this.identifier = identifier;
+ if (isLocalFile) {
+ Preconditions.checkNotNull(identifier);
+ }
+ }
+
+ public FileChunk(Path path, long offset, long length) {
+ this(path, offset, length, false, null);
}
@Override
@@ -87,11 +99,15 @@ public class FileChunk implements Comparable<FileChunk> {
return length;
}
- public boolean preserveAfterUse() {
- return preserveAfterUse;
- }
-
public Path getPath() {
return path;
}
+
+ public boolean isLocalFile() {
+ return this.isLocalFile;
+ }
+
+ public InputAttemptIdentifier getInputAttemptIdentifier() {
+ return this.identifier;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 2b5a863..e83c705 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -441,7 +441,7 @@ class FetcherOrderedGrouped extends Thread {
LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
-
+
// Get the location for the map output - either in-memory or on-disk
try {
mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
@@ -467,7 +467,7 @@ class FetcherOrderedGrouped extends Thread {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map " +
mapOutput.getAttemptIdentifier() + " decomp: " +
- decompressedLength + " len: " + compressedLength);
+ decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
if (mapOutput.getType() == Type.MEMORY) {
ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
(int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index c735a43..231975b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -89,8 +89,11 @@ class MapOutput {
this.disk = null;
if (type == Type.DISK || type == Type.DISK_DIRECT) {
- boolean preserve = (type == Type.DISK_DIRECT); // type disk are temp files.
- this.outputPath = new FileChunk(outputPath, offset, size, preserve);
+ if (type == Type.DISK_DIRECT) {
+ this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
+ } else {
+ this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
+ }
} else {
this.outputPath = null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 0db5237..6081f91 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -88,9 +89,11 @@ public class MergeManager {
private final Set<MapOutput> inMemoryMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
private final InMemoryMerger inMemoryMerger;
-
- private final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
- private final OnDiskMerger onDiskMerger;
+
+ @VisibleForTesting
+ final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
+ @VisibleForTesting
+ final OnDiskMerger onDiskMerger;
private final long memoryLimit;
private final int postMergeMemLimit;
@@ -131,7 +134,7 @@ public class MergeManager {
/**
* Construct the MergeManager. Must call start before it becomes usable.
*/
- public MergeManager(Configuration conf,
+ public MergeManager(Configuration conf,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
InputContext inputContext,
@@ -213,7 +216,7 @@ public class MergeManager {
}
LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
- + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+ + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem="
+ this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
this.ioSortFactor =
@@ -253,7 +256,7 @@ public class MergeManager {
throw new RuntimeException("Invlaid configuration: "
+ "maxSingleShuffleLimit should be less than mergeThreshold"
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
- + "mergeThreshold: " + this.mergeThreshold);
+ + ", mergeThreshold: " + this.mergeThreshold);
}
boolean allowMemToMemMerge =
@@ -494,6 +497,7 @@ public class MergeManager {
return;
}
+
InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
@@ -629,7 +633,7 @@ public class MergeManager {
}
// Note the output of the merge
- closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen, false));
+ closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen));
}
}
@@ -637,7 +641,8 @@ public class MergeManager {
/**
* Merges multiple on-disk segments
*/
- private class OnDiskMerger extends MergeThread<FileChunk> {
+ @VisibleForTesting
+ class OnDiskMerger extends MergeThread<FileChunk> {
public OnDiskMerger(MergeManager manager) {
super(manager, ioSortFactor, exceptionReporter);
@@ -668,7 +673,7 @@ public class MergeManager {
for (FileChunk fileChunk : inputs) {
final long offset = fileChunk.getOffset();
final long size = fileChunk.getLength();
- final boolean preserve = fileChunk.preserveAfterUse();
+ final boolean preserve = fileChunk.isLocalFile();
final Path file = fileChunk.getPath();
approxOutputSize += size;
Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
@@ -681,9 +686,19 @@ public class MergeManager {
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
- Path outputPath =
- localDirAllocator.getLocalPathForWrite(inputs.get(0).getPath().toString(),
- approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
+ FileChunk file0 = inputs.get(0);
+ String namePart;
+ if (file0.isLocalFile()) {
+ // This is set up the same way a type DISK MapOutput is set up when fetching.
+ namePart = mapOutputFile.getSpillFileName(
+ file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex());
+
+ } else {
+ namePart = file0.getPath().getName().toString();
+ }
+ Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
+ outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX);
+
Writer writer =
new Writer(conf, rfs, outputPath,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -712,7 +727,7 @@ public class MergeManager {
}
final long outputLen = localFS.getFileStatus(outputPath).getLen();
- closeOnDiskFile(new FileChunk(outputPath, 0, outputLen, false));
+ closeOnDiskFile(new FileChunk(outputPath, 0, outputLen));
LOG.info(inputContext.getUniqueIdentifier() +
" Finished merging " + inputs.size() +
@@ -857,7 +872,7 @@ public class MergeManager {
final FileStatus fStatus = localFS.getFileStatus(outputPath);
// add to list of final disk outputs.
- onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen(), false));
+ onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
@@ -885,7 +900,7 @@ public class MergeManager {
file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
final long fileOffset = fileChunk.getOffset();
- final boolean preserve = fileChunk.preserveAfterUse();
+ final boolean preserve = fileChunk.isLocalFile();
diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, preserve, counter));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
index 3d83010..e02011f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -222,11 +222,17 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
public Path getInputFileForWrite(int taskId,
long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, taskId),
+ return lDirAlloc.getLocalPathForWrite(getSpillFileName(taskId),
size, conf);
}
+ @Override
+ public String getSpillFileName(int spillNum) {
+ return (String.format(
+ Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
+ spillNum));
+ }
+
/** Removes all of the files related to a task. */
@Override
public void removeAll()
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index d3e7d27..e34e399 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -160,6 +160,13 @@ public abstract class TezTaskOutput {
public abstract Path getInputFileForWrite(
int taskIdentifier, long size) throws IOException;
+ /**
+ * Construct a spill file name, given a spill number.
+ * @param spillNum the spill number to embed in the file name
+ * @return the spill file name built for the given spill number
+ */
+ public abstract String getSpillFileName(int spillNum);
+
/** Removes all of the files related to a task. */
public abstract void removeAll() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 8ae1a04..86a83ac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -234,11 +234,14 @@ public class TezTaskOutputFiles extends TezTaskOutput {
*/
public Path getInputFileForWrite(int srcTaskId,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(SPILL_FILE_PATTERN,
- uniqueId, srcTaskId),
+ return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcTaskId),
size, conf);
}
+ public String getSpillFileName(int spillNum) {
+ return String.format(SPILL_FILE_PATTERN, uniqueId, spillNum);
+ }
+
/** Removes all of the files related to a task. */
public void removeAll() throws IOException {
throw new UnsupportedOperationException("Incompatible with LocalRunner");
http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
new file mode 100644
index 0000000..7615ba7
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMergeManager {
+
+
+ private static final Log LOG = LogFactory.getLog(TestMergeManager.class);
+
+ private static Configuration defaultConf = new TezConfiguration();
+ private static FileSystem localFs = null;
+ private static Path workDir = null;
+
+ static {
+ try {
+ defaultConf.set("fs.defaultFS", "file:///");
+ localFs = FileSystem.getLocal(defaultConf);
+ workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ TestMergeManager.class.getSimpleName());
+ workDir = localFs.makeQualified(workDir);
+ localFs.mkdirs(workDir);
+ LOG.info("Using workDir: " + workDir);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+
+ @Before
+ @After
+ public void cleanup() throws IOException {
+ localFs.delete(workDir, true);
+ }
+
+ @Test(timeout = 10000)
+ public void testLocalDiskMergeMultipleTasks() throws IOException {
+
+ Configuration conf = new TezConfiguration(defaultConf);
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+
+ Path localDir = new Path(workDir, "local");
+ Path srcDir = new Path(workDir, "srcData");
+ localFs.mkdirs(localDir);
+ localFs.mkdirs(srcDir);
+
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ InputContext t0inputContext = createMockInputContext(UUID.randomUUID().toString());
+ InputContext t1inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+ ExceptionReporter t0exceptionReporter = mock(ExceptionReporter.class);
+ ExceptionReporter t1exceptionReporter = mock(ExceptionReporter.class);
+
+ MergeManager t0mergeManagerReal =
+ new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
+ t0exceptionReporter, 2000000, null, false, -1);
+ MergeManager t0mergeManager = spy(t0mergeManagerReal);
+
+ MergeManager t1mergeManagerReal =
+ new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null,
+ t1exceptionReporter, 2000000, null, false, -1);
+ MergeManager t1mergeManager = spy(t1mergeManagerReal);
+
+ // Partition 0 Keys 0-2, Partition 1 Keys 3-5
+ SrcFileInfo src1Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"),
+ 2, 3, 0);
+ // Partition 0 Keys 6-8, Partition 1 Keys 9-11
+ SrcFileInfo src2Info =
+ createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"),
+ 2, 3, 6);
+
+
+ // Simulating Task 0 fetches partition 0. (targetIndex = 0,1)
+
+ // Simulating Task 1 fetches partition 1. (targetIndex = 0,1)
+
+ InputAttemptIdentifier t0Identifier0 =
+ new InputAttemptIdentifier(0, 0, src1Info.path.getName());
+ InputAttemptIdentifier t0Identifier1 =
+ new InputAttemptIdentifier(1, 0, src2Info.path.getName());
+
+ InputAttemptIdentifier t1Identifier0 =
+ new InputAttemptIdentifier(0, 0, src1Info.path.getName());
+ InputAttemptIdentifier t1Identifier1 =
+ new InputAttemptIdentifier(1, 0, src2Info.path.getName());
+
+
+ MapOutput t0MapOutput0 =
+ getMapOutputForDirectDiskFetch(t0Identifier0, src1Info.path, src1Info.indexedRecords[0],
+ t0mergeManager);
+ MapOutput t0MapOutput1 =
+ getMapOutputForDirectDiskFetch(t0Identifier1, src2Info.path, src2Info.indexedRecords[0],
+ t0mergeManager);
+
+ MapOutput t1MapOutput0 =
+ getMapOutputForDirectDiskFetch(t1Identifier0, src1Info.path, src1Info.indexedRecords[1],
+ t1mergeManager);
+ MapOutput t1MapOutput1 =
+ getMapOutputForDirectDiskFetch(t1Identifier1, src2Info.path, src2Info.indexedRecords[1],
+ t1mergeManager);
+
+
+ t0MapOutput0.commit();
+ t0MapOutput1.commit();
+ verify(t0mergeManager).closeOnDiskFile(t0MapOutput0.getOutputPath());
+ verify(t0mergeManager).closeOnDiskFile(t0MapOutput1.getOutputPath());
+ // Run the OnDiskMerge via MergeManager
+ // Simulate the thread invocation - remove files, and invoke merge
+ List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>();
+ t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs);
+ t0mergeManager.onDiskMapOutputs.clear();
+ t0mergeManager.onDiskMerger.merge(t0MergeFiles);
+ Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
+
+
+ t1MapOutput0.commit();
+ t1MapOutput1.commit();
+ verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
+ verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
+ // Run the OnDiskMerge via MergeManager
+ // Simulate the thread invocation - remove files, and invoke merge
+ List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
+ t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
+ t1mergeManager.onDiskMapOutputs.clear();
+ t1mergeManager.onDiskMerger.merge(t1MergeFiles);
+ Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
+
+ Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
+ t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+
+ Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+ .contains(t0inputContext.getUniqueIdentifier()));
+ Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+ .contains(t1inputContext.getUniqueIdentifier()));
+
+ }
+
+ private InputContext createMockInputContext(String uniqueId) {
+ InputContext inputContext = mock(InputContext.class);
+ doReturn(new TezCounters()).when(inputContext).getCounters();
+ doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask();
+ doReturn("srcVertexName").when(inputContext).getSourceVertexName();
+ doReturn(uniqueId).when(inputContext).getUniqueIdentifier();
+ return inputContext;
+ }
+
+ private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
+ int numKeysPerPartition, int startKey) throws IOException {
+ FSDataOutputStream outStream = fs.create(path);
+ int currentKey = startKey;
+ SrcFileInfo srcFileInfo = new SrcFileInfo();
+ srcFileInfo.indexedRecords = new TezIndexRecord[numPartitions];
+ srcFileInfo.path = path;
+ for (int i = 0; i < numPartitions; i++) {
+ long pos = outStream.getPos();
+ IFile.Writer writer =
+ new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
+ for (int j = 0; j < numKeysPerPartition; j++) {
+ writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
+ currentKey++;
+ }
+ writer.close();
+ srcFileInfo.indexedRecords[i] =
+ new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
+ }
+ outStream.close();
+ return srcFileInfo;
+ }
+
+ private class SrcFileInfo {
+ private Path path;
+ private TezIndexRecord[] indexedRecords;
+ }
+
+ // Copied from FetcherOrderedGrouped
+ private static MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
+ Path filename, TezIndexRecord indexRecord,
+ MergeManager merger)
+ throws IOException {
+ return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+ indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
+ }
+}