You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/29 18:31:49 UTC

git commit: TEZ-1516. Log transfer rates for broadcast fetch. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 7f70f9ddd -> 3dc64aced


TEZ-1516. Log transfer rates for broadcast fetch. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3dc64ace
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3dc64ace
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3dc64ace

Branch: refs/heads/master
Commit: 3dc64aced6e5c806e55fd65faf95315552037f7f
Parents: 7f70f9d
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 29 09:31:17 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 29 09:31:17 2014 -0700

----------------------------------------------------------------------
 .../common/shuffle/impl/ShuffleScheduler.java   | 40 ++++++++++++-----
 .../shuffle/common/impl/ShuffleManager.java     | 47 +++++++++++++++++---
 2 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3dc64ace/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 6cda4e2..6874afb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -161,17 +161,21 @@ class ShuffleScheduler {
                                          MapHost host,
                                          long bytesCompressed,
                                          long bytesDecompressed,
-                                         long milis,
+                                         long millis,
                                          MapOutput output
                                          ) throws IOException {
-    failureCounts.remove(srcAttemptIdentifier);
-    if (host != null) {
-      hostFailures.remove(host.getHostIdentifier());
-    }
-    
+
     if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
       if (output != null) {
+
+        failureCounts.remove(srcAttemptIdentifier);
+        if (host != null) {
+          hostFailures.remove(host.getHostIdentifier());
+        }
+
         output.commit();
+        logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
+            srcAttemptIdentifier);
         if (output.getType() == Type.DISK) {
           bytesShuffledToDisk.increment(bytesCompressed);
         } else if (output.getType() == Type.DISK_DIRECT) {
@@ -217,13 +221,29 @@ class ShuffleScheduler {
     // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
   }
 
+  private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
+                                          MapOutput output,
+                                          InputAttemptIdentifier srcAttemptIdentifier) {
+    double rate = 0; // MB/s, derived from bytesCompressed and the fetch duration
+    if (millis > 0) { // skip zero and negative durations instead of logging a bogus rate
+      rate = bytesCompressed / ((double) millis / 1000);
+      rate = rate / (1024 * 1024);
+    }
+    LOG.info(
+        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
+            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
+            ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+            mbpsFormat.format(rate) + " MB/s");
+  }
+
   private void logProgress() {
-    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-    int mapsDone = numInputs - remainingMaps;
+    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); // cumulative MB fetched
+    int inputsDone = numInputs - remainingMaps;
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
-    float transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
+    double transferRate = mbs / secsSinceStart; // the +1 above avoids dividing by 0 at start
+    LOG.info("copy(" + inputsDone + " of " + numInputs +
+        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted) "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3dc64ace/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index e28dcd1..4e1a06c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.shuffle.common.impl;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.text.DecimalFormat;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -86,6 +87,8 @@ public class ShuffleManager implements FetcherCallback {
   private final InputContext inputContext;
   private final int numInputs;
 
+  private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+
   private final FetchedInputAllocator inputManager;
 
   private final ListeningExecutorService fetcherExecutor;
@@ -105,6 +108,7 @@ public class ShuffleManager implements FetcherCallback {
   
   private final long startTime;
   private long lastProgressTime;
+  private long totalBytesShuffledTillNow;
 
   // Required to be held when manipulating pendingHosts
   private final ReentrantLock lock = new ReentrantLock();
@@ -436,9 +440,7 @@ public class ShuffleManager implements FetcherCallback {
   public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
       FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
       throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
-
-    LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
 
     // Count irrespective of whether this is a copy of an already fetched input
     lock.lock();
@@ -454,7 +456,9 @@ public class ShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          
+          logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
+              srcAttemptIdentifier);
+
           // Processing counters for completed and commit fetches only. Need
           // additional counters for excessive fetches - which primarily comes
           // in after speculation or retries.
@@ -470,6 +474,13 @@ public class ShuffleManager implements FetcherCallback {
           decompressedDataSizeCounter.increment(decompressedLength);
 
           registerCompletedInput(fetchedInput);
+          lock.lock();
+          try {
+            totalBytesShuffledTillNow += fetchedBytes;
+          } finally {
+            lock.unlock();
+          }
+          logProgress();
         }
       }
     }
@@ -649,7 +660,33 @@ public class ShuffleManager implements FetcherCallback {
       throw new UnsupportedOperationException("Not supported for NullFetchedInput");
     }
   }
-  
+
+  private void logProgress() { // logs completed-input count and cumulative transfer rate
+    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+    int inputsDone = numCompletedInputs.get(); // completed so far, not remaining
+    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+    double transferRate = mbs / secsSinceStart; // the +1 above avoids dividing by 0 at start
+    LOG.info("copy(" + inputsDone + " of " + numInputs +
+        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted) "
+        + mbpsFormat.format(transferRate) + " MB/s)");
+  }
+
+  private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
+                                          FetchedInput fetchedInput,
+                                          InputAttemptIdentifier srcAttemptIdentifier) {
+    double rate = 0; // MB/s, derived from fetchedBytes and the copy duration
+    if (millis > 0) { // skip zero and negative durations instead of logging a bogus rate
+      rate = fetchedBytes / ((double) millis / 1000);
+      rate = rate / (1024 * 1024);
+    }
+
+    LOG.info(
+        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
+            ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
+            ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+            mbpsFormat.format(rate) + " MB/s");
+  }
   
   private class SchedulerFutureCallback implements FutureCallback<Void> {