You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/28 01:54:35 UTC
git commit: TEZ-887. Allow counters to be separated at a per
Input/Output level. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 847cbaa35 -> 74078d309
TEZ-887. Allow counters to be separated at a per Input/Output level.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/74078d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/74078d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/74078d30
Branch: refs/heads/master
Commit: 74078d3092292f7b27828491669850bfe68ce5f4
Parents: 847cbaa
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 27 16:54:16 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 27 16:54:16 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/api/TezConfiguration.java | 11 ++++
.../runtime/api/impl/TezCountersDelegate.java | 58 ++++++++++++++++++++
.../runtime/api/impl/TezInputContextImpl.java | 14 ++++-
.../runtime/api/impl/TezOutputContextImpl.java | 14 ++++-
4 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4355cf5..142d2d9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -185,6 +185,17 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
+ "scale.memory.reserve-fraction";
+ /**
+ * Whether to generate counters per IO or not. Enabling this will rename
+ * CounterGroups / CounterNames to making thme unique per Vertex +
+ * Src|Destination
+ */
+ @Unstable
+ @Private
+ public static final String TEZ_TASK_GENERATE_COUNTERS_PER_IO = TEZ_TASK_PREFIX
+ + "generate.counters.per.io";
+ public static final boolean TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT = false;
+
public static final String TASK_TIMEOUT = TEZ_TASK_PREFIX + "timeout";
public static final String TASK_HEARTBEAT_TIMEOUT_MS = TEZ_TASK_PREFIX + "heartbeat.timeout-ms";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
new file mode 100644
index 0000000..29beac0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+public class TezCountersDelegate extends TezCounters {
+
+ private final String groupModifier;
+ private final TezCounters original;
+
+ public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
+ String type) {
+ this.original = original;
+ this.groupModifier = cleanVertexName(taskVertexName) + "_" + type + "_"
+ + cleanVertexName(edgeVertexName);
+ }
+
+ // Should only be called while setting up Inputs / Outputs - rather than being
+ // the standard mechanism to find a counter.
+ @Override
+ public TezCounter findCounter(String groupName, String counterName) {
+ if (groupName.equals(TaskCounter.class.getName())) {
+ groupName = TaskCounter.class.getSimpleName();
+ }
+ String modifiedGroupName = groupName + "_" + this.groupModifier;
+ return original.findCounter(modifiedGroupName, counterName);
+ }
+
+
+ private static String cleanVertexName(String vertexName) {
+ return sanitizeString(vertexName).substring(0,
+ vertexName.length() > 40 ? 40 : vertexName.length());
+ }
+
+ private static String sanitizeString(String srcString) {
+ String res = srcString.replaceAll("[^A-za-z0-9_]", "_");
+ return res; // Number starts allowed rightnow
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index a80d90b..c138a58 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
@@ -55,7 +56,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
InputDescriptor inputDescriptor, Input input, InputReadyTracker inputReadyTracker) {
super(conf, appAttemptNumber, dagName, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+ wrapCounters(counters, taskVertexName, sourceVertexName, conf),
+ runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, inputDescriptor);
this.userPayload = userPayload;
this.inputIndex = inputIndex;
@@ -67,6 +69,16 @@ public class TezInputContextImpl extends TezTaskContextImpl
this.inputReadyTracker = inputReadyTracker;
}
+ private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+ String edgeVertexName, Configuration conf) {
+ if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
+ TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
+ return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "INPUT");
+ } else {
+ return tezCounters;
+ }
+ }
+
@Override
public void sendEvents(List<Event> events) {
List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 5bb261d..1a2cfcb 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.Event;
@@ -53,7 +54,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
OutputDescriptor outputDescriptor) {
super(conf, appAttemptNumber, dagName, taskVertexName, taskAttemptID,
- counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+ wrapCounters(counters, taskVertexName, destinationVertexName, conf),
+ runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, outputDescriptor);
this.userPayload = userPayload;
this.outputIndex = outputIndex;
@@ -62,6 +64,16 @@ public class TezOutputContextImpl extends TezTaskContextImpl
taskVertexName, destinationVertexName, taskAttemptID);
}
+ private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+ String edgeVertexName, Configuration conf) {
+ if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
+ TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
+ return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "OUTPUT");
+ } else {
+ return tezCounters;
+ }
+ }
+
@Override
public void sendEvents(List<Event> events) {
List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());