You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:50 UTC
[6/8] tez git commit: TEZ-2023. Refactor logIndividualFetchComplete()
to be common for both shuffle-schedulers (rbalamohan)
TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cf9105f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cf9105f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cf9105f
Branch: refs/heads/TEZ-2003
Commit: 5cf9105fe47bb07aa42f5b3132ba13e81fe205a8
Parents: 7096d8a
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 5 08:25:24 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 5 08:25:24 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../library/common/shuffle/HttpConnection.java | 1 +
.../library/common/shuffle/ShuffleUtils.java | 39 ++++++++++++++++++++
.../common/shuffle/impl/ShuffleManager.java | 19 +---------
.../orderedgrouped/ShuffleScheduler.java | 21 ++---------
5 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39d7f81..6a494ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index 4732a5a..1a5de41 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -237,6 +237,7 @@ public class HttpConnection {
}
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+ //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
LOG.info("for url=" + url +
" sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 629bab8..af02f9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
import java.util.List;
import javax.crypto.SecretKey;
@@ -50,6 +51,14 @@ public class ShuffleUtils {
private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ static final ThreadLocal<DecimalFormat> MBPS_FORMAT = // DecimalFormat is not synchronized,
+ new ThreadLocal<DecimalFormat>() { // so each thread lazily gets its own instance
+ @Override
+ protected DecimalFormat initialValue() {
+ return new DecimalFormat("0.00"); // two fraction digits for the logged MB/s rate
+ }
+ };
+
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer();
@@ -233,5 +242,35 @@ public class ShuffleUtils {
sb.append("]");
return sb.toString();
}
+
+ /**
+ * Logs a single "Completed fetch" INFO line for one fetched input attempt.
+ * This log line is mined by tez-tool/perf-analyzer/shuffle tools for
+ * - amount of data transferred from the source to the destination machine
+ * - time taken to transfer that data (TimeTaken, in milliseconds)
+ * - details on DISK/DISK_DIRECT/MEMORY based shuffles (the outputType field)
+ *
+ * @param log logger the message is written to, at INFO level
+ * @param millis fetch duration in milliseconds; when 0 the Rate is logged as 0.00
+ * @param bytesCompressed compressed bytes fetched; the MB/s Rate is computed from this
+ * @param bytesDecompressed decompressed size of the fetched data (only logged)
+ * @param outputType where the fetched data was placed, e.g. the output's getType() string
+ * @param srcAttemptIdentifier the source input attempt the data was fetched from
+ */
+ public static void logIndividualFetchComplete(Log log, long millis, long
+ bytesCompressed,
+ long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+ double rate = 0; // MB/s; left at 0 when millis == 0 to avoid dividing by zero
+ if (millis != 0) {
+ rate = bytesCompressed / ((double) millis / 1000); // bytes per second
+ rate = rate / (1024 * 1024); // bytes/s -> MB/s (1 MB = 1024 * 1024 bytes)
+ }
+ log.info(
+ "Completed fetch for attempt: "
+ + srcAttemptIdentifier + " to " + outputType +
+ ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+ ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + // EndTime: epoch ms at log time
+ MBPS_FORMAT.get().format(rate) + " MB/s");
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 13296c7..3dc8156 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -519,8 +519,8 @@ public class ShuffleManager implements FetcherCallback {
if (!completedInputSet.contains(inputIdentifier)) {
fetchedInput.commit();
committed = true;
- logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
- srcAttemptIdentifier);
+ ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+ fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
// Processing counters for completed and commit fetches only. Need
// additional counters for excessive fetches - which primarily comes
@@ -731,22 +731,7 @@ public class ShuffleManager implements FetcherCallback {
+ mbpsFormat.format(transferRate) + " MB/s)");
}
- private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
- FetchedInput fetchedInput,
- InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = fetchedBytes / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
- }
- LOG.info(
- "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
- ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
- ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- mbpsFormat.format(rate) + " MB/s");
- }
-
private class SchedulerFutureCallback implements FutureCallback<Void> {
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 066b94a..57e904b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ class ShuffleScheduler {
private long totalBytesShuffledTillNow = 0;
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
+
public ShuffleScheduler(InputContext inputContext,
Configuration conf,
int numberOfInputs,
@@ -187,8 +188,8 @@ class ShuffleScheduler {
}
output.commit();
- logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
- srcAttemptIdentifier);
+ ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
+ bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
if (output.getType() == Type.DISK) {
bytesShuffledToDisk.increment(bytesCompressed);
} else if (output.getType() == Type.DISK_DIRECT) {
@@ -234,20 +235,6 @@ class ShuffleScheduler {
// TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
}
- private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
- MapOutput output,
- InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = bytesCompressed / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
- }
- LOG.info(
- "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
- ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
- ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- mbpsFormat.format(rate) + " MB/s");
- }
private void logProgress() {
double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);