You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/21 22:05:10 UTC

git commit: TEZ-948. Log counters at the end of Task execution. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 34159fed9 -> 674fd81a1


TEZ-948. Log counters at the end of Task execution. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/674fd81a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/674fd81a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/674fd81a

Branch: refs/heads/master
Commit: 674fd81a1a16d20b496b1bab42df6be3aa0ef9c2
Parents: 34159fe
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 21 14:04:54 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 21 14:04:54 2014 -0700

----------------------------------------------------------------------
 .../tez/common/counters/AbstractCounters.java   | 20 ++++++++++++
 .../apache/tez/common/counters/TaskCounter.java |  5 +++
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 34 ++++++++++++++++++--
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  1 +
 .../common/shuffle/impl/MergeManager.java       |  8 ++---
 .../library/common/shuffle/impl/Shuffle.java    |  1 +
 .../common/shuffle/impl/ShuffleScheduler.java   | 18 ++++++++---
 .../library/input/ShuffledMergedInput.java      |  2 ++
 .../library/input/ShuffledUnorderedKVInput.java |  2 ++
 .../shuffle/common/impl/ShuffleManager.java     |  5 ++-
 10 files changed, 83 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
index fd4fdee..4244dc2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
@@ -310,6 +310,26 @@ public abstract class AbstractCounters<C extends TezCounter,
     }
   }
 
+  public synchronized String toShortString() {
+    StringBuilder sb = new StringBuilder("Counters: " + countCounters()).append(" [");
+    for (G group : this) {
+      sb.append("[").append(group.getDisplayName());
+      boolean isFirst = true;
+      for (TezCounter counter : group) {
+        if (isFirst) {
+          sb.append(" ");
+          isFirst = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(counter.getDisplayName()).append("=").append(counter.getValue());
+      }
+      sb.append("]");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
   /**
    * Return textual representation of the counter values.
    * @return the string

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 47107c3..731d4ac 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -59,6 +59,11 @@ public enum TaskCounter {
    * Number of Inputs from which data is copied. Represents physical Inputs. 
    */
   NUM_SHUFFLED_INPUTS,
+  
+  /**
+   * Number of Inputs from which data was not copied - typically due to an empty Input
+   */
+  NUM_SKIPPED_INPUTS,
 
   /**
    * Number of failed copy attempts (physical inputs)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3e9263c..d64cc7f 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -124,6 +124,11 @@ public class YarnTezDagChild {
    * changes.
    */
   private static Multimap<String, String> startedInputsMap = HashMultimap.create();
+  
+  private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds.
+  private static final float LOG_COUNTER_BACKOFF = 1.3f;
+  private static int taskNonOobHeartbeatCounter = 0;
+  private static int nextHeartbeatNumToLog = 0;
 
   private static Thread startHeartbeatThread() {
     Thread heartbeatThread = new Thread(new Runnable() {
@@ -226,6 +231,22 @@ public class YarnTezDagChild {
       taskLock.readLock().unlock();
     }
 
+    if (LOG.isDebugEnabled()) {
+      taskNonOobHeartbeatCounter++;
+      if (taskNonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+        taskLock.readLock().lock();
+        try {
+          if (currentTask != null) {
+            LOG.debug("Counters: " + currentTask.getCounters().toShortString());
+            taskNonOobHeartbeatCounter = 0;
+            nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
+          }
+        } finally {
+          taskLock.readLock().unlock();
+        }
+      }
+    }
+
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
         containerIdStr, taskAttemptID, eventCounter, eventsRange);
@@ -272,7 +293,7 @@ public class YarnTezDagChild {
     }
     return true;
   }
-
+  
   public static void main(String[] args) throws Throwable {
     Thread.setDefaultUncaughtExceptionHandler(
         new YarnUncaughtExceptionHandler());
@@ -447,9 +468,9 @@ public class YarnTezDagChild {
         }
         taskCount++;
 
-        // Reset file system statistics for the new task.
+        // Reset FileSystem statistics
         FileSystem.clearStatistics();
-        
+
         // Re-use the UGI only if the Credentials have not changed.
         if (containerTask.haveCredentialsChanged()) {
           LOG.info("Refreshing UGI since Credentials have changed");
@@ -497,6 +518,11 @@ public class YarnTezDagChild {
 
           currentTask = createLogicalTask(attemptNumber, taskSpec,
               defaultConf, tezUmbilical, serviceConsumerMetadata);
+          
+          taskNonOobHeartbeatCounter = 0;
+          nextHeartbeatNumToLog = (Math.max(1,
+              (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+                  : (float) amPollInterval))));
         } finally {
           taskLock.writeLock().unlock();
         }
@@ -710,4 +736,6 @@ public class YarnTezDagChild {
     fs.copyToLocalFile(srcPath, dFile);
     return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
   }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 92d6b07..d2d9b62 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -657,6 +657,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public synchronized void cleanup() {
+    LOG.info("Final Counters : " + tezCounters.toShortString());
     setTaskDone();
     if (eventRouterThread != null) {
       eventRouterThread.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 8071b36..50695c8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -492,8 +492,7 @@ public class MergeManager {
     public IntermediateMemoryToMemoryMerger(MergeManager manager, 
                                             int mergeFactor) {
       super(manager, mergeFactor, exceptionReporter);
-      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
-      		    "shuffled map-outputs");
+      setName("MemToMemMerger [" + inputContext.getSourceVertexName() + "]");
       setDaemon(true);
     }
 
@@ -548,8 +547,7 @@ public class MergeManager {
     
     public InMemoryMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName
-      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
+      setName("MemtoDiskMerger [" + inputContext.getSourceVertexName() + "]");
       setDaemon(true);
     }
     
@@ -648,7 +646,7 @@ public class MergeManager {
     
     public OnDiskMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
+      setName("DiskToDiskMerger [" + inputContext.getSourceVertexName() + "]");
       setDaemon(true);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 2b940e4..6c4471c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -286,6 +286,7 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
       }
 
       inputContext.inputIsReady();
+      LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName());
       return kvIter;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 3fbb6b1..d53c6a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -79,7 +80,8 @@ class ShuffleScheduler {
     new HashMap<String,IntWritable>();
   private final TezInputContext inputContext;
   private final Shuffle shuffle;
-  private final TezCounter shuffledMapsCounter;
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter skippedInputCounter;
   private final TezCounter reduceShuffleBytes;
   private final TezCounter reduceBytesDecompressed;
   private final TezCounter failedShuffleCounter;
@@ -103,7 +105,7 @@ class ShuffleScheduler {
                           Configuration conf,
                           int numberOfInputs,
                           Shuffle shuffle,
-                          TezCounter shuffledMapsCounter,
+                          TezCounter shuffledInputsCounter,
                           TezCounter reduceShuffleBytes,
                           TezCounter reduceBytesDecompressed,
                           TezCounter failedShuffleCounter,
@@ -115,7 +117,7 @@ class ShuffleScheduler {
     remainingMaps = numberOfInputs;
     finishedMaps = new boolean[remainingMaps]; // default init to false
     this.shuffle = shuffle;
-    this.shuffledMapsCounter = shuffledMapsCounter;
+    this.shuffledInputsCounter = shuffledInputsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
     this.reduceBytesDecompressed = reduceBytesDecompressed;
     this.failedShuffleCounter = failedShuffleCounter;
@@ -139,6 +141,8 @@ class ShuffleScheduler {
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE));
     
+    this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
+    
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -169,10 +173,16 @@ class ShuffleScheduler {
         } else {
           bytesShuffledToMem.increment(bytesCompressed);
         }
+        shuffledInputsCounter.increment(1);
+      } else {
+        // Output null implies that a physical input completion is being
+        // registered without needing to fetch data
+        skippedInputCounter.increment(1);
       }
       setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
-      shuffledMapsCounter.increment(1);
+      
       if (--remainingMaps == 0) {
+        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
         notifyAll();
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 30e85e4..c55012f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -82,6 +82,8 @@ public class ShuffledMergedInput implements LogicalInput {
       inputContext.requestInitialMemory(0l, null);
       isStarted.set(true);
       inputContext.inputIsReady();
+      LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+          + inputContext.getSourceVertexName());
       return Collections.emptyList();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index c740748..bf0ed11 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -83,6 +83,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
       inputContext.requestInitialMemory(0l, null);
       isStarted.set(true);
       inputContext.inputIsReady();
+      LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+          + inputContext.getSourceVertexName());
       return Collections.emptyList();
     } else {
       long initalMemReq = getInitialMemoryReq();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index c04c134..264e628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -555,7 +555,10 @@ public class ShuffleManager implements FetcherCallback {
         // TODO Should eventually be controlled by Inputs which are processing the data.
         inputContext.inputIsReady();
       }
-      numCompletedInputs.incrementAndGet();
+      int numComplete = numCompletedInputs.incrementAndGet();
+      if (numComplete == numInputs) {
+        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
+      }
     } finally {
       lock.unlock();
     }