You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:01 UTC

[20/50] [abbrv] tez git commit: TEZ-3637. TezMerger logs too much at INFO level. (sseth)

TEZ-3637. TezMerger logs too much at INFO level. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bdc0ee9c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bdc0ee9c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bdc0ee9c

Branch: refs/heads/TEZ-1190
Commit: bdc0ee9c9ffcc9c199e0ca4245d7084f6df943c4
Parents: d5bf28b
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 27 19:26:31 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 27 19:26:31 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../shuffle/orderedgrouped/MergeManager.java    | 213 +++++++++++++++----
 .../library/common/sort/impl/TezMerger.java     |  56 ++---
 3 files changed, 197 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9ac898..88b0b98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3637. TezMerger logs too much at INFO level
   TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism
   TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount.
   TEZ-3627. Use queue name available in RegisterApplicationMasterResponse for publishing to ATS.

http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 26bdca7..9f0e73c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.FileChunk;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -146,6 +147,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   private final int ifileReadAheadLength;
   private final int ifileBufferSize;
 
+  // Stats/logging state: statsInMemTotal covers every closed in-memory output;
+  // statsInMemLastLog is reset each time the periodic INFO summary is logged.
+  private long lastInMemSegmentLogTime = -1L;
+  private final SegmentStatsTracker statsInMemTotal = new SegmentStatsTracker();
+  private final SegmentStatsTracker statsInMemLastLog = new SegmentStatsTracker();
+
   private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
 
   private final boolean cleanup;
@@ -465,13 +472,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     unreserve(size);
   }
 
+  // Adds a closed in-memory map output to inMemoryMapOutputs and to commitMemory.
   @Override
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
     inMemoryMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-          + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-          + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
-          mapOutput);
+    trackAndLogCloseInMemoryFile(mapOutput);
 
     commitMemory+= mapOutput.getSize();
 
@@ -490,6 +495,44 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     }
   }
 
+  private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) {
+    statsInMemTotal.updateStats(mapOutput.getSize());
+    // DEBUG: log every output. Otherwise: an INFO summary at most every 30s.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+          + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+          + ", commitMemory -> " + commitMemory + ", usedMemory ->" +
+          usedMemory + ", mapOutput=" +
+          mapOutput);
+    } else {
+      statsInMemLastLog.updateStats(mapOutput.getSize());
+      long now = Time.monotonicNow();
+      if (now > lastInMemSegmentLogTime + 30 * 1000L) {
+        LOG.info(
+            "CloseInMemoryFile. Current state: inMemoryMapOutputs.size={}," +
+                " commitMemory={}," +
+                " usedMemory={}. Since last log:" +
+                " count={}," +
+                " min={}," +
+                " max={}," +
+                " total={}," +
+                " avg={}",
+            inMemoryMapOutputs.size(),
+            commitMemory,
+            usedMemory,
+            statsInMemLastLog.count,
+            statsInMemLastLog.minSize,
+            statsInMemLastLog.maxSize,
+            statsInMemLastLog.size,
+            (statsInMemLastLog.count == 0 ? "nan" :
+                (statsInMemLastLog.size / (double) statsInMemLastLog.count))
+        );
+        statsInMemLastLog.reset();
+        lastInMemSegmentLogTime = now;
+      }
+    }
+  }
+
   private void startMemToDiskMerge() {
     synchronized (inMemoryMerger) {
       if (!inMemoryMerger.isInProgress()) {
@@ -505,9 +548,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   
   public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
     inMemoryMergedMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
-             ", inMemoryMergedMapOutputs.size() -> " + 
-             inMemoryMergedMapOutputs.size());
+    if (LOG.isDebugEnabled()) {
+      // This log could be moved to INFO level for a while, after mem-to-mem
+      // merge is production ready.
+      LOG.debug("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
+          ", inMemoryMergedMapOutputs.size() -> " +
+          inMemoryMergedMapOutputs.size());
+    }
 
     commitMemory += mapOutput.getSize();
 
@@ -535,9 +582,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     }
 
     onDiskMapOutputs.add(file);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength());
-    }
+    logCloseOnDiskFile(file);
 
     synchronized (onDiskMerger) {
       if (!onDiskMerger.isInProgress() &&
@@ -547,6 +592,23 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     }
   }
 
+  private long lastOnDiskSegmentLogTime = -1L; // Time.monotonicNow() of the last INFO log
+  // DEBUG logs every closed on-disk file; otherwise an INFO line at most every 30s.
+  private void logCloseOnDiskFile(FileChunk file) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength() +
+          ", onDiskMapOutputs=" + onDiskMapOutputs.size());
+    } else {
+      long now = Time.monotonicNow();
+      if (now > lastOnDiskSegmentLogTime + 30 * 1000L) {
+        LOG.info(
+            "close onDiskFile. State: NumOnDiskFiles={}. Current: path={}, len={}",
+            onDiskMapOutputs.size(), file.getPath(), file.getLength());
+        lastOnDiskSegmentLogTime = now;
+      }
+    }
+  }
+
   /**
    * Should <b>only</b> be used after the Shuffle phaze is complete, otherwise can
    * return an invalid state since a merge may not be in progress dur to
@@ -576,6 +638,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
       onDiskMapOutputs.clear();
 
+      if (statsInMemTotal.count > 0) {
+        // Summary over all closed in-memory outputs; count > 0, so avg is well defined.
+        LOG.info(
+            "TotalInMemFetchStats: count={}, totalSize={}, min={}, max={}, avg={}",
+            statsInMemTotal.count, statsInMemTotal.size, statsInMemTotal.minSize,
+            statsInMemTotal.maxSize, statsInMemTotal.size / (double) statsInMemTotal.count);
+      }
+
       // Don't attempt a final merge if close is invoked as a result of a previous
       // shuffle exception / error.
       if (tryFinalMerge) {
@@ -1069,21 +1139,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
                                        List<MapOutput> inMemoryMapOutputs,
                                        List<FileChunk> onDiskMapOutputs
                                        ) throws IOException, InterruptedException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
 
-    if (LOG.isDebugEnabled()) {
-      for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) {
-        LOG.debug("inMemoryOutput=" + inMemoryMapOutput + ", size=" + inMemoryMapOutput
-            .getSize());
-      }
-
-      for (FileChunk onDiskMapOutput : onDiskMapOutputs) {
-        LOG.debug("onDiskMapOutput=" + onDiskMapOutput.getPath() + ", size=" + onDiskMapOutput
-                .getLength());
-      }
-    }
+    logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
+    StringBuilder finalMergeLog = new StringBuilder(); // single INFO summary, logged below
     
     inputContext.notifyProgress();
 
@@ -1148,15 +1206,25 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
         // add to list of final disk outputs.
         onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));
 
-        LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit. outputPath=" + outputPath);
+        if (LOG.isInfoEnabled()) {
+          finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Merged " + numMemDiskSegments + " segments, size=" +
+                inMemToDiskBytes + " to " + outputPath);
+          }
+        }
+
         inMemToDiskBytes = 0;
         memDiskSegments.clear();
       } else if (inMemToDiskBytes != 0) {
-        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-            inMemToDiskBytes + " bytes in memory for " +
-                 "intermediate, on-disk merge");
+        if (LOG.isInfoEnabled()) {
+          finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Keeping " + numMemDiskSegments + " segments, " +
+                inMemToDiskBytes + " bytes in memory for " +
+                "intermediate, on-disk merge");
+          }
+        }
       }
     }
 
@@ -1167,8 +1235,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     for (FileChunk fileChunk : onDisk) {
       final long fileLength = fileChunk.getLength();
       onDiskBytes += fileLength;
-      LOG.info("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + ", isLocal=" +
-          fileChunk.isLocalFile());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Disk file=" + fileChunk.getPath() + ", len=" + fileLength +
+            ", isLocal=" +
+            fileChunk.isLocalFile());
+      }
 
       final Path file = fileChunk.getPath();
       TezCounter counter =
@@ -1179,8 +1250,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
                                    ifileReadAheadLength, ifileBufferSize, preserve, counter));
     }
-    LOG.info("Merging " + onDisk.length + " files, " +
-             onDiskBytes + " bytes from disk");
+    if (LOG.isInfoEnabled()) {
+      finalMergeLog.append(". DiskSeg: " + onDisk.length + ", " + onDiskBytes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merging " + onDisk.length + " files, " +
+            onDiskBytes + " bytes from disk");
+      }
+    }
     Collections.sort(diskSegments, new Comparator<Segment>() {
       public int compare(Segment o1, Segment o2) {
         if (o1.getLength() == o2.getLength()) {
@@ -1194,8 +1270,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     List<Segment> finalSegments = new ArrayList<Segment>();
     long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
                                              finalSegments, 0);
-    LOG.info("Merging " + finalSegments.size() + " segments, " +
-             inMemBytes + " bytes from memory into reduce");
+    if (LOG.isInfoEnabled()) {
+      finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merging " + finalSegments.size() + " segments, " +
+            inMemBytes + " bytes from memory into reduce");
+      }
+    }
+
     if (0 != onDiskBytes) {
       final int numInMemSegments = memDiskSegments.size();
       diskSegments.addAll(0, memDiskSegments);
@@ -1211,6 +1293,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       finalSegments.add(new Segment(
             new RawKVIteratorReader(diskMerge, onDiskBytes), null));
     }
+    if (LOG.isInfoEnabled()) {
+      LOG.info(finalMergeLog.toString());
+    }
     // This is doing nothing but creating an iterator over the segments.
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
@@ -1218,6 +1303,35 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
                  additionalBytesRead, null);
   }
 
+  // Logs finalMerge input counts and total sizes at INFO; per-output details at DEBUG.
+  private void logFinalMergeStart(List<MapOutput> inMemoryMapOutputs,
+                                  List<FileChunk> onDiskMapOutputs) {
+    long inMemSegmentSize = 0;
+    for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) {
+      inMemSegmentSize += inMemoryMapOutput.getSize();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finalMerge: inMemoryOutput=" + inMemoryMapOutput + ", size=" +
+            inMemoryMapOutput.getSize());
+      }
+    }
+    long onDiskSegmentSize = 0;
+    for (FileChunk onDiskMapOutput : onDiskMapOutputs) {
+      onDiskSegmentSize += onDiskMapOutput.getLength();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finalMerge: onDiskMapOutput=" + onDiskMapOutput.getPath() +
+            ", size=" + onDiskMapOutput.getLength());
+      }
+    }
+
+    LOG.info(
+        "finalMerge with #inMemoryOutputs={}, size={} and #onDiskOutputs={}, size={}",
+        inMemoryMapOutputs.size(), inMemSegmentSize, onDiskMapOutputs.size(),
+        onDiskSegmentSize);
+
+  }
+
   @VisibleForTesting
   long getCommitMemory() {
     return commitMemory;
@@ -1232,4 +1346,31 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   void waitForMemToMemMerge() throws InterruptedException {
     memToMemMerger.waitForMerge();
   }
+
+  // Running count, total, min and max of segment sizes; used for logging.
+  // minSize/maxSize hold Long.MAX_VALUE/Long.MIN_VALUE until the first updateStats().
+  private static class SegmentStatsTracker {
+    private long size;
+    private int count;
+    private long minSize;
+    private long maxSize;
+
+    SegmentStatsTracker() {
+      reset();
+    }
+
+    void updateStats(long segSize) {
+      size += segSize;
+      count++;
+      minSize = Math.min(minSize, segSize);
+      maxSize = Math.max(maxSize, segSize);
+    }
+
+    void reset() {
+      size = 0L;
+      count = 0;
+      minSize = Long.MAX_VALUE;
+      maxSize = Long.MIN_VALUE;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 17e0fe2..8f3e84a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -90,32 +90,6 @@ public class TezMerger {
                                            mergePhase);
   }
 
-  public static 
-  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass, 
-                            CompressionCodec codec, boolean ifileReadAhead,
-                            int ifileReadAheadLength, int ifileBufferSize,
-                            Path[] inputs, boolean deleteInputs, 
-                            int mergeFactor, Path tmpDir,
-                            RawComparator comparator,
-                            Progressable reporter,
-                            TezCounter readsCounter,
-                            TezCounter writesCounter,
-                            TezCounter mergedMapOutputsCounter,
-                            TezCounter bytesReadCounter,
-                            Progress mergePhase)
-      throws IOException, InterruptedException {
-    return 
-      new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
-                           ifileReadAheadLength, ifileBufferSize, false, comparator, 
-                           reporter, mergedMapOutputsCounter).merge(
-                                           keyClass, valueClass,
-                                           mergeFactor, tmpDir,
-                                           readsCounter, writesCounter,
-                                           bytesReadCounter,
-                                           mergePhase);
-  }
-  
   // Used by the in-memory merger.
   public static
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, 
@@ -225,8 +199,8 @@ public class TezMerger {
         }
       }
     }
-    if ((count > 0) && LOG.isDebugEnabled()) {
-      LOG.debug("writeFile SAME_KEY count=" + count);
+    if ((count > 0) && LOG.isTraceEnabled()) {
+      LOG.trace("writeFile SAME_KEY count=" + count);
     }
   }
 
@@ -510,7 +484,9 @@ public class TezMerger {
       this.considerFinalMergeForProgress = considerFinalMergeForProgress;
       
       for (Path file : inputs) {
-        LOG.debug("MergeQ: adding: " + file);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("MergeQ: adding: " + file);
+        }
         segments.add(new DiskSegment(fs, file, codec, ifileReadAhead,
                                        ifileReadAheadLength, ifileBufferSize,
                                        !deleteInputs, 
@@ -702,11 +678,13 @@ public class TezMerger {
                                      TezCounter bytesReadCounter,
                                      Progress mergePhase)
         throws IOException, InterruptedException {
-      LOG.info("Merging " + segments.size() + " sorted segments");
       if (segments.size() == 0) {
         LOG.info("Nothing to merge. Returning an empty iterator");
         return new EmptyIterator();
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merging " + segments.size() + " sorted segments");
+      }
 
       /*
        * If there are inMemory segments, then they come first in the segments
@@ -806,19 +784,23 @@ public class TezMerger {
             mergeProgress.set(totalBytesProcessed * progPerByte);
           else
             mergeProgress.set(1.0f); // Last pass and no segments left - we're done
-          
-          LOG.info("Down to the last merge-pass, with " + numSegments + 
-                   " segments left of total size: " +
-                   (totalBytes - totalBytesProcessed) + " bytes");
+          // Merge-pass progress is DEBUG-only; logging it at INFO was too verbose.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Down to the last merge-pass, with " + numSegments +
+                " segments left of total size: " +
+                (totalBytes - totalBytesProcessed) + " bytes");
+          }
           // At this point, Factor Segments have not been physically
           // materialized. The merge will be done dynamically. Some of them may
           // be in-memory segments, other on-disk semgnets. Decision to be made
           // by a finalMerge is that is required.
           return this;
         } else {
-          LOG.info("Merging " + segmentsToMerge.size() + 
-                   " intermediate segments out of a total of " + 
-                   (segments.size()+segmentsToMerge.size()));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Merging " + segmentsToMerge.size() +
+                " intermediate segments out of a total of " +
+                (segments.size() + segmentsToMerge.size()));
+          }
           
           long bytesProcessedInPrevMerges = totalBytesProcessed;
           totalBytesProcessed += startBytes;