You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/23 00:10:48 UTC
git commit: TEZ-969. Fix a bug which could cause the Merger to get
stuck when merging 0 segments, in case of container re-use. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master bd6fa6ee4 -> 8c6cab4a2
TEZ-969. Fix a bug which could cause the Merger to get stuck when
merging 0 segments, in case of container re-use. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8c6cab4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8c6cab4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8c6cab4a
Branch: refs/heads/master
Commit: 8c6cab4a23109e60a1fa4cc0267bda959aee9396
Parents: bd6fa6e
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Mar 22 16:08:13 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Mar 22 16:08:13 2014 -0700
----------------------------------------------------------------------
.../common/sort/impl/PipelinedSorter.java | 4 +-
.../library/common/sort/impl/TezMerger.java | 91 +++++++++++++-------
.../common/sort/impl/dflt/DefaultSorter.java | 4 +-
3 files changed, 63 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e6d7d31..fedbcb5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -333,8 +333,6 @@ public class PipelinedSorter extends ExternalSorter {
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
- TezMerger.considerFinalMergeForProgress();
-
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
@@ -371,7 +369,7 @@ public class PipelinedSorter extends ExternalSorter {
segmentList, mergeFactor,
new Path(uniqueIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments,
+ nullProgressable, sortSegments, true,
null, spilledRecordsCounter, null,
null); // Not using any Progress in TezMerger. Should just work.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index a546970..ae03370 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -76,7 +76,7 @@ public class TezMerger {
throws IOException {
return
new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize, comparator,
+ ifileReadAheadLength, ifileBufferSize, false, comparator,
reporter, null).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
@@ -101,7 +101,7 @@ public class TezMerger {
throws IOException {
return
new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize, comparator,
+ ifileReadAheadLength, ifileBufferSize, false, comparator,
reporter, mergedMapOutputsCounter).merge(
keyClass, valueClass,
mergeFactor, tmpDir,
@@ -141,7 +141,7 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
+ sortSegments, false).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter, mergePhase);
@@ -155,13 +155,15 @@ public class TezMerger {
int mergeFactor, Path tmpDir,
RawComparator comparator, Progressable reporter,
boolean sortSegments,
+ boolean considerFinalMergeForProgress,
TezCounter readsCounter,
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
throws IOException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
+ sortSegments, codec, considerFinalMergeForProgress).
+ merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
bytesReadCounter,
@@ -181,7 +183,7 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
+ sortSegments, false).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@@ -204,7 +206,7 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
+ sortSegments, codec, false).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@@ -380,21 +382,7 @@ public class TezMerger {
}
}
}
-
- // Boolean variable for including/considering final merge as part of sort
- // phase or not. This is true in map task, false in reduce task. It is
- // used in calculating mergeProgress.
- static boolean includeFinalMerge = false;
-
- /**
- * Sets the boolean variable includeFinalMerge to true. Called from
- * map task before calling merge() so that final merge of map task
- * is also considered as part of sort phase.
- */
- public static void considerFinalMergeForProgress() {
- includeFinalMerge = true;
- }
-
+
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
Configuration conf;
@@ -412,7 +400,11 @@ public class TezMerger {
private long totalBytesProcessed;
private float progPerByte;
private Progress mergeProgress = new Progress();
-
+ // Boolean variable for including/considering final merge as part of sort
+ // phase or not. This is true in map task, false in reduce task. It is
+ // used in calculating mergeProgress.
+ private final boolean considerFinalMergeForProgress;
+
Progressable reporter;
DataInputBuffer key;
@@ -435,6 +427,7 @@ public class TezMerger {
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int ifileBufferSize,
+ boolean considerFinalMergeForProgress,
RawComparator comparator, Progressable reporter,
TezCounter mergedMapOutputsCounter)
throws IOException {
@@ -446,6 +439,7 @@ public class TezMerger {
this.codec = codec;
this.comparator = comparator;
this.reporter = reporter;
+ this.considerFinalMergeForProgress = considerFinalMergeForProgress;
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
@@ -463,12 +457,13 @@ public class TezMerger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment> segments, RawComparator comparator,
- Progressable reporter, boolean sortSegments) {
+ Progressable reporter, boolean sortSegments, boolean considerFinalMergeForProgress) {
this.conf = conf;
this.fs = fs;
this.comparator = comparator;
this.segments = segments;
this.reporter = reporter;
+ this.considerFinalMergeForProgress = considerFinalMergeForProgress;
if (sortSegments) {
Collections.sort(segments, segmentComparator);
}
@@ -476,8 +471,9 @@ public class TezMerger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment> segments, RawComparator comparator,
- Progressable reporter, boolean sortSegments, CompressionCodec codec) {
- this(conf, fs, segments, comparator, reporter, sortSegments);
+ Progressable reporter, boolean sortSegments, CompressionCodec codec,
+ boolean considerFinalMergeForProgress) {
+ this(conf, fs, segments, comparator, reporter, sortSegments, considerFinalMergeForProgress);
this.codec = codec;
}
@@ -576,6 +572,10 @@ public class TezMerger {
Progress mergePhase)
throws IOException {
LOG.info("Merging " + segments.size() + " sorted segments");
+ if (segments.size() == 0) {
+ LOG.info("Nothing to merge. Returning an empty iterator");
+ return new EmptyIterator();
+ }
/*
* If there are inMemory segments, then they come first in the segments
@@ -655,7 +655,7 @@ public class TezMerger {
//if we have lesser number of segments remaining, then just return the
//iterator, else do another single level merge
if (numSegments <= factor) { // Will always kick in if only in-mem segments are provided.
- if (!includeFinalMerge) { // for reduce task
+ if (!considerFinalMergeForProgress) { // for reduce task
// Reset totalBytesProcessed and recalculate totalBytes from the
// remaining segments to track the progress of the final merge.
@@ -815,7 +815,7 @@ public class TezMerger {
- // If includeFinalMerge is true, allow the following while loop iterate
+ // If considerFinalMergeForProgress is true, allow the following while loop to iterate
// for 1 more iteration. This is to include final merge as part of the
// computation of expected input bytes of merges
- boolean considerFinalMerge = includeFinalMerge;
+ boolean considerFinalMerge = considerFinalMergeForProgress;
while (n > f || considerFinalMerge) {
if (n <=f ) {
@@ -834,9 +834,9 @@ public class TezMerger {
pos = -pos-1;
}
segmentSizes.add(pos, mergedSize);
-
+
n -= (f-1);
f = factor;
}
return totalBytes;
@@ -847,4 +847,41 @@ public class TezMerger {
}
}
+
+ /**
+ * Iterator returned by merge() when there are no segments to merge:
+ * next() always returns false and progress is reported as complete (1.0).
+ */
+ private static class EmptyIterator implements TezRawKeyValueIterator {
+ final Progress progress;
+
+ EmptyIterator() {
+ progress = new Progress();
+ progress.set(1.0f);
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ throw new RuntimeException("No keys on an empty iterator");
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ throw new RuntimeException("No values on an empty iterator");
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Progress getProgress() {
+ return progress;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 9ae7957..e311e93 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1054,8 +1054,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
return;
}
else {
- TezMerger.considerFinalMergeForProgress();
-
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
@@ -1089,7 +1087,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
segmentList, mergeFactor,
new Path(taskIdentifier),
(RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments,
+ nullProgressable, sortSegments, true,
null, spilledRecordsCounter, additionalSpillBytesRead,
null); // Not using any Progress in TezMerger. Should just work.