You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/12/02 19:51:35 UTC

tez git commit: TEZ-1787. Counters for speculation (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 3b7c7312c -> 9e10348f6


TEZ-1787. Counters for speculation (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9e10348f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9e10348f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9e10348f

Branch: refs/heads/master
Commit: 9e10348f6edb77318bf4cb41f60b1546f4941483
Parents: 3b7c731
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Dec 2 10:51:26 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Dec 2 10:51:26 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/common/counters/TaskCounter.java |  2 ++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 13 +++++-----
 .../org/apache/tez/dag/app/TestSpeculation.java | 26 +++++++++++++++++---
 4 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ea8a92..d023848 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1787. Counters for speculation
   TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
   history record
   TEZ-14. Support MR like speculation capabilities based on latency deviation

http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 22d7f59..94cae5f 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 @Private
 public enum TaskCounter {
   // TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
+  
+  NUM_SPECULATIONS,
 
   /**
    * Number of Input Groups seen by ShuffledMergedInput.

http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b20fa13..a4c4dee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -119,6 +120,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final Lock readLock;
   private final Lock writeLock;
   private final List<String> diagnostics = new ArrayList<String>();
+  private TezCounters counters = new TezCounters();
   // TODO Metrics
   //private final MRAppMetrics metrics;
   protected final AppContext appContext;
@@ -417,15 +419,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public TezCounters getCounters() {
-    TezCounters counters = null;
+    TezCounters counters = new TezCounters();
+    counters.incrAllCounters(this.counters);
     readLock.lock();
     try {
       TaskAttempt bestAttempt = selectBestAttempt();
       if (bestAttempt != null) {
-        counters = bestAttempt.getCounters();
-      } else {
-        counters = TaskAttemptImpl.EMPTY_COUNTERS;
-//        counters.groups = new HashMap<CharSequence, CounterGroup>();
+        counters.incrAllCounters(bestAttempt.getCounters());
       }
       return counters;
     } finally {
@@ -890,7 +890,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     task.commitAttempt = null;
     task.successfulAttempt = null;
   }
-
+  
   /**
   * @return a String representation of the splits.
   *
@@ -989,6 +989,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
+      task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
       task.addAndScheduleAttempt();
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/9e10348f/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 38eb934..c349957 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -116,7 +118,18 @@ public class TestSpeculation {
     Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
     TaskAttempt killedAttempt = task.getAttempt(killedTaId);
     Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
-    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
+    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, 
+        killedAttempt.getTerminationCause());
+    if (withProgress) {
+      // without progress updates occasionally more than 1 task specualates
+      Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+          .getValue());
+      Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+          .getValue());
+      org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
+      Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+          .getValue());
+    }
     tezClient.stop();
   }
   
@@ -156,9 +169,16 @@ public class TestSpeculation {
     Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
     TaskAttempt killedAttempt = task.getAttempt(killedTaId);
     Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
-    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
+    Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, 
+        killedAttempt.getTerminationCause());
+    Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+        .getValue());
+    Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+        .getValue());
+    org.apache.tez.dag.app.dag.Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
+    Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+        .getValue());
     tezClient.stop();
   }
 
-
 }