You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:46 UTC

[44/50] [abbrv] tez git commit: TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth)

TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/960d92bf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/960d92bf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/960d92bf

Branch: refs/heads/TEZ-2003
Commit: 960d92bfe8fbd440f273f2920be25919e62a93e4
Parents: 3e03847
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Apr 20 13:17:31 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../tez/dag/api/TaskCommunicatorContext.java    | 50 ++++++++++++++++++++
 .../dag/app/TaskCommunicatorContextImpl.java    | 50 ++++++++++++++++++++
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  2 +
 .../java/org/apache/tez/dag/app/dag/Task.java   |  1 +
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |  6 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 10 ++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 12 +++++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 13 ++++-
 9 files changed, 144 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index ca5225e..7c13110 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -16,5 +16,6 @@ ALL CHANGES:
   TEZ-2284. Separate TaskReporter into an interface.
   TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
   TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
+  TEZ-2347. Expose additional information in TaskCommunicatorContext.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 19caed9..56345ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,6 +16,7 @@ package org.apache.tez.dag.api;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.hadoop.security.Credentials;
@@ -71,4 +72,53 @@ public interface TaskCommunicatorContext {
   // TODO TEZ-2003 API. Should a method exist for task succeeded.
 
   // TODO Eventually Add methods to report availability stats to the scheduler.
+
+  /**
+   * Get the name of the currently executing dag
+   * @return the name of the currently executing dag
+   */
+  String getCurretnDagName();
+
+  /**
+   * Get the names of the input vertices for the specified vertex.
+   * Root Inputs are not returned.
+   * @param vertexName the vertex for which source vertex names will be returned
+   * @return an Iterable containing the names of the input vertices for the specified vertex
+   */
+  Iterable<String> getInputVertexNames(String vertexName);
+
+  /**
+   * Get the total number of tasks in the given vertex
+   * @param vertexName the vertex name
+   * @return total number of tasks in this vertex
+   */
+  int getVertexTotalTaskCount(String vertexName);
+
+  /**
+   * Get the number of completed tasks for a given vertex
+   * @param vertexName the vertex name
+   * @return the number of completed tasks for the vertex
+   */
+  int getVertexCompletedTaskCount(String vertexName);
+
+  /**
+   * Get the number of running tasks for a given vertex
+   * @param vertexName the vertex name
+   * @return the number of running tasks for the vertex
+   */
+  int getVertexRunningTaskCount(String vertexName);
+
+  /**
+   * Get the start time for the first attempt of the specified task
+   * @param vertexName the vertex to which the task belongs
+   * @param taskIndex the index of the task
+   * @return the start time for the first attempt of the task
+   */
+  long getFirstAttemptStartTime(String vertexName, int taskIndex);
+
+  /**
+   * Get the start time for the currently executing DAG
+   * @return time when the current dag started executing
+   */
+  long getDagStartTime();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 3714c3c..4cb0c93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -18,7 +18,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
@@ -111,6 +114,53 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
     context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
   }
 
+  @Override
+  public String getCurretnDagName() {
+    return context.getCurrentDAG().getName();
+  }
+
+  @Override
+  public Iterable<String> getInputVertexNames(String vertexName) {
+    Preconditions.checkNotNull(vertexName, "VertexName must be specified");
+    Vertex vertex = context.getCurrentDAG().getVertex(vertexName);
+    Set<Vertex> sources = vertex.getInputVertices().keySet();
+    return Iterables.transform(sources, new Function<Vertex, String>() {
+      @Override
+      public String apply(@Nullable Vertex input) {
+        return input.getName();
+      }
+    });
+  }
+
+  @Override
+  public int getVertexTotalTaskCount(String vertexName) {
+    Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+    return context.getCurrentDAG().getVertex(vertexName).getTotalTasks();
+  }
+
+  @Override
+  public int getVertexCompletedTaskCount(String vertexName) {
+    Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+    return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks();
+  }
+
+  @Override
+  public int getVertexRunningTaskCount(String vertexName) {
+    Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+    return context.getCurrentDAG().getVertex(vertexName).getRunningTasks();
+  }
+
+  @Override
+  public long getFirstAttemptStartTime(String vertexName, int taskIndex) {
+    Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+    Preconditions.checkArgument(taskIndex >= 0, "TaskIndex must be >= 0");
+    return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime();
+  }
+
+  @Override
+  public long getDagStartTime() {
+    return context.getCurrentDAG().getStartTime();
+  }
 
   @Override
   public void onStateUpdated(VertexStateUpdate event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 6d6872b..458362f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -94,6 +94,8 @@ public interface DAG {
 
   Map<String, TezVertexID> getVertexNameIDMapping();
 
+  long getStartTime();
+
   StateChangeNotifier getStateChangeNotifier();
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b798fce..3af14b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -66,4 +66,5 @@ public interface Task {
 
   public void registerTezEvent(TezEvent tezEvent);
 
+  long getFirstAttemptStartTime();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..26613e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -122,6 +122,12 @@ public interface TaskAttempt {
    */
   long getLaunchTime();
 
+  /**
+   * Get the time at which this attempt was scheduled
+   * @return the time at which this attempt was scheduled, 0 if it hasn't been scheduled yet
+   */
+  long getScheduleTime();
+
   /** 
    * @return attempt's finish time. If attempt is not finished
    *  yet, returns 0.

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 998108b..64a184f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -685,6 +685,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public long getStartTime() {
+    readLock.lock();
+    try {
+      return this.startTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public StateChangeNotifier getStateChangeNotifier() {
     return entityUpdateTracker;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 11d4df9..092520d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -135,6 +135,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected final AppContext appContext;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private long launchTime = 0;
+  private long scheduleTime = 0;
   private long finishTime = 0;
   private String trackerName;
   private int httpPort;
@@ -671,6 +672,16 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @Override
+  public long getScheduleTime() {
+    readLock.lock();
+    try {
+      return scheduleTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public long getFinishTime() {
     readLock.lock();
     try {
@@ -1030,6 +1041,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
 
+      ta.scheduleTime = ta.clock.getTime();
       // TODO Creating the remote task here may not be required in case of
       // recovery.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 15382a8..d4eabe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1492,7 +1492,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       this.writeLock.unlock();
     }
   }
-  
+
+  @Override
+  public long getFirstAttemptStartTime() {
+    readLock.lock();
+    try {
+      // The first attempt will always have an index of 0.
+      return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Private
   @VisibleForTesting
   public List<TezEvent> getTaskEvents() {