You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/05 22:00:50 UTC

[6/8] tez git commit: TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)

TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cf9105f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cf9105f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cf9105f

Branch: refs/heads/TEZ-2003
Commit: 5cf9105fe47bb07aa42f5b3132ba13e81fe205a8
Parents: 7096d8a
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Thu Feb 5 08:25:24 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Thu Feb 5 08:25:24 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../library/common/shuffle/HttpConnection.java  |  1 +
 .../library/common/shuffle/ShuffleUtils.java    | 39 ++++++++++++++++++++
 .../common/shuffle/impl/ShuffleManager.java     | 19 +---------
 .../orderedgrouped/ShuffleScheduler.java        | 21 ++---------
 5 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39d7f81..6a494ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
   TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index 4732a5a..1a5de41 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -237,6 +237,7 @@ public class HttpConnection {
     }
     // verify that replyHash is HMac of encHash
     SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
     LOG.info("for url=" + url +
       " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 629bab8..af02f9e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
 import java.util.List;
 
 import javax.crypto.SecretKey;
@@ -50,6 +51,14 @@ public class ShuffleUtils {
   private static final Log LOG = LogFactory.getLog(ShuffleUtils.class);
   public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
 
+  static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
+      new ThreadLocal<DecimalFormat>() {
+        @Override
+        protected DecimalFormat initialValue() {
+          return new DecimalFormat("0.00");
+        }
+      };
+
   public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
       throws IOException {
     DataInputByteBuffer in = new DataInputByteBuffer();
@@ -233,5 +242,35 @@ public class ShuffleUtils {
     sb.append("]");
     return sb.toString();
   }
+
+  /**
+   * Log individual fetch complete event.
+   * This log information is used by the tez-tool/perf-analyzer/shuffle tools for mining
+   * - amount of data transferred from the source to the destination machine
+   * - time taken to transfer data from the source to the destination machine
+   * - details on DISK/DISK_DIRECT/MEMORY based shuffles
+   *
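+   * @param log logger that the completion message is written to (at INFO level)
+   * @param millis time taken by the fetch in milliseconds; logged as TimeTaken, rate is 0 when this is 0
+   * @param bytesCompressed number of compressed bytes fetched; logged as CompressedSize and used for Rate
+   * @param bytesDecompressed number of bytes after decompression; logged as DecompressedSize
+   * @param outputType name of the output type the data was fetched to (e.g. DISK/DISK_DIRECT/MEMORY)
+   * @param srcAttemptIdentifier identifier of the source attempt whose fetch completed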
+   */
+  public static void logIndividualFetchComplete(Log log, long millis, long
+      bytesCompressed,
+      long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
+    double rate = 0;
+    if (millis != 0) {
+      rate = bytesCompressed / ((double) millis / 1000);
+      rate = rate / (1024 * 1024);
+    }
+    log.info(
+        "Completed fetch for attempt: "
+            + srcAttemptIdentifier + " to " + outputType +
+            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+            ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+            MBPS_FORMAT.get().format(rate) + " MB/s");
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 13296c7..3dc8156 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -519,8 +519,8 @@ public class ShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
-              srcAttemptIdentifier);
+          ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
+              fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
 
           // Processing counters for completed and commit fetches only. Need
           // additional counters for excessive fetches - which primarily comes
@@ -731,22 +731,7 @@ public class ShuffleManager implements FetcherCallback {
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
-  private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
-                                          FetchedInput fetchedInput,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = fetchedBytes / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
 
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
-            ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
-  
   private class SchedulerFutureCallback implements FutureCallback<Void> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/5cf9105f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 066b94a..57e904b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -47,6 +47,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 
 import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ class ShuffleScheduler {
 
   private long totalBytesShuffledTillNow = 0;
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-  
+
   public ShuffleScheduler(InputContext inputContext,
                           Configuration conf,
                           int numberOfInputs,
@@ -187,8 +188,8 @@ class ShuffleScheduler {
         }
 
         output.commit();
-        logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
-            srcAttemptIdentifier);
+        ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed,
+            bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
         if (output.getType() == Type.DISK) {
           bytesShuffledToDisk.increment(bytesCompressed);
         } else if (output.getType() == Type.DISK_DIRECT) {
@@ -234,20 +235,6 @@ class ShuffleScheduler {
     // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
   }
 
-  private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
-                                          MapOutput output,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = bytesCompressed / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
-            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
 
   private void logProgress() {
     double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);