You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:01 UTC
[20/50] [abbrv] tez git commit: TEZ-3637. TezMerger logs too much at
INFO level. (sseth)
TEZ-3637. TezMerger logs too much at INFO level. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bdc0ee9c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bdc0ee9c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bdc0ee9c
Branch: refs/heads/TEZ-1190
Commit: bdc0ee9c9ffcc9c199e0ca4245d7084f6df943c4
Parents: d5bf28b
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 27 19:26:31 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 27 19:26:31 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../shuffle/orderedgrouped/MergeManager.java | 213 +++++++++++++++----
.../library/common/sort/impl/TezMerger.java | 56 ++---
3 files changed, 197 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9ac898..88b0b98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3637. TezMerger logs too much at INFO level
TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism
TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount.
TEZ-3627. Use queue name available in RegisterApplicationMasterResponse for publishing to ATS.
http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 26bdca7..9f0e73c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -146,6 +147,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
private final int ifileReadAheadLength;
private final int ifileBufferSize;
+ // Variables for stats and logging
+ private long lastInMemSegmentLogTime = -1L;
+ private final SegmentStatsTracker statsInMemTotal = new SegmentStatsTracker();
+ private final SegmentStatsTracker statsInMemLastLog = new SegmentStatsTracker();
+
+
private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
private final boolean cleanup;
@@ -465,13 +472,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
unreserve(size);
}
+
@Override
public synchronized void closeInMemoryFile(MapOutput mapOutput) {
inMemoryMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
- + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
- + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" +
- mapOutput);
+ trackAndLogCloseInMemoryFile(mapOutput);
commitMemory+= mapOutput.getSize();
@@ -490,6 +495,44 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
}
+ private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) {
+ statsInMemTotal.updateStats(mapOutput.getSize());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+ + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ + ", commitMemory -> " + commitMemory + ", usedMemory ->" +
+ usedMemory + ", mapOutput=" +
+ mapOutput);
+ } else {
+ statsInMemLastLog.updateStats(mapOutput.getSize());
+ long now = Time.monotonicNow();
+ if (now > lastInMemSegmentLogTime + 30 * 1000L) {
+ LOG.info(
+ "CloseInMemoryFile. Current state: inMemoryMapOutputs.size={}," +
+ " commitMemory={}," +
+ " usedMemory={}. Since last log:" +
+ " count={}," +
+ " min={}," +
+ " max={}," +
+ " total={}," +
+ " avg={}",
+ inMemoryMapOutputs.size(),
+ commitMemory,
+ usedMemory,
+ statsInMemLastLog.count,
+ statsInMemLastLog.minSize,
+ statsInMemLastLog.maxSize,
+ statsInMemLastLog.size,
+ (statsInMemLastLog.count == 0 ? "nan" :
+ (statsInMemLastLog.size / (double) statsInMemLastLog.count))
+ );
+ statsInMemLastLog.reset();
+ lastInMemSegmentLogTime = now;
+ }
+ }
+ }
+
private void startMemToDiskMerge() {
synchronized (inMemoryMerger) {
if (!inMemoryMerger.isInProgress()) {
@@ -505,9 +548,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
inMemoryMergedMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
- ", inMemoryMergedMapOutputs.size() -> " +
- inMemoryMergedMapOutputs.size());
+ if (LOG.isDebugEnabled()) {
+ // This log could be moved to INFO level for a while, after mem-to-mem
+ // merge is production ready.
+ LOG.debug("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
+ ", inMemoryMergedMapOutputs.size() -> " +
+ inMemoryMergedMapOutputs.size());
+ }
commitMemory += mapOutput.getSize();
@@ -535,9 +582,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
onDiskMapOutputs.add(file);
- if (LOG.isDebugEnabled()) {
- LOG.debug("close onDiskFile=" + file.getPath() + ", len=" + file.getLength());
- }
+ logCloseOnDiskFile(file);
synchronized (onDiskMerger) {
if (!onDiskMerger.isInProgress() &&
@@ -547,6 +592,23 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
}
+ private long lastOnDiskSegmentLogTime = -1L;
+  /**
+   * Logs that an on-disk file has been added to {@code onDiskMapOutputs}.
+   *
+   * <p>With DEBUG logging enabled every call is logged. Otherwise an INFO line
+   * is written only when {@code Time.monotonicNow()} is more than
+   * {@code 30 * 1000L} past the previous INFO line, so the file reported there
+   * is just the most recent one, not every file closed in the interval.
+   *
+   * @param file the on-disk chunk whose path and length are reported
+   */
+  private void logCloseOnDiskFile(FileChunk file) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "close onDiskFile=" + file.getPath() + ", len=" + file.getLength() +
+              ", onDiskMapOutputs=" + onDiskMapOutputs.size());
+    } else {
+      long now = Time.monotonicNow();
+      if (now > lastOnDiskSegmentLogTime + 30 * 1000L) {
+        LOG.info(
+            "close onDiskFile. State: NumOnDiskFiles={}. Current: path={}, len={}",
+            onDiskMapOutputs.size(), file.getPath(), file.getLength());
+        lastOnDiskSegmentLogTime = now;
+      }
+    }
+  }
+
+
/**
* Should <b>only</b> be used after the Shuffle phaze is complete, otherwise can
* return an invalid state since a merge may not be in progress dur to
@@ -576,6 +638,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
onDiskMapOutputs.clear();
+    if (statsInMemTotal.count > 0) {
+      // Average is total bytes divided by the number of segments. Guarded by
+      // count > 0 above, so the division is always defined. double matches the
+      // per-interval average computed in trackAndLogCloseInMemoryFile.
+      LOG.info(
+          "TotalInMemFetchStats: count={}, totalSize={}, min={}, max={}, avg={}",
+          statsInMemTotal.count, statsInMemTotal.size,
+          statsInMemTotal.minSize, statsInMemTotal.maxSize,
+          (statsInMemTotal.size / (double) statsInMemTotal.count));
+    }
+
// Don't attempt a final merge if close is invoked as a result of a previous
// shuffle exception / error.
if (tryFinalMerge) {
@@ -1069,21 +1139,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
List<MapOutput> inMemoryMapOutputs,
List<FileChunk> onDiskMapOutputs
) throws IOException, InterruptedException {
- LOG.info("finalMerge called with " +
- inMemoryMapOutputs.size() + " in-memory map-outputs and " +
- onDiskMapOutputs.size() + " on-disk map-outputs");
- if (LOG.isDebugEnabled()) {
- for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) {
- LOG.debug("inMemoryOutput=" + inMemoryMapOutput + ", size=" + inMemoryMapOutput
- .getSize());
- }
-
- for (FileChunk onDiskMapOutput : onDiskMapOutputs) {
- LOG.debug("onDiskMapOutput=" + onDiskMapOutput.getPath() + ", size=" + onDiskMapOutput
- .getLength());
- }
- }
+ logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
+ StringBuilder finalMergeLog = new StringBuilder();
inputContext.notifyProgress();
@@ -1148,15 +1206,25 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
// add to list of final disk outputs.
onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));
- LOG.info("Merged " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes to disk to satisfy " +
- "reduce memory limit. outputPath=" + outputPath);
+      if (LOG.isInfoEnabled()) {
+        // Contributes to the single summary line emitted via finalMergeLog;
+        // the verbose form, including the output path, is DEBUG-only.
+        finalMergeLog.append("MemMerged: " + numMemDiskSegments + ", " + inMemToDiskBytes);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Merged " + numMemDiskSegments + " segments, size=" +
+              inMemToDiskBytes + " to " + outputPath);
+        }
+      }
+
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
- LOG.info("Keeping " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes in memory for " +
- "intermediate, on-disk merge");
+ if (LOG.isInfoEnabled()) {
+ finalMergeLog.append("DelayedMemMerge: " + numMemDiskSegments + ", " + inMemToDiskBytes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Keeping " + numMemDiskSegments + " segments, " +
+ inMemToDiskBytes + " bytes in memory for " +
+ "intermediate, on-disk merge");
+ }
+ }
}
}
@@ -1167,8 +1235,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
for (FileChunk fileChunk : onDisk) {
final long fileLength = fileChunk.getLength();
onDiskBytes += fileLength;
- LOG.info("Disk file=" + fileChunk.getPath() + ", len=" + fileLength + ", isLocal=" +
- fileChunk.isLocalFile());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disk file=" + fileChunk.getPath() + ", len=" + fileLength +
+ ", isLocal=" +
+ fileChunk.isLocalFile());
+ }
final Path file = fileChunk.getPath();
TezCounter counter =
@@ -1179,8 +1250,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, preserve, counter));
}
- LOG.info("Merging " + onDisk.length + " files, " +
- onDiskBytes + " bytes from disk");
+ if (LOG.isInfoEnabled()) {
+ finalMergeLog.append(". DiskSeg: " + onDisk.length + ", " + onDiskBytes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merging " + onDisk.length + " files, " +
+ onDiskBytes + " bytes from disk");
+ }
+ }
Collections.sort(diskSegments, new Comparator<Segment>() {
public int compare(Segment o1, Segment o2) {
if (o1.getLength() == o2.getLength()) {
@@ -1194,8 +1270,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
List<Segment> finalSegments = new ArrayList<Segment>();
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
finalSegments, 0);
- LOG.info("Merging " + finalSegments.size() + " segments, " +
- inMemBytes + " bytes from memory into reduce");
+ if (LOG.isInfoEnabled()) {
+ finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merging " + finalSegments.size() + " segments, " +
+ inMemBytes + " bytes from memory into reduce");
+ }
+ }
+
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
@@ -1211,6 +1293,9 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
finalSegments.add(new Segment(
new RawKVIteratorReader(diskMerge, onDiskBytes), null));
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info(finalMergeLog.toString());
+ }
// This is doing nothing but creating an iterator over the segments.
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
@@ -1218,6 +1303,35 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
additionalBytesRead, null);
}
+
+ private void logFinalMergeStart(List<MapOutput> inMemoryMapOutputs,
+ List<FileChunk> onDiskMapOutputs) {
+ long inMemSegmentSize = 0;
+ for (MapOutput inMemoryMapOutput : inMemoryMapOutputs) {
+ inMemSegmentSize += inMemoryMapOutput.getSize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finalMerge: inMemoryOutput=" + inMemoryMapOutput + ", size=" +
+ inMemoryMapOutput.getSize());
+ }
+ }
+ long onDiskSegmentSize = 0;
+ for (FileChunk onDiskMapOutput : onDiskMapOutputs) {
+ onDiskSegmentSize += onDiskMapOutput.getLength();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finalMerge: onDiskMapOutput=" + onDiskMapOutput.getPath() +
+ ", size=" + onDiskMapOutput.getLength());
+ }
+ }
+
+ LOG.info(
+ "finalMerge with #inMemoryOutputs={}, size={} and #onDiskOutputs={}, size={}",
+ inMemoryMapOutputs.size(), inMemSegmentSize, onDiskMapOutputs.size(),
+ onDiskSegmentSize);
+
+ }
+
@VisibleForTesting
long getCommitMemory() {
return commitMemory;
@@ -1232,4 +1346,31 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
void waitForMemToMemMerge() throws InterruptedException {
memToMemMerger.waitForMerge();
}
+
+
+
+  /**
+   * Running aggregate over a series of segment sizes: their sum, how many were
+   * seen, and the smallest and largest value.
+   *
+   * <p>Until the first {@link #updateStats(long)} call (and again after
+   * {@link #reset()}), {@code minSize} is {@code Long.MAX_VALUE} and
+   * {@code maxSize} is {@code Long.MIN_VALUE}; check {@code count} before
+   * reporting them.
+   */
+  private static class SegmentStatsTracker {
+    /** Sum of all sizes recorded since the last reset. */
+    private long size;
+    /** Number of sizes recorded since the last reset. */
+    private int count;
+    /** Smallest size recorded since the last reset. */
+    private long minSize;
+    /** Largest size recorded since the last reset. */
+    private long maxSize;
+
+    SegmentStatsTracker() {
+      reset();
+    }
+
+    /** Folds one more segment size into the aggregate. */
+    void updateStats(long segSize) {
+      count++;
+      size += segSize;
+      minSize = Math.min(minSize, segSize);
+      maxSize = Math.max(maxSize, segSize);
+    }
+
+    /** Returns the tracker to its empty state. */
+    void reset() {
+      count = 0;
+      size = 0L;
+      maxSize = Long.MIN_VALUE;
+      minSize = Long.MAX_VALUE;
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdc0ee9c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 17e0fe2..8f3e84a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -90,32 +90,6 @@ public class TezMerger {
mergePhase);
}
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- Class keyClass, Class valueClass,
- CompressionCodec codec, boolean ifileReadAhead,
- int ifileReadAheadLength, int ifileBufferSize,
- Path[] inputs, boolean deleteInputs,
- int mergeFactor, Path tmpDir,
- RawComparator comparator,
- Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter mergedMapOutputsCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
- return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize, false, comparator,
- reporter, mergedMapOutputsCounter).merge(
- keyClass, valueClass,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
- }
-
// Used by the in-memory merger.
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
@@ -225,8 +199,8 @@ public class TezMerger {
}
}
}
- if ((count > 0) && LOG.isDebugEnabled()) {
- LOG.debug("writeFile SAME_KEY count=" + count);
+ if ((count > 0) && LOG.isTraceEnabled()) {
+ LOG.trace("writeFile SAME_KEY count=" + count);
}
}
@@ -510,7 +484,9 @@ public class TezMerger {
this.considerFinalMergeForProgress = considerFinalMergeForProgress;
for (Path file : inputs) {
- LOG.debug("MergeQ: adding: " + file);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("MergeQ: adding: " + file);
+ }
segments.add(new DiskSegment(fs, file, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize,
!deleteInputs,
@@ -702,11 +678,13 @@ public class TezMerger {
TezCounter bytesReadCounter,
Progress mergePhase)
throws IOException, InterruptedException {
- LOG.info("Merging " + segments.size() + " sorted segments");
if (segments.size() == 0) {
LOG.info("Nothing to merge. Returning an empty iterator");
return new EmptyIterator();
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merging " + segments.size() + " sorted segments");
+ }
/*
* If there are inMemory segments, then they come first in the segments
@@ -806,19 +784,23 @@ public class TezMerger {
mergeProgress.set(totalBytesProcessed * progPerByte);
else
mergeProgress.set(1.0f); // Last pass and no segments left - we're done
-
- LOG.info("Down to the last merge-pass, with " + numSegments +
- " segments left of total size: " +
- (totalBytes - totalBytesProcessed) + " bytes");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Down to the last merge-pass, with " + numSegments +
+ " segments left of total size: " +
+ (totalBytes - totalBytesProcessed) + " bytes");
+ }
// At this point, Factor Segments have not been physically
// materialized. The merge will be done dynamically. Some of them may
// be in-memory segments, other on-disk semgnets. Decision to be made
// by a finalMerge is that is required.
return this;
} else {
- LOG.info("Merging " + segmentsToMerge.size() +
- " intermediate segments out of a total of " +
- (segments.size()+segmentsToMerge.size()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merging " + segmentsToMerge.size() +
+ " intermediate segments out of a total of " +
+ (segments.size() + segmentsToMerge.size()));
+ }
long bytesProcessedInPrevMerges = totalBytesProcessed;
totalBytesProcessed += startBytes;