You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/10/26 06:45:21 UTC
tez git commit: TEZ-3856. API to access counters in
InputInitializerContext (Prasanth Jayachandran via zhiyuany)
Repository: tez
Updated Branches:
refs/heads/master d5ac3b75f -> ec9135145
TEZ-3856. API to access counters in InputInitializerContext (Prasanth Jayachandran via zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec913514
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec913514
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec913514
Branch: refs/heads/master
Commit: ec9135145fda48917b319b1accc273254c707ae5
Parents: d5ac3b7
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Wed Oct 25 23:43:46 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Wed Oct 25 23:44:02 2017 -0700
----------------------------------------------------------------------
.../apache/tez/runtime/api/InputInitializerContext.java | 7 +++++++
.../src/main/java/org/apache/tez/dag/app/dag/Vertex.java | 6 ++++++
.../app/dag/impl/TezRootInputInitializerContextImpl.java | 5 +++++
.../java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | 11 ++++++++++-
.../test/java/org/apache/tez/mapreduce/TezTestUtils.java | 6 ++++++
5 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
index 6a123cf..ccfac46 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -117,4 +118,10 @@ public interface InputInitializerContext {
*/
void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+ /**
+ * Add custom counters on behalf of this input initializer (the tez-dag implementation, TezRootInputInitializerContextImpl, forwards them to its vertex)
+ *
+ * @param tezCounters counters to add
+ */
+ void addCounters(TezCounters tezCounters);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 4d0a4bf..ba7624c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -86,6 +86,12 @@ public interface Vertex extends Comparable<Vertex> {
*/
TezCounters getCachedCounters();
+ /**
+ * Add custom counters to the vertex; VertexImpl includes them alongside the counters aggregated from the vertex's tasks
+ * @param tezCounters counters to add
+ */
+ void addCounters(TezCounters tezCounters);
+
int getMaxTaskConcurrency();
Map<TezTaskID, Task> getTasks();
Task getTask(TezTaskID taskID);
http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index 4ca4024..f713054 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
@@ -119,4 +120,8 @@ public class TezRootInputInitializerContextImpl implements
manager.registerForVertexUpdates(vertexName, input.getName(), stateSet);
}
+ @Override
+ public void addCounters(final TezCounters tezCounters) {
+ vertex.addCounters(tezCounters); // pure delegation: the counters are handed unchanged to this context's vertex
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 209db5a..0bd73ee 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -231,6 +231,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// must be a linked map for ordering
volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
private Object fullCountersLock = new Object();
+ private TezCounters counters = new TezCounters();
private TezCounters fullCounters = null;
private TezCounters cachedCounters = null;
private long cachedCountersTimestamp = 0;
@@ -1189,6 +1190,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
TezCounters counters = new TezCounters();
+ counters.incrAllCounters(this.counters);
return incrTaskCounters(counters, tasks.values());
} finally {
@@ -1217,13 +1219,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
}
TezCounters counters = new TezCounters();
+ counters.incrAllCounters(this.counters);
cachedCounters = incrTaskCounters(counters, tasks.values());
return cachedCounters;
} finally {
readLock.unlock();
}
}
-
+
+ @Override
+ public void addCounters(final TezCounters tezCounters) {
+ counters.incrAllCounters(tezCounters); // accumulate into the vertex-level counters that the counter getters copy from; NOTE(review): no lock is taken here although a reader copies this.counters under readLock - confirm this is safe
+ }
+
@Override
public int getMaxTaskConcurrency() {
return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY,
@@ -3308,6 +3316,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Private
public void constructFinalFullcounters() {
this.fullCounters = new TezCounters();
+ this.fullCounters.incrAllCounters(counters);
this.vertexStats = new VertexStats();
for (Task t : this.tasks.values()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
index 8912ad2..369afbe 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.tez.mapreduce;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.records.TezDAGID;
@@ -120,6 +121,11 @@ public class TezTestUtils {
}
@Override
+ public void addCounters(TezCounters tezCounters) {
+ throw new UnsupportedOperationException("addCounters not implemented in this mock"); // deliberate stub: this test context does not record counters, mirroring getUserPayload in the same mock
+ }
+
+ @Override
public UserPayload getUserPayload() {
throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
}