You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:46 UTC
[44/50] [abbrv] tez git commit: TEZ-2347. Expose additional
information in TaskCommunicatorContext. (sseth)
TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/960d92bf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/960d92bf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/960d92bf
Branch: refs/heads/TEZ-2003
Commit: 960d92bfe8fbd440f273f2920be25919e62a93e4
Parents: 3e03847
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Apr 20 13:17:31 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:18:08 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/dag/api/TaskCommunicatorContext.java | 50 ++++++++++++++++++++
.../dag/app/TaskCommunicatorContextImpl.java | 50 ++++++++++++++++++++
.../java/org/apache/tez/dag/app/dag/DAG.java | 2 +
.../java/org/apache/tez/dag/app/dag/Task.java | 1 +
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 12 +++++
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 ++++-
9 files changed, 144 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index ca5225e..7c13110 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -16,5 +16,6 @@ ALL CHANGES:
TEZ-2284. Separate TaskReporter into an interface.
TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
+ TEZ-2347. Expose additional information in TaskCommunicatorContext.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 19caed9..56345ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,6 +16,7 @@ package org.apache.tez.dag.api;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.security.Credentials;
@@ -71,4 +72,53 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 API. Should a method exist for task succeeded.
// TODO Eventually Add methods to report availability stats to the scheduler.
+
+ /**
+ * Get the name of the currently executing dag
+ * @return the name of the currently executing dag
+ */
+ String getCurretnDagName();
+
+ /**
+ * Get the names of the input vertices for the specified vertex.
+ * Root Inputs are not returned.
+ * @param vertexName the vertex for which source vertex names will be returned
+ * @return an Iterable over the names of the input vertices for the specified vertex
+ */
+ Iterable<String> getInputVertexNames(String vertexName);
+
+ /**
+ * Get the total number of tasks in the given vertex
+ * @param vertexName the vertex name
+ * @return total number of tasks in this vertex
+ */
+ int getVertexTotalTaskCount(String vertexName);
+
+ /**
+ * Get the number of completed tasks for a given vertex
+ * @param vertexName the vertex name
+ * @return the number of completed tasks for the vertex
+ */
+ int getVertexCompletedTaskCount(String vertexName);
+
+ /**
+ * Get the number of running tasks for a given vertex
+ * @param vertexName the vertex name
+ * @return the number of running tasks for the vertex
+ */
+ int getVertexRunningTaskCount(String vertexName);
+
+ /**
+ * Get the start time for the first attempt of the specified task
+ * @param vertexName the vertex to which the task belongs
+ * @param taskIndex the index of the task
+ * @return the start time for the first attempt of the task
+ */
+ long getFirstAttemptStartTime(String vertexName, int taskIndex);
+
+ /**
+ * Get the start time for the currently executing DAG
+ * @return time when the current dag started executing
+ */
+ long getDagStartTime();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 3714c3c..4cb0c93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -18,7 +18,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -111,6 +114,53 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this);
}
+ @Override
+ public String getCurretnDagName() {
+ return context.getCurrentDAG().getName();
+ }
+
+ @Override
+ public Iterable<String> getInputVertexNames(String vertexName) {
+ Preconditions.checkNotNull(vertexName, "VertexName cannot be null"); // value is always null on failure, so it is not appended
+ Vertex vertex = context.getCurrentDAG().getVertex(vertexName);
+ Set<Vertex> sources = vertex.getInputVertices().keySet();
+ return Iterables.transform(sources, new Function<Vertex, String>() { // maps each source vertex to its name
+ @Override
+ public String apply(@Nullable Vertex input) {
+ return input.getName();
+ }
+ });
+ }
+
+ @Override
+ public int getVertexTotalTaskCount(String vertexName) {
+ Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+ return context.getCurrentDAG().getVertex(vertexName).getTotalTasks();
+ }
+
+ @Override
+ public int getVertexCompletedTaskCount(String vertexName) {
+ Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+ return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks();
+ }
+
+ @Override
+ public int getVertexRunningTaskCount(String vertexName) {
+ Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+ return context.getCurrentDAG().getVertex(vertexName).getRunningTasks();
+ }
+
+ @Override
+ public long getFirstAttemptStartTime(String vertexName, int taskIndex) {
+ Preconditions.checkArgument(vertexName != null, "VertexName must be specified");
+ Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be >= 0"); // 0 is a valid index
+ return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime();
+ }
+
+ @Override
+ public long getDagStartTime() {
+ return context.getCurrentDAG().getStartTime();
+ }
@Override
public void onStateUpdated(VertexStateUpdate event) {
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 6d6872b..458362f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -94,6 +94,8 @@ public interface DAG {
Map<String, TezVertexID> getVertexNameIDMapping();
+ long getStartTime();
+
StateChangeNotifier getStateChangeNotifier();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b798fce..3af14b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -66,4 +66,5 @@ public interface Task {
public void registerTezEvent(TezEvent tezEvent);
+ long getFirstAttemptStartTime();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 6c85cc2..26613e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -122,6 +122,12 @@ public interface TaskAttempt {
*/
long getLaunchTime();
+ /**
+ * Get the time at which this attempt was scheduled
+ * @return the time at which this attempt was scheduled, 0 if it hasn't been scheduled yet
+ */
+ long getScheduleTime();
+
/**
* @return attempt's finish time. If attempt is not finished
* yet, returns 0.
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 998108b..64a184f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -685,6 +685,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
+ public long getStartTime() {
+ readLock.lock();
+ try {
+ return this.startTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
public StateChangeNotifier getStateChangeNotifier() {
return entityUpdateTracker;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 11d4df9..092520d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -135,6 +135,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected final AppContext appContext;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private long launchTime = 0;
+ private long scheduleTime = 0;
private long finishTime = 0;
private String trackerName;
private int httpPort;
@@ -671,6 +672,16 @@ public class TaskAttemptImpl implements TaskAttempt,
}
@Override
+ public long getScheduleTime() {
+ readLock.lock();
+ try {
+ return scheduleTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
public long getFinishTime() {
readLock.lock();
try {
@@ -1030,6 +1041,7 @@ public class TaskAttemptImpl implements TaskAttempt,
public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
+ ta.scheduleTime = ta.clock.getTime();
// TODO Creating the remote task here may not be required in case of
// recovery.
http://git-wip-us.apache.org/repos/asf/tez/blob/960d92bf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 15382a8..d4eabe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1492,7 +1492,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.writeLock.unlock();
}
}
-
+
+ @Override
+ public long getFirstAttemptStartTime() {
+ readLock.lock();
+ try {
+ // The first attempt always has index 0; its schedule time is returned as the start time. NOTE(review): assumes attempt 0 exists - confirm.
+ return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Private
@VisibleForTesting
public List<TezEvent> getTaskEvents() {