You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/21 22:05:10 UTC
git commit: TEZ-948. Log counters at the end of Task execution. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 34159fed9 -> 674fd81a1
TEZ-948. Log counters at the end of Task execution. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/674fd81a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/674fd81a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/674fd81a
Branch: refs/heads/master
Commit: 674fd81a1a16d20b496b1bab42df6be3aa0ef9c2
Parents: 34159fe
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 21 14:04:54 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 21 14:04:54 2014 -0700
----------------------------------------------------------------------
.../tez/common/counters/AbstractCounters.java | 20 ++++++++++++
.../apache/tez/common/counters/TaskCounter.java | 5 +++
.../apache/hadoop/mapred/YarnTezDagChild.java | 34 ++++++++++++++++++--
.../runtime/LogicalIOProcessorRuntimeTask.java | 1 +
.../common/shuffle/impl/MergeManager.java | 8 ++---
.../library/common/shuffle/impl/Shuffle.java | 1 +
.../common/shuffle/impl/ShuffleScheduler.java | 18 ++++++++---
.../library/input/ShuffledMergedInput.java | 2 ++
.../library/input/ShuffledUnorderedKVInput.java | 2 ++
.../shuffle/common/impl/ShuffleManager.java | 5 ++-
10 files changed, 83 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
index fd4fdee..4244dc2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
@@ -310,6 +310,26 @@ public abstract class AbstractCounters<C extends TezCounter,
}
}
+ public synchronized String toShortString() {
+ StringBuilder sb = new StringBuilder("Counters: " + countCounters()).append(" [");
+ for (G group : this) {
+ sb.append("[").append(group.getDisplayName());
+ boolean isFirst = true;
+ for (TezCounter counter : group) {
+ if (isFirst) {
+ sb.append(" ");
+ isFirst = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(counter.getDisplayName()).append("=").append(counter.getValue());
+ }
+ sb.append("]");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
/**
* Return textual representation of the counter values.
* @return the string
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 47107c3..731d4ac 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -59,6 +59,11 @@ public enum TaskCounter {
* Number of Inputs from which data is copied. Represents physical Inputs.
*/
NUM_SHUFFLED_INPUTS,
+
+ /**
+ * Number of Inputs from which data was not copied - typically due to an empty Input
+ */
+ NUM_SKIPPED_INPUTS,
/**
* Number of failed copy attempts (physical inputs)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3e9263c..d64cc7f 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -124,6 +124,11 @@ public class YarnTezDagChild {
* changes.
*/
private static Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+ private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds.
+ private static final float LOG_COUNTER_BACKOFF = 1.3f;
+ private static int taskNonOobHeartbeatCounter = 0;
+ private static int nextHeartbeatNumToLog = 0;
private static Thread startHeartbeatThread() {
Thread heartbeatThread = new Thread(new Runnable() {
@@ -226,6 +231,22 @@ public class YarnTezDagChild {
taskLock.readLock().unlock();
}
+ if (LOG.isDebugEnabled()) {
+ taskNonOobHeartbeatCounter++;
+ if (taskNonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+ taskLock.readLock().lock();
+ try {
+ if (currentTask != null) {
+ LOG.debug("Counters: " + currentTask.getCounters().toShortString());
+ taskNonOobHeartbeatCounter = 0;
+ nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
+ }
+ }
+
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
containerIdStr, taskAttemptID, eventCounter, eventsRange);
@@ -272,7 +293,7 @@ public class YarnTezDagChild {
}
return true;
}
-
+
public static void main(String[] args) throws Throwable {
Thread.setDefaultUncaughtExceptionHandler(
new YarnUncaughtExceptionHandler());
@@ -447,9 +468,9 @@ public class YarnTezDagChild {
}
taskCount++;
- // Reset file system statistics for the new task.
+ // Reset FileSystem statistics
FileSystem.clearStatistics();
-
+
// Re-use the UGI only if the Credentials have not changed.
if (containerTask.haveCredentialsChanged()) {
LOG.info("Refreshing UGI since Credentials have changed");
@@ -497,6 +518,11 @@ public class YarnTezDagChild {
currentTask = createLogicalTask(attemptNumber, taskSpec,
defaultConf, tezUmbilical, serviceConsumerMetadata);
+
+ taskNonOobHeartbeatCounter = 0;
+ nextHeartbeatNumToLog = (Math.max(1,
+ (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+ : (float) amPollInterval))));
} finally {
taskLock.writeLock().unlock();
}
@@ -710,4 +736,6 @@ public class YarnTezDagChild {
fs.copyToLocalFile(srcPath, dFile);
return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 92d6b07..d2d9b62 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -657,6 +657,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
public synchronized void cleanup() {
+ LOG.info("Final Counters : " + tezCounters.toShortString());
setTaskDone();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 8071b36..50695c8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -492,8 +492,7 @@ public class MergeManager {
public IntermediateMemoryToMemoryMerger(MergeManager manager,
int mergeFactor) {
super(manager, mergeFactor, exceptionReporter);
- setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
- "shuffled map-outputs");
+ setName("MemToMemMerger [" + inputContext.getSourceVertexName() + "]");
setDaemon(true);
}
@@ -548,8 +547,7 @@ public class MergeManager {
public InMemoryMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName
- ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
+ setName("MemtoDiskMerger [" + inputContext.getSourceVertexName() + "]");
setDaemon(true);
}
@@ -648,7 +646,7 @@ public class MergeManager {
public OnDiskMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("OnDiskMerger - Thread to merge on-disk map-outputs");
+ setName("DiskToDiskMerger [" + inputContext.getSourceVertexName() + "]");
setDaemon(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 2b940e4..6c4471c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -286,6 +286,7 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
}
inputContext.inputIsReady();
+ LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName());
return kvIter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 3fbb6b1..d53c6a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
@@ -79,7 +80,8 @@ class ShuffleScheduler {
new HashMap<String,IntWritable>();
private final TezInputContext inputContext;
private final Shuffle shuffle;
- private final TezCounter shuffledMapsCounter;
+ private final TezCounter shuffledInputsCounter;
+ private final TezCounter skippedInputCounter;
private final TezCounter reduceShuffleBytes;
private final TezCounter reduceBytesDecompressed;
private final TezCounter failedShuffleCounter;
@@ -103,7 +105,7 @@ class ShuffleScheduler {
Configuration conf,
int numberOfInputs,
Shuffle shuffle,
- TezCounter shuffledMapsCounter,
+ TezCounter shuffledInputsCounter,
TezCounter reduceShuffleBytes,
TezCounter reduceBytesDecompressed,
TezCounter failedShuffleCounter,
@@ -115,7 +117,7 @@ class ShuffleScheduler {
remainingMaps = numberOfInputs;
finishedMaps = new boolean[remainingMaps]; // default init to false
this.shuffle = shuffle;
- this.shuffledMapsCounter = shuffledMapsCounter;
+ this.shuffledInputsCounter = shuffledInputsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.reduceBytesDecompressed = reduceBytesDecompressed;
this.failedShuffleCounter = failedShuffleCounter;
@@ -139,6 +141,8 @@ class ShuffleScheduler {
TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE));
+ this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
+
LOG.info("ShuffleScheduler running for sourceVertex: "
+ inputContext.getSourceVertexName() + " with configuration: "
+ "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -169,10 +173,16 @@ class ShuffleScheduler {
} else {
bytesShuffledToMem.increment(bytesCompressed);
}
+ shuffledInputsCounter.increment(1);
+ } else {
+ // Output null implies that a physical input completion is being
+ // registered without needing to fetch data
+ skippedInputCounter.increment(1);
}
setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
- shuffledMapsCounter.increment(1);
+
if (--remainingMaps == 0) {
+ LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
notifyAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 30e85e4..c55012f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -82,6 +82,8 @@ public class ShuffledMergedInput implements LogicalInput {
inputContext.requestInitialMemory(0l, null);
isStarted.set(true);
inputContext.inputIsReady();
+ LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+ + inputContext.getSourceVertexName());
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index c740748..bf0ed11 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -83,6 +83,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
inputContext.requestInitialMemory(0l, null);
isStarted.set(true);
inputContext.inputIsReady();
+ LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+ + inputContext.getSourceVertexName());
return Collections.emptyList();
} else {
long initalMemReq = getInitialMemoryReq();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/674fd81a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index c04c134..264e628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -555,7 +555,10 @@ public class ShuffleManager implements FetcherCallback {
// TODO Should eventually be controlled by Inputs which are processing the data.
inputContext.inputIsReady();
}
- numCompletedInputs.incrementAndGet();
+ int numComplete = numCompletedInputs.incrementAndGet();
+ if (numComplete == numInputs) {
+ LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
+ }
} finally {
lock.unlock();
}