You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/28 01:54:35 UTC

git commit: TEZ-887. Allow counters to be separated at a per Input/Output level. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 847cbaa35 -> 74078d309


TEZ-887. Allow counters to be separated at a per Input/Output level.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/74078d30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/74078d30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/74078d30

Branch: refs/heads/master
Commit: 74078d3092292f7b27828491669850bfe68ce5f4
Parents: 847cbaa
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 27 16:54:16 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 27 16:54:16 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    | 11 ++++
 .../runtime/api/impl/TezCountersDelegate.java   | 58 ++++++++++++++++++++
 .../runtime/api/impl/TezInputContextImpl.java   | 14 ++++-
 .../runtime/api/impl/TezOutputContextImpl.java  | 14 ++++-
 4 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4355cf5..142d2d9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -185,6 +185,17 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
       + "scale.memory.reserve-fraction";
 
+  /**
+   * Whether to generate counters per IO or not. Enabling this will rename
+   * CounterGroups / CounterNames to making thme unique per Vertex +
+   * Src|Destination
+   */
+  @Unstable
+  @Private
+  public static final String TEZ_TASK_GENERATE_COUNTERS_PER_IO = TEZ_TASK_PREFIX
+      + "generate.counters.per.io";
+  public static final boolean TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT = false;
+  
   public static final String TASK_TIMEOUT = TEZ_TASK_PREFIX + "timeout";
 
   public static final String TASK_HEARTBEAT_TIMEOUT_MS = TEZ_TASK_PREFIX + "heartbeat.timeout-ms";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
new file mode 100644
index 0000000..29beac0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+public class TezCountersDelegate extends TezCounters {
+
+  private final String groupModifier;
+  private final TezCounters original;
+  
+  public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
+      String type) {
+    this.original = original;
+    this.groupModifier = cleanVertexName(taskVertexName) + "_" + type + "_"
+        + cleanVertexName(edgeVertexName);
+  }
+
+  // Should only be called while setting up Inputs / Outputs - rather than being
+  // the standard mechanism to find a counter.
+  @Override
+  public TezCounter findCounter(String groupName, String counterName) {
+    if (groupName.equals(TaskCounter.class.getName())) {
+      groupName = TaskCounter.class.getSimpleName();
+    }
+    String modifiedGroupName = groupName + "_" + this.groupModifier;
+    return original.findCounter(modifiedGroupName, counterName);
+  }
+
+
+  private static String cleanVertexName(String vertexName) {
+    return sanitizeString(vertexName).substring(0,
+        vertexName.length() > 40 ? 40 : vertexName.length());
+  }
+
+  private static String sanitizeString(String srcString) {
+    String res = srcString.replaceAll("[^A-za-z0-9_]", "_"); 
+    return res; // Number starts allowed rightnow
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index a80d90b..c138a58 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
@@ -55,7 +56,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       InputDescriptor inputDescriptor,  Input input, InputReadyTracker inputReadyTracker) {
     super(conf, appAttemptNumber, dagName, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        wrapCounters(counters, taskVertexName, sourceVertexName, conf),
+        runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, inputDescriptor);
     this.userPayload = userPayload;
     this.inputIndex = inputIndex;
@@ -67,6 +69,16 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.inputReadyTracker = inputReadyTracker;
   }
 
+  private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+      String edgeVertexName, Configuration conf) {
+    if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
+        TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
+      return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "INPUT");
+    } else {
+      return tezCounters;
+    }
+  }
+
   @Override
   public void sendEvents(List<Event> events) {
     List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/74078d30/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 5bb261d..1a2cfcb 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
@@ -53,7 +54,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       OutputDescriptor outputDescriptor) {
     super(conf, appAttemptNumber, dagName, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        wrapCounters(counters, taskVertexName, destinationVertexName, conf),
+        runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, outputDescriptor);
     this.userPayload = userPayload;
     this.outputIndex = outputIndex;
@@ -62,6 +64,16 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         taskVertexName, destinationVertexName, taskAttemptID);
   }
 
+  private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+      String edgeVertexName, Configuration conf) {
+    if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
+        TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
+      return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "OUTPUT");
+    } else {
+      return tezCounters;
+    }
+  }
+
   @Override
   public void sendEvents(List<Event> events) {
     List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());