You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/12/02 19:51:35 UTC
tez git commit: TEZ-1787. Counters for speculation (bikas)
Repository: tez
Updated Branches:
refs/heads/master 3b7c7312c -> 9e10348f6
TEZ-1787. Counters for speculation (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9e10348f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9e10348f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9e10348f
Branch: refs/heads/master
Commit: 9e10348f6edb77318bf4cb41f60b1546f4941483
Parents: 3b7c731
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Dec 2 10:51:26 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Dec 2 10:51:26 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/common/counters/TaskCounter.java | 2 ++
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++++-----
.../org/apache/tez/dag/app/TestSpeculation.java | 26 +++++++++++++++++---
4 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ea8a92..d023848 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1787. Counters for speculation
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
history record
TEZ-14. Support MR like speculation capabilities based on latency deviation
http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 22d7f59..94cae5f 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
@Private
public enum TaskCounter {
// TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
+
+ NUM_SPECULATIONS,
/**
* Number of Input Groups seen by ShuffledMergedInput.
http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b20fa13..a4c4dee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -119,6 +120,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Lock readLock;
private final Lock writeLock;
private final List<String> diagnostics = new ArrayList<String>();
+ private TezCounters counters = new TezCounters();
// TODO Metrics
//private final MRAppMetrics metrics;
protected final AppContext appContext;
@@ -417,15 +419,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TezCounters getCounters() {
- TezCounters counters = null;
+ TezCounters counters = new TezCounters();
+ counters.incrAllCounters(this.counters);
readLock.lock();
try {
TaskAttempt bestAttempt = selectBestAttempt();
if (bestAttempt != null) {
- counters = bestAttempt.getCounters();
- } else {
- counters = TaskAttemptImpl.EMPTY_COUNTERS;
-// counters.groups = new HashMap<CharSequence, CounterGroup>();
+ counters.incrAllCounters(bestAttempt.getCounters());
}
return counters;
} finally {
@@ -890,7 +890,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.commitAttempt = null;
task.successfulAttempt = null;
}
-
+
/**
* @return a String representation of the splits.
*
@@ -989,6 +989,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
+ task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
task.addAndScheduleAttempt();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 38eb934..c349957 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -116,7 +118,18 @@ public class TestSpeculation {
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
- Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
+ Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
+ killedAttempt.getTerminationCause());
+ if (withProgress) {
+ // without progress updates occasionally more than 1 task speculates
+ Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+ Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+ org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
+ Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+ }
tezClient.stop();
}
@@ -156,9 +169,16 @@ public class TestSpeculation {
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
- Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
+ Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION,
+ killedAttempt.getTerminationCause());
+ Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+ Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+ org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
+ Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
tezClient.stop();
}
-
}