You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/23 00:10:48 UTC

git commit: TEZ-969. Fix a bug which could cause the Merger to get stuck when merging 0 segments, in case of container re-use. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master bd6fa6ee4 -> 8c6cab4a2


TEZ-969. Fix a bug which could cause the Merger to get stuck when
merging 0 segments, in case of container re-use. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8c6cab4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8c6cab4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8c6cab4a

Branch: refs/heads/master
Commit: 8c6cab4a23109e60a1fa4cc0267bda959aee9396
Parents: bd6fa6e
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Mar 22 16:08:13 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Mar 22 16:08:13 2014 -0700

----------------------------------------------------------------------
 .../common/sort/impl/PipelinedSorter.java       |  4 +-
 .../library/common/sort/impl/TezMerger.java     | 91 +++++++++++++-------
 .../common/sort/impl/dflt/DefaultSorter.java    |  4 +-
 3 files changed, 63 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index e6d7d31..fedbcb5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -333,8 +333,6 @@ public class PipelinedSorter extends ExternalSorter {
     //The output stream for the final single output file
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
-    TezMerger.considerFinalMergeForProgress();
-
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
 
@@ -371,7 +369,7 @@ public class PipelinedSorter extends ExternalSorter {
                      segmentList, mergeFactor,
                      new Path(uniqueIdentifier),
                      (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
-                     nullProgressable, sortSegments,
+                     nullProgressable, sortSegments, true,
                      null, spilledRecordsCounter, null,
                      null); // Not using any Progress in TezMerger. Should just work.
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index a546970..ae03370 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -76,7 +76,7 @@ public class TezMerger {
   throws IOException {
     return 
       new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
-                           ifileReadAheadLength, ifileBufferSize, comparator, 
+                           ifileReadAheadLength, ifileBufferSize, false, comparator, 
                            reporter, null).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
@@ -101,7 +101,7 @@ public class TezMerger {
   throws IOException {
     return 
       new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
-                           ifileReadAheadLength, ifileBufferSize, comparator, 
+                           ifileReadAheadLength, ifileBufferSize, false, comparator, 
                            reporter, mergedMapOutputsCounter).merge(
                                            keyClass, valueClass,
                                            mergeFactor, tmpDir,
@@ -141,7 +141,7 @@ public class TezMerger {
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments, false).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                bytesReadCounter, mergePhase);
@@ -155,13 +155,15 @@ public class TezMerger {
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
+                            boolean considerFinalMergeForProgress,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(keyClass, valueClass,
+                           sortSegments, codec, considerFinalMergeForProgress).
+                                         merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                bytesReadCounter,
@@ -181,7 +183,7 @@ public class TezMerger {
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments, false).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
                                                readsCounter, writesCounter,
@@ -204,7 +206,7 @@ public class TezMerger {
                           Progress mergePhase)
     throws IOException {
   return new MergeQueue(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec).merge(keyClass, valueClass,
+                         sortSegments, codec, false).merge(keyClass, valueClass,
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -380,21 +382,7 @@ public class TezMerger {
       }
     }
   }
-  
-  // Boolean variable for including/considering final merge as part of sort
-  // phase or not. This is true in map task, false in reduce task. It is
-  // used in calculating mergeProgress.
-  static boolean includeFinalMerge = false;
-  
-  /**
-   * Sets the boolean variable includeFinalMerge to true. Called from
-   * map task before calling merge() so that final merge of map task
-   * is also considered as part of sort phase.
-   */
-  public static void considerFinalMergeForProgress() {
-    includeFinalMerge = true;
-  }
-  
+
   private static class MergeQueue<K extends Object, V extends Object> 
   extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
     Configuration conf;
@@ -412,7 +400,11 @@ public class TezMerger {
     private long totalBytesProcessed;
     private float progPerByte;
     private Progress mergeProgress = new Progress();
-    
+    // Boolean variable for including/considering final merge as part of sort
+    // phase or not. This is true in map task, false in reduce task. It is
+    // used in calculating mergeProgress.
+    private final boolean considerFinalMergeForProgress;
+
     Progressable reporter;
     
     DataInputBuffer key;
@@ -435,6 +427,7 @@ public class TezMerger {
                       Path[] inputs, boolean deleteInputs, 
                       CompressionCodec codec, boolean ifileReadAhead,
                       int ifileReadAheadLength, int ifileBufferSize,
+                      boolean considerFinalMergeForProgress,
                       RawComparator comparator, Progressable reporter, 
                       TezCounter mergedMapOutputsCounter) 
     throws IOException {
@@ -446,6 +439,7 @@ public class TezMerger {
       this.codec = codec;
       this.comparator = comparator;
       this.reporter = reporter;
+      this.considerFinalMergeForProgress = considerFinalMergeForProgress;
       
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
@@ -463,12 +457,13 @@ public class TezMerger {
     
     public MergeQueue(Configuration conf, FileSystem fs, 
         List<Segment> segments, RawComparator comparator,
-        Progressable reporter, boolean sortSegments) {
+        Progressable reporter, boolean sortSegments, boolean considerFinalMergeForProgress) {
       this.conf = conf;
       this.fs = fs;
       this.comparator = comparator;
       this.segments = segments;
       this.reporter = reporter;
+      this.considerFinalMergeForProgress = considerFinalMergeForProgress;
       if (sortSegments) {
         Collections.sort(segments, segmentComparator);
       }
@@ -476,8 +471,9 @@ public class TezMerger {
 
     public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment> segments, RawComparator comparator,
-        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
-      this(conf, fs, segments, comparator, reporter, sortSegments);
+        Progressable reporter, boolean sortSegments, CompressionCodec codec,
+        boolean considerFinalMergeForProgress) {
+      this(conf, fs, segments, comparator, reporter, sortSegments, considerFinalMergeForProgress);
       this.codec = codec;
     }
 
@@ -576,6 +572,10 @@ public class TezMerger {
                                      Progress mergePhase)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
+      if (segments.size() == 0) {
+        LOG.info("Nothing to merge. Returning an empty iterator");
+        return new EmptyIterator();
+      }
 
       /*
        * If there are inMemory segments, then they come first in the segments
@@ -655,7 +655,7 @@ public class TezMerger {
         //if we have lesser number of segments remaining, then just return the
         //iterator, else do another single level merge
         if (numSegments <= factor) { // Will always kick in if only in-mem segments are provided.
-          if (!includeFinalMerge) { // for reduce task
+          if (!considerFinalMergeForProgress) { // for reduce task
 
             // Reset totalBytesProcessed and recalculate totalBytes from the
             // remaining segments to track the progress of the final merge.
@@ -815,7 +815,7 @@ public class TezMerger {
       // If includeFinalMerge is true, allow the following while loop iterate
       // for 1 more iteration. This is to include final merge as part of the
       // computation of expected input bytes of merges
-      boolean considerFinalMerge = includeFinalMerge;
+      boolean considerFinalMerge = considerFinalMergeForProgress;
       
       while (n > f || considerFinalMerge) {
         if (n <=f ) {
@@ -834,9 +834,7 @@ public class TezMerger {
           pos = -pos-1;
         }
         segmentSizes.add(pos, mergedSize);
-        
-        n -= (f-1);
-        f = factor;
+
       }
 
       return totalBytes;
@@ -847,4 +845,37 @@ public class TezMerger {
     }
 
   }
+  
+  private static class EmptyIterator implements TezRawKeyValueIterator {
+    final Progress progress;
+
+    EmptyIterator() {
+      progress = new Progress();
+      progress.set(1.0f);
+    }
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      throw new RuntimeException("No keys on an empty iterator");
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      throw new RuntimeException("No values on an empty iterator");
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Progress getProgress() {
+      return progress;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8c6cab4a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 9ae7957..e311e93 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1054,8 +1054,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       return;
     }
     else {
-      TezMerger.considerFinalMergeForProgress();
-
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
       for (int parts = 0; parts < partitions; parts++) {
         //create the segments to be merged
@@ -1089,7 +1087,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                        segmentList, mergeFactor,
                        new Path(taskIdentifier),
                        (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
-                       nullProgressable, sortSegments,
+                       nullProgressable, sortSegments, true,
                        null, spilledRecordsCounter, additionalSpillBytesRead,
                        null); // Not using any Progress in TezMerger. Should just work.