You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/21 00:16:52 UTC
tez git commit: TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs (Rajesh Balamohan)
Repository: tez
Updated Branches:
refs/heads/master a5579c4ab -> ecf1ff58f
TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs (Rajesh Balamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecf1ff58
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecf1ff58
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecf1ff58
Branch: refs/heads/master
Commit: ecf1ff58f85d350d539755c05625a7ffc2388d94
Parents: a5579c4
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Wed Jan 21 04:46:42 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Wed Jan 21 04:46:42 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../processor/map/TestMapProcessor.java | 13 +++-
.../common/sort/impl/ExternalSorter.java | 18 ++++-
.../common/sort/impl/PipelinedSorter.java | 34 ++++----
.../common/sort/impl/dflt/DefaultSorter.java | 28 ++++---
.../common/task/local/output/TezTaskOutput.java | 52 -------------
.../task/local/output/TezTaskOutputFiles.java | 81 --------------------
.../writers/UnorderedPartitionedKVWriter.java | 14 +++-
.../output/OrderedPartitionedKVOutput.java | 2 +-
.../TestUnorderedPartitionedKVWriter.java | 59 ++++----------
10 files changed, 91 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed13462..3ec7002 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs.
TEZ-1902. Fix findbugs warnings in tez-mapreduce.
TEZ-1963. Fix post memory merge to be > 2 GB.
TEZ-1901. Fix findbugs warnings in tez-examples.
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 3b62dfc..68d8cef 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -24,7 +24,9 @@ import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
@@ -96,6 +98,15 @@ public class TestMapProcessor {
job.setNumReduceTasks(1);
}
+ private Path getMapOutputFile(Configuration jobConf, OutputContext outputContext)
+ throws IOException {
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ Path attemptOutput = new Path(new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, outputContext.getUniqueIdentifier()),
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+ Path mapOutputFile = lDirAlloc.getLocalPathToRead(attemptOutput.toString(), jobConf);
+ return mapOutputFile;
+ }
+
@Before
@After
public void cleanup() throws Exception {
@@ -152,7 +163,7 @@ public class TestMapProcessor {
// .getCommitter().getClass().getName());
// t.close();
- Path mapOutputFile = mapOutputs.getOutputFile();
+ Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 9aee53a..19d60e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -22,7 +22,9 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,10 @@ public abstract class ExternalSorter {
private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
- public abstract void close() throws IOException;
+ public void close() throws IOException {
+ spillFileIndexPaths.clear();
+ spillFilePaths.clear();
+ }
public abstract void flush() throws IOException;
@@ -103,6 +108,12 @@ public abstract class ExternalSorter {
// Compression for map-outputs
protected final CompressionCodec codec;
+ protected final Map<Integer, Path> spillFilePaths = Maps.newHashMap();
+ protected final Map<Integer, Path> spillFileIndexPaths = Maps.newHashMap();
+
+ protected Path finalOutputFile;
+ protected Path finalIndexFile;
+
// Counters
// MR compatilbity layer needs to rename counters back to what MR requries.
@@ -243,6 +254,11 @@ public abstract class ExternalSorter {
return mapOutputFile;
}
+ @Private
+ public Path getFinalIndexFile() {
+ return finalIndexFile;
+ }
+
protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
Writer writer) throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 35ea954..9b171ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -246,7 +246,8 @@ public class PipelinedSorter extends ExternalSorter {
+ (partitions * APPROX_HEADER_LENGTH);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
- mapOutputFile.getSpillFileForWrite(numSpills, size);
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
+ spillFilePaths.put(numSpills, filename);
FSDataOutputStream out = rfs.create(filename, true, 4096);
try {
@@ -281,6 +282,7 @@ public class PipelinedSorter extends ExternalSorter {
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillFileIndexPaths.put(numSpills, indexFilename);
// TODO: cache
spillRec.writeToFile(indexFilename, conf);
++numSpills;
@@ -294,9 +296,9 @@ public class PipelinedSorter extends ExternalSorter {
@Override
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
- Path finalOutputFile =
+ finalOutputFile =
mapOutputFile.getOutputFileForWrite(0); //TODO
- Path finalIndexFile =
+ finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(0); //TODO
LOG.info("Starting flush of map output");
@@ -312,12 +314,13 @@ public class PipelinedSorter extends ExternalSorter {
if(numSpills == 1) {
// someday be able to pass this directly to shuffle
// without writing to disk
- final Path filename =
- mapOutputFile.getSpillFile(0);
- Path indexFilename =
- mapOutputFile.getSpillIndexFile(0);
- sameVolRename(filename, mapOutputFile.getOutputFileForWriteInVolume(filename));
- sameVolRename(indexFilename, mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename));
+ final Path filename = spillFilePaths.get(0);
+ final Path indexFilename = spillFileIndexPaths.get(0);
+ finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
+ finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
+
+ sameVolRename(filename, finalOutputFile);
+ sameVolRename(indexFilename, finalIndexFile);
return;
}
@@ -325,11 +328,10 @@ public class PipelinedSorter extends ExternalSorter {
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
- final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
for(int i = 0; i < numSpills; i++) {
// TODO: build this cache before
- Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+ Path indexFilename = spillFileIndexPaths.get(i);
TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
indexCacheList.add(spillIndex);
}
@@ -339,7 +341,7 @@ public class PipelinedSorter extends ExternalSorter {
List<Segment> segmentList =
new ArrayList<Segment>(numSpills);
for(int i = 0; i < numSpills; i++) {
- Path spillFilename = mapOutputFile.getSpillFile(i);
+ Path spillFilename = spillFilePaths.get(i);
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
@@ -390,14 +392,16 @@ public class PipelinedSorter extends ExternalSorter {
spillRec.writeToFile(finalIndexFile, conf);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
- Path indexFilename = mapOutputFile.getSpillIndexFile(i);
- Path spillFilename = mapOutputFile.getSpillFile(i);
+ Path indexFilename = spillFileIndexPaths.get(i);
+ Path spillFilename = spillFilePaths.get(i);
rfs.delete(indexFilename,true);
rfs.delete(spillFilename,true);
}
+
+ spillFileIndexPaths.clear();
+ spillFilePaths.clear();
}
- public void close() { }
private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
int getPartition();
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f9e6935..b99f319 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -636,7 +636,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
//FIXME
//kvbuffer = null;
mergeParts();
- Path outputPath = mapOutputFile.getOutputFile();
+ Path outputPath = finalOutputFile;
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
@@ -747,6 +747,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
+ spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
int spindex = mstart;
@@ -820,6 +821,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
@@ -847,6 +849,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
+ spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -895,7 +898,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, conf);
+ spillFileIndexPaths.put(numSpills, indexFilename);
+ spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
@@ -1001,25 +1005,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
final String taskIdentifier = outputContext.getUniqueIdentifier();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(i);
+ filename[i] = spillFilePaths.get(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
- sameVolRename(filename[0],
- mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+ finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
+ finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
+
+ sameVolRename(filename[0], finalOutputFile);
if (indexCacheList.size() == 0) {
- sameVolRename(mapOutputFile.getSpillIndexFile(0),
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+ sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
} else {
- indexCacheList.get(0).writeToFile(
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+ indexCacheList.get(0).writeToFile(finalIndexFile, conf);
}
return;
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+ Path indexFileName = spillFileIndexPaths.get(i);
indexCacheList.add(new TezSpillRecord(indexFileName, conf));
}
@@ -1027,9 +1031,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile =
+ finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
- Path finalIndexFile =
+ finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index 1c08f28..e9f33af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
/**
* Manipulate the working area for the transient store for components in tez-runtime-library
@@ -53,16 +52,6 @@ public abstract class TezTaskOutput {
}
/**
- * Return the path to local output file created earlier.
- *
- * TODO TEZ-1855: Remove this. Leads to an extra localdir scan just to update counters.
- *
- * @return path the path of the local output file
- * @throws IOException
- */
- public abstract Path getOutputFile() throws IOException;
-
- /**
* Create a local output file name.
*
* @param size the size of the file
@@ -90,16 +79,6 @@ public abstract class TezTaskOutput {
public abstract Path getOutputFileForWriteInVolume(Path existing);
/**
- * Return the path to a local output index file created earlier
- *
- * TODO TEZ-1855: Remove this. Leads to an additional scan to find empty partitions.
- *
- * @return path the path of the index file
- * @throws IOException
- */
- public abstract Path getOutputIndexFile() throws IOException;
-
- /**
* Create a local output index file name.
*
* @param size the size of the file
@@ -117,16 +96,6 @@ public abstract class TezTaskOutput {
public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
/**
- * Return a local output spill file created earlier.
- *
- * @param spillNumber the spill number
- * @return path the path of the previously written spill file corresponding to the spillNumber
- * @throws IOException
- * // KKK Try removing this. Unnecessary file scans - can be stored in memory instead.
- */
- public abstract Path getSpillFile(int spillNumber) throws IOException;
-
- /**
* Create a local output spill file name.
*
* @param spillNumber the spill number
@@ -137,16 +106,6 @@ public abstract class TezTaskOutput {
public abstract Path getSpillFileForWrite(int spillNumber, long size)
throws IOException;
- /**
- * Return a local output spill index file created earlier
- *
- * TODO TEZ-1855: Remove this. Should be possible to cache this instead of requiring a directory scan.
- *
- * @param spillNumber the spill number
- * @return path the path of the previously written spill index file corresponding to the spillNumber
- * @throws IOException
- */
- public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
/**
* Create a local output spill index file name.
@@ -160,17 +119,6 @@ public abstract class TezTaskOutput {
throws IOException;
/**
- * Return a local input file created earlier
- *
- * TODO: TEZ-1855. Remove this.
- *
- * @param attemptIdentifier The identifier for the source
- * @return path the path to the input file
- * @throws IOException
- */
- public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
- /**
* Create a local input file name.
*
* @param srcIdentifier The identifier for the source
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index be7e4ab..59aab17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
/**
* Manipulate the working area for the transient store for components in tez-runtime-library
@@ -71,23 +70,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
}
- /**
- * Return the path to local output file created earlier.
- *
- * ${appDir}/output/${uniqueId}/file.out
- * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
- *
- * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
- *
- * @return path the path of the local output file
- * @throws IOException
- */
- @Override
- public Path getOutputFile() throws IOException {
- Path attemptOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
- return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
- }
/**
* Create a local output file name.
@@ -147,24 +129,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
}
- /**
- * Return the path to a local output index file created earlier
- *
- * ${appDir}/output/${uniqueId}/file.out.index
- * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out.index
- *
- * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
- *
- * @return path the path of the index file
- * @throws IOException
- */
- @Override
- public Path getOutputIndexFile() throws IOException {
- Path attemptIndexOutput =
- new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
- Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
- }
/**
* Create a local output index file name.
@@ -206,23 +170,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
}
/**
- * Return a local output spill file created earlier.
- *
- * ${appDir}/${uniqueId}_spill_${spillNumber}.out
- * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10003_spill_0.out
- *
- * @param spillNumber the spill number
- * @return path the path of the previously written spill file corresponding to the spillNumber
- * @throws IOException
- */
- @Override
- public Path getSpillFile(int spillNumber) throws IOException {
- return lDirAlloc.getLocalPathToRead(
- String.format(SPILL_FILE_PATTERN,
- uniqueId, spillNumber), conf);
- }
-
- /**
* Create a local spill file name.
*
* ${appDir}/${uniqueId}_spill_${spillNumber}.out
@@ -242,20 +189,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
}
/**
- * Return a local map spill index file created earlier
- *
- * @param spillNumber the number
- * @return path the path of the previously written spill index file corresponding to the spillNumber
- * @throws IOException
- */
- @Override
- public Path getSpillIndexFile(int spillNumber) throws IOException {
- return lDirAlloc.getLocalPathToRead(
- String.format(SPILL_INDEX_FILE_PATTERN,
- uniqueId, spillNumber), conf);
- }
-
- /**
* Create a local output spill index file name.
*
* ${appDir}/${uniqueId}_spill_${spillNumber}.out.index
@@ -274,20 +207,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
uniqueId, spillNumber), size, conf);
}
- /**
- * Return a local input file created earlier
- *
- * ${appDir}/${uniqueId}_spill_${spillNumber}.out
- * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
- *
- * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
- * @return path the path to the input file
- * @throws IOException
- */
- @Override
- public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
- throw new UnsupportedOperationException("Incompatible with LocalRunner");
- }
/**
* Create a local input file name.
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 2a1df40..33c2122 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -126,6 +126,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
final AtomicInteger numSpills = new AtomicInteger(0);
private final AtomicInteger pendingSpillCount = new AtomicInteger(0);
+ @VisibleForTesting
+ Path finalIndexPath;
+ @VisibleForTesting
+ Path finalOutPath;
+
private final ReentrantLock spillLock = new ReentrantLock();
private final Condition spillInProgress = spillLock.newCondition();
@@ -334,6 +339,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
Path outPath = null;
if (isFinalSpill) {
outPath = outputFileHandler.getOutputFileForWrite(spillSize);
+ finalOutPath = outPath;
} else {
outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
}
@@ -370,8 +376,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
if (isFinalSpill) {
long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalSpillFile = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
- spillRecord.writeToFile(finalSpillFile, conf);
+ finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+ spillRecord.writeToFile(finalIndexPath, conf);
fileOutputBytesCounter.increment(indexFileSizeEstimate);
LOG.info("Finished final and only spill");
} else {
@@ -516,8 +522,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
- Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+ finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
+ finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 5af9f86..de9e877 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -158,7 +158,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
boolean outputGenerated = true;
if (sendEmptyPartitionDetails) {
- Path indexFile = sorter.getMapOutput().getOutputIndexFile();
+ Path indexFile = sorter.getFinalIndexFile();
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
BitSet emptyPartitionDetails = new BitSet();
int emptyPartitions = 0;
http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 9950d30..cb385ea 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
@@ -56,7 +55,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -335,27 +333,13 @@ public class TestUnorderedPartitionedKVWriter {
// Verify the data
// Verify the actual data
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
- Path outputFilePath = null;
- Path spillFilePath = null;
- try {
- outputFilePath = taskOutput.getOutputFile();
- } catch (DiskErrorException e) {
- if (numRecordsWritten > 0) {
- fail();
- } else {
- // Record checking not required.
- return;
- }
- }
- try {
- spillFilePath = taskOutput.getOutputIndexFile();
- } catch (DiskErrorException e) {
- if (numRecordsWritten > 0) {
- fail();
- } else {
- // Record checking not required.
- return;
- }
+ Path outputFilePath = kvWriter.finalOutPath;
+ Path spillFilePath = kvWriter.finalIndexPath;
+ if (numRecordsWritten > 0) {
+ assertTrue(localFs.exists(outputFilePath));
+ assertTrue(localFs.exists(spillFilePath));
+ } else {
+ return;
}
// Special case for 0 records.
@@ -533,27 +517,14 @@ public class TestUnorderedPartitionedKVWriter {
// Verify the actual data
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
- Path outputFilePath = null;
- Path spillFilePath = null;
- try {
- outputFilePath = taskOutput.getOutputFile();
- } catch (DiskErrorException e) {
- if (numRecordsWritten > 0) {
- fail();
- } else {
- // Record checking not required.
- return;
- }
- }
- try {
- spillFilePath = taskOutput.getOutputIndexFile();
- } catch (DiskErrorException e) {
- if (numRecordsWritten > 0) {
- fail();
- } else {
- // Record checking not required.
- return;
- }
+ Path outputFilePath = kvWriter.finalOutPath;
+ Path spillFilePath = kvWriter.finalIndexPath;
+
+ if (numRecordsWritten > 0) {
+ assertTrue(localFs.exists(outputFilePath));
+ assertTrue(localFs.exists(spillFilePath));
+ } else {
+ return;
}
// Special case for 0 records.