You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was lost in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/29 18:31:49 UTC
git commit: TEZ-1516. Log transfer rates for broadcast fetch. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 7f70f9ddd -> 3dc64aced
TEZ-1516. Log transfer rates for broadcast fetch. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3dc64ace
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3dc64ace
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3dc64ace
Branch: refs/heads/master
Commit: 3dc64aced6e5c806e55fd65faf95315552037f7f
Parents: 7f70f9d
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 29 09:31:17 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 29 09:31:17 2014 -0700
----------------------------------------------------------------------
.../common/shuffle/impl/ShuffleScheduler.java | 40 ++++++++++++-----
.../shuffle/common/impl/ShuffleManager.java | 47 +++++++++++++++++---
2 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3dc64ace/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 6cda4e2..6874afb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -161,17 +161,21 @@ class ShuffleScheduler {
MapHost host,
long bytesCompressed,
long bytesDecompressed,
- long milis,
+ long millis,
MapOutput output
) throws IOException {
- failureCounts.remove(srcAttemptIdentifier);
- if (host != null) {
- hostFailures.remove(host.getHostIdentifier());
- }
-
+
if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
if (output != null) {
+
+ failureCounts.remove(srcAttemptIdentifier);
+ if (host != null) {
+ hostFailures.remove(host.getHostIdentifier());
+ }
+
output.commit();
+ logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
+ srcAttemptIdentifier);
if (output.getType() == Type.DISK) {
bytesShuffledToDisk.increment(bytesCompressed);
} else if (output.getType() == Type.DISK_DIRECT) {
@@ -217,13 +221,29 @@ class ShuffleScheduler {
// TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
}
+ private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed, // logs a one-line summary of a completed fetch
+ MapOutput output,
+ InputAttemptIdentifier srcAttemptIdentifier) {
+ double rate = 0; // MB/s computed from compressed bytes; stays 0 when millis == 0 to avoid dividing by zero
+ if (millis != 0) {
+ rate = bytesCompressed / ((double) millis / 1000); // bytes per second (millis converted to seconds)
+ rate = rate / (1024 * 1024); // bytes/s -> MB/s, using 1024 * 1024 bytes per MB
+ }
+ LOG.info( // attempt, output type, sizes, EndTime (System.currentTimeMillis() at log time), duration in ms, rate
+ "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
+ ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+ ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+ mbpsFormat.format(rate) + " MB/s");
+ }
+
private void logProgress() {
- float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
- int mapsDone = numInputs - remainingMaps;
+ double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); // cumulative MB fetched so far
+ int inputsDone = numInputs - remainingMaps; // inputs that are no longer remaining
long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
- float transferRate = mbs / secsSinceStart;
- LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
+ double transferRate = mbs / secsSinceStart; // average MB/s since start; the "+ 1" above avoids a zero divisor right after start
+ LOG.info("copy(" + inputsDone + " of " + numInputs +
+ ", Transfer rate (CumulativeDataFetched/TimeSinceInputStarted) " // the trailing " MB/s)" closes "copy(", keeping parentheses balanced
+ mbpsFormat.format(transferRate) + " MB/s)");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3dc64ace/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index e28dcd1..4e1a06c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.text.DecimalFormat;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -86,6 +87,8 @@ public class ShuffleManager implements FetcherCallback {
private final InputContext inputContext;
private final int numInputs;
+ private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+
private final FetchedInputAllocator inputManager;
private final ListeningExecutorService fetcherExecutor;
@@ -105,6 +108,7 @@ public class ShuffleManager implements FetcherCallback {
private final long startTime;
private long lastProgressTime;
+ private long totalBytesShuffledTillNow;
// Required to be held when manipulating pendingHosts
private final ReentrantLock lock = new ReentrantLock();
@@ -436,9 +440,7 @@ public class ShuffleManager implements FetcherCallback {
public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
- LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+ InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
// Count irrespective of whether this is a copy of an already fetched input
lock.lock();
@@ -454,7 +456,9 @@ public class ShuffleManager implements FetcherCallback {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
-
+ logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
+ srcAttemptIdentifier);
+
// Processing counters for completed and commit fetches only. Need
// additional counters for excessive fetches - which primarily comes
// in after speculation or retries.
@@ -470,6 +474,13 @@ public class ShuffleManager implements FetcherCallback {
decompressedDataSizeCounter.increment(decompressedLength);
registerCompletedInput(fetchedInput);
+ lock.lock();
+ try {
+ totalBytesShuffledTillNow += fetchedBytes;
+ } finally {
+ lock.unlock();
+ }
+ logProgress();
}
}
}
@@ -649,7 +660,33 @@ public class ShuffleManager implements FetcherCallback {
throw new UnsupportedOperationException("Not supported for NullFetchedInput");
}
}
-
+
+ private void logProgress() {
+ double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); // cumulative MB fetched so far
+ int inputsDone = numCompletedInputs.get(); // numCompletedInputs counts finished inputs (NOTE(review): confirm); numInputs - it would be the remaining count
+ long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+ // Logs "copy(<done> of <total>, Transfer rate (...) <rate> MB/s)"; the "+ 1" above avoids a zero divisor right after start.
+ double transferRate = mbs / secsSinceStart;
+ LOG.info("copy(" + inputsDone + " of " + numInputs +
+ ", Transfer rate (CumulativeDataFetched/TimeSinceInputStarted) " +
+ mbpsFormat.format(transferRate) + " MB/s)");
+ }
+
+ private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength, // logs a one-line summary of a completed fetch
+ FetchedInput fetchedInput,
+ InputAttemptIdentifier srcAttemptIdentifier) {
+ double rate = 0; // MB/s computed from fetchedBytes; stays 0 when millis == 0 to avoid dividing by zero
+ if (millis != 0) {
+ rate = fetchedBytes / ((double) millis / 1000); // bytes per second (millis converted to seconds)
+ rate = rate / (1024 * 1024); // bytes/s -> MB/s, using 1024 * 1024 bytes per MB
+ }
+ // NOTE(review): mbpsFormat is a java.text.DecimalFormat, which is not synchronized; confirm concurrent fetch callbacks cannot reach this line at the same time.
+ LOG.info( // attempt, input type, fetched (logged as compressed) and decompressed sizes, EndTime, duration in ms, rate
+ "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
+ ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
+ ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+ mbpsFormat.format(rate) + " MB/s");
+ }
private class SchedulerFutureCallback implements FutureCallback<Void> {