You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:04 UTC

[02/50] [abbrv] tez git commit: TEZ-2360. per-io counters flag should generate both overall and per-edge counters (pramachandran)

TEZ-2360. per-io counters flag should generate both overall and per-edge counters (pramachandran)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/765afd23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/765afd23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/765afd23

Branch: refs/heads/TEZ-2003
Commit: 765afd236fd178ebc315c1f912102578ec100c70
Parents: e36f962
Author: Prakash Ramachandran <pr...@hortonworks.com>
Authored: Fri May 1 00:48:16 2015 +0530
Committer: Prakash Ramachandran <pr...@hortonworks.com>
Committed: Fri May 1 00:48:16 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/api/impl/TezCountersDelegate.java   |  74 +++++++-
 .../java/org/apache/tez/test/TestTezJobs.java   | 184 +++++++++++++++++++
 3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d67db81..aa72320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2360. per-io counters flag should generate both overall and per-edge counters
   TEZ-2389. Tez UI: Sort by attempt-no is incorrect in attempts pages.
   TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask.
   TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics

http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
index 5286839..3c79530 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -18,7 +18,12 @@
 
 package org.apache.tez.runtime.api.impl;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.AbstractCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -27,7 +32,7 @@ public class TezCountersDelegate extends TezCounters {
 
   private final String groupModifier;
   private final TezCounters original;
-  
+
   public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
       String type) {
     this.original = original;
@@ -39,10 +44,71 @@ public class TezCountersDelegate extends TezCounters {
   // the standard mechanism to find a counter.
   @Override
   public TezCounter findCounter(String groupName, String counterName) {
+    // Per-IO groups use TaskCounter's simple name; the aggregate lookup keeps the original name.
+    String simpleGroupName = groupName;
     if (groupName.equals(TaskCounter.class.getName())) {
-      groupName = TaskCounter.class.getSimpleName();
+      simpleGroupName = TaskCounter.class.getSimpleName();
+    }
+    String modifiedGroupName = simpleGroupName + "_" + this.groupModifier;
+    final TezCounter modifiedGroupCounter = original.findCounter(modifiedGroupName, counterName);
+    final TezCounter originalGroupCounter = original.findCounter(groupName, counterName);
+    // Updates through the returned counter land in both the per-IO and the aggregate group.
+    return new CompositeCounter(modifiedGroupCounter, originalGroupCounter);
+  }
+
+  /**
+   * Fans writes out to both the per-IO and the aggregate counter; reads use the per-IO counter.
+   * Note: setValue overwrites (rather than adds to) the aggregate counter's value.
+   */
+  private static class CompositeCounter extends AbstractCounter {
+    private final TezCounter modifiedCounter;
+    private final TezCounter originalCounter;
+
+    public CompositeCounter(TezCounter modifiedCounter, TezCounter originalCounter) {
+      this.modifiedCounter = modifiedCounter;
+      this.originalCounter = originalCounter;
+    }
+
+    @Override
+    public String getName() {
+      return modifiedCounter.getName();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return modifiedCounter.getDisplayName();
+    }
+
+    @Override
+    public long getValue() {
+      return modifiedCounter.getValue();
+    }
+
+    @Override
+    public void setValue(long value) {
+      modifiedCounter.setValue(value);
+      originalCounter.setValue(value);
+    }
+
+    @Override
+    public void increment(long increment) {
+      modifiedCounter.increment(increment);
+      originalCounter.increment(increment);
+    }
+
+    @Override
+    public TezCounter getUnderlyingCounter() {
+      return this;
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      throw new UnsupportedOperationException("CompositeCounter is not serializable");
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      throw new UnsupportedOperationException("CompositeCounter is not serializable");
     }
-    String modifiedGroupName = groupName + "_" + this.groupModifier;
-    return original.findCounter(modifiedGroupName, counterName);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index db212fa..13b0c03 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -19,6 +19,7 @@
 package org.apache.tez.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -41,6 +43,12 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -313,6 +321,103 @@ public class TestTezJobs {
   }
 
   @Test(timeout = 60000)
+  public void testPerIOCounterAggregation() throws Exception {
+    String baseDir = "/tmp/perIOCounterAgg/";
+    Path inPath1 = new Path(baseDir + "inPath1");
+    Path inPath2 = new Path(baseDir + "inPath2");
+    Path outPath = new Path(baseDir + "outPath");
+    final Set<String> expectedResults = generateSortMergeJoinInput(inPath1, inPath2);
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+
+    TezConfiguration conf = new TezConfiguration(mrrTezCluster.getConfig());
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    TezClient tezClient = TezClient.create(SortMergeJoinHelper.class.getSimpleName(), conf);
+    tezClient.start();
+
+    SortMergeJoinHelper sortMergeJoinHelper = new SortMergeJoinHelper(tezClient);
+    sortMergeJoinHelper.setConf(conf);
+
+    String[] args = new String[] {
+        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+        inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+    assertEquals(0, sortMergeJoinHelper.run(conf, args, tezClient));
+
+    verifySortMergeJoinInput(outPath, expectedResults);
+
+    String joinerVertexName = "joiner";
+    String input1Name = "input1";
+    String input2Name = "input2";
+    String joinOutputName = "joinOutput";
+    // Fetch the joiner's counters: the aggregate TaskCounter group plus one group per input/output.
+    Set<StatusGetOpts> statusOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    VertexStatus joinerVertexStatus =
+        sortMergeJoinHelper.dagClient.getVertexStatus(joinerVertexName, statusOpts);
+    final TezCounters joinerCounters = joinerVertexStatus.getVertexCounters();
+    final CounterGroup aggregatedGroup = joinerCounters.getGroup(TaskCounter.class.getCanonicalName());
+    final CounterGroup input1Group = joinerCounters.getGroup(
+        TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input1Name);
+    final CounterGroup input2Group = joinerCounters.getGroup(
+        TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input2Name);
+    assertTrue("aggregated counter group cannot be empty", aggregatedGroup.size() > 0);
+    assertTrue("per io group for input1 cannot be empty", input1Group.size() > 0);
+    assertTrue("per io group for input2 cannot be empty", input2Group.size() > 0);
+
+    List<TaskCounter> countersToVerifyAgg = Arrays.asList(
+        TaskCounter.ADDITIONAL_SPILLS_BYTES_READ,
+        TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN,
+        TaskCounter.COMBINE_INPUT_RECORDS,
+        TaskCounter.MERGED_MAP_OUTPUTS,
+        TaskCounter.NUM_DISK_TO_DISK_MERGES,
+        TaskCounter.NUM_FAILED_SHUFFLE_INPUTS,
+        TaskCounter.NUM_MEM_TO_DISK_MERGES,
+        TaskCounter.NUM_SHUFFLED_INPUTS,
+        TaskCounter.NUM_SKIPPED_INPUTS,
+        TaskCounter.REDUCE_INPUT_GROUPS,
+        TaskCounter.REDUCE_INPUT_RECORDS,
+        TaskCounter.SHUFFLE_BYTES,
+        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED,
+        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT,
+        TaskCounter.SHUFFLE_BYTES_TO_DISK,
+        TaskCounter.SHUFFLE_BYTES_TO_MEM,
+        TaskCounter.SPILLED_RECORDS
+    );
+
+    int nonZeroCounters = 0;
+    // verify that the sum of the counter values for edges adds up to the aggregated counter value.
+    for (TaskCounter c : countersToVerifyAgg) {
+      TezCounter aggregatedCounter = aggregatedGroup.findCounter(c.name(), false);
+      TezCounter input1Counter = input1Group.findCounter(c.name(), false);
+      TezCounter input2Counter = input2Group.findCounter(c.name(), false);
+      assertNotNull("aggregated counter cannot be null " + c.name(), aggregatedCounter);
+      assertNotNull("input1 counter cannot be null " + c.name(), input1Counter);
+      assertNotNull("input2 counter cannot be null " + c.name(), input2Counter);
+
+      assertEquals("aggregated counter does not match sum of input counters " + c.name(),
+          aggregatedCounter.getValue(), input1Counter.getValue() + input2Counter.getValue());
+
+      if (aggregatedCounter.getValue() > 0) {
+        nonZeroCounters++;
+      }
+    }
+
+    // ensure that at least one of the counters tested above was non-zero.
+    assertTrue("At least one of the counter should be non-zero. invalid test ", nonZeroCounters > 0);
+
+    CounterGroup joinerOutputGroup = joinerCounters.getGroup(
+        TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_OUTPUT_" + joinOutputName);
+    String outputCounterName = TaskCounter.OUTPUT_RECORDS.name();
+    TezCounter aggregateCounter = aggregatedGroup.findCounter(outputCounterName, false);
+    TezCounter joinerOutputCounter = joinerOutputGroup.findCounter(outputCounterName, false);
+    assertNotNull("aggregated counter cannot be null " + outputCounterName, aggregateCounter);
+    assertNotNull("output counter cannot be null " + outputCounterName, joinerOutputCounter);
+    assertTrue("counter value is zero. test is invalid", aggregateCounter.getValue() > 0);
+    assertEquals("aggregated counter does not match sum of output counters " + outputCounterName,
+        aggregateCounter.getValue(), joinerOutputCounter.getValue());
+    tezClient.stop();
+  }
+
+  @Test(timeout = 60000)
   public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
     SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
     sortMergeJoinExample.setConf(conf);
@@ -982,4 +1087,100 @@ public class TestTezJobs {
       }
     }
   }
+
+  /**
+   * SortMergeJoinExample that submits its DAG through a caller-supplied TezClient and keeps the
+   * resulting DAGClient so the test can query vertex counters after the run.
+   */
+  private static class SortMergeJoinHelper extends SortMergeJoinExample {
+    private final TezClient tezClientInternal;
+    private DAGClient dagClient;
+
+    public SortMergeJoinHelper(TezClient tezClient) {
+      this.tezClientInternal = tezClient;
+    }
+
+    @Override
+    public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException,
+        InterruptedException, IOException {
+      tezClientInternal.waitTillReady();
+      dagClient = tezClientInternal.submitDAG(dag);
+      Set<StatusGetOpts> getOpts = new HashSet<StatusGetOpts>();
+      if (printCounters) {
+        getOpts.add(StatusGetOpts.GET_COUNTERS);
+      }
+
+      DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+        return -1;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Writes "term0" through "term19" to inPath1/file and the even-numbered terms to inPath2/file.
+   *
+   * @return the terms written to both files, i.e. the expected join output
+   */
+  private Set<String> generateSortMergeJoinInput(Path inPath1, Path inPath2) throws
+      IOException {
+    remoteFs.mkdirs(inPath1);
+    remoteFs.mkdirs(inPath2);
+
+    Set<String> expectedResult = new HashSet<String>();
+    BufferedWriter writer1 = new BufferedWriter(
+        new OutputStreamWriter(remoteFs.create(new Path(inPath1, "file"))));
+    try {
+      BufferedWriter writer2 = new BufferedWriter(
+          new OutputStreamWriter(remoteFs.create(new Path(inPath2, "file"))));
+      try {
+        for (int i = 0; i < 20; i++) {
+          String term = "term" + i;
+          writer1.write(term);
+          writer1.newLine();
+          if (i % 2 == 0) {
+            writer2.write(term);
+            writer2.newLine();
+            expectedResult.add(term);
+          }
+        }
+      } finally {
+        writer2.close();
+      }
+    } finally {
+      writer1.close();
+    }
+    return expectedResult;
+  }
+
+  /**
+   * Asserts that outPath holds exactly one data file (names starting with "_" or "." are
+   * ignored) whose lines are exactly the entries of expectedResult. Matched entries are
+   * removed from expectedResult.
+   */
+  private void verifySortMergeJoinInput(Path outPath, Set<String> expectedResult) throws
+      IOException {
+    FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    assertEquals(1, statuses.length);
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(remoteFs.open(statuses[0].getPath())));
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        assertTrue(expectedResult.remove(line));
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals(0, expectedResult.size());
+  }
+
 }