You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:04 UTC
[02/50] [abbrv] tez git commit: TEZ-2360. per-io counters flag should
generate both overall and per-edge counters (pramachandran)
TEZ-2360. per-io counters flag should generate both overall and per-edge counters (pramachandran)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/765afd23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/765afd23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/765afd23
Branch: refs/heads/TEZ-2003
Commit: 765afd236fd178ebc315c1f912102578ec100c70
Parents: e36f962
Author: Prakash Ramachandran <pr...@hortonworks.com>
Authored: Fri May 1 00:48:16 2015 +0530
Committer: Prakash Ramachandran <pr...@hortonworks.com>
Committed: Fri May 1 00:48:16 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/api/impl/TezCountersDelegate.java | 74 +++++++-
.../java/org/apache/tez/test/TestTezJobs.java | 184 +++++++++++++++++++
3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d67db81..aa72320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-2360. per-io counters flag should generate both overall and per-edge counters
TEZ-2389. Tez UI: Sort by attempt-no is incorrect in attempts pages.
TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask.
TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics
http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
index 5286839..3c79530 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -18,7 +18,12 @@
package org.apache.tez.runtime.api.impl;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.AbstractCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -27,7 +32,7 @@ public class TezCountersDelegate extends TezCounters {
private final String groupModifier;
private final TezCounters original;
-
+
public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
String type) {
this.original = original;
@@ -39,10 +44,71 @@ public class TezCountersDelegate extends TezCounters {
// the standard mechanism to find a counter.
@Override
public TezCounter findCounter(String groupName, String counterName) {
+ String simpleGroupName = groupName;
if (groupName.equals(TaskCounter.class.getName())) {
- groupName = TaskCounter.class.getSimpleName();
+ // Per-IO group names use the short TaskCounter name instead of the fully qualified one.
+ simpleGroupName = TaskCounter.class.getSimpleName();
+ }
+ String perIoGroupName = simpleGroupName + "_" + this.groupModifier;
+ // Look up both counters up front; the composite applies each write to both of them.
+ TezCounter perIoCounter = original.findCounter(perIoGroupName, counterName);
+ TezCounter aggregateCounter = original.findCounter(groupName, counterName);
+ return new CompositeCounter(perIoCounter, aggregateCounter);
+ }
+
+ /**
+ * Counter view that applies every write to two counters: the per-IO counter and the matching
+ * counter in the original (aggregated) group. Name, display name and value come from the per-IO counter.
+ */
+ private static class CompositeCounter extends AbstractCounter {
+ private final TezCounter modifiedCounter; // counter in the per-IO (modified) group
+ private final TezCounter originalCounter; // counter in the original, unmodified group
+
+ public CompositeCounter(TezCounter modifiedCounter, TezCounter originalCounter) {
+ this.modifiedCounter = modifiedCounter;
+ this.originalCounter = originalCounter;
+ }
+
+ @Override
+ public String getName() {
+ return modifiedCounter.getName();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return modifiedCounter.getDisplayName();
+ }
+
+ @Override
+ public long getValue() {
+ return modifiedCounter.getValue();
+ }
+
+ @Override
+ public void setValue(long value) {
+ modifiedCounter.setValue(value);
+ originalCounter.setValue(value);
+ }
+
+ @Override
+ public void increment(long increment) {
+ modifiedCounter.increment(increment);
+ originalCounter.increment(increment);
+ }
+
+ @Override
+ public TezCounter getUnderlyingCounter() {
+ return this;
+ }
+ // Built fresh by each findCounter() call as a write-through view; fail loudly (even without -ea) if serialized.
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException("shouldn't be called");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("shouldn't be called");
}
- String modifiedGroupName = groupName + "_" + this.groupModifier;
- return original.findCounter(modifiedGroupName, counterName);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index db212fa..13b0c03 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -19,6 +19,7 @@
package org.apache.tez.test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashSet;
@@ -41,6 +43,12 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -313,6 +321,103 @@ public class TestTezJobs {
}
@Test(timeout = 60000)
+ public void testPerIOCounterAggregation() throws Exception {
+ String baseDir = "/tmp/perIOCounterAgg/";
+ Path inPath1 = new Path(baseDir + "inPath1");
+ Path inPath2 = new Path(baseDir + "inPath2");
+ Path outPath = new Path(baseDir + "outPath");
+ final Set<String> expectedResults = generateSortMergeJoinInput(inPath1, inPath2);
+ Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+ // Enable per-IO counters; the aggregated TaskCounter group must still be populated alongside them.
+ TezConfiguration conf = new TezConfiguration(mrrTezCluster.getConfig());
+ conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+ TezClient tezClient = TezClient.create(SortMergeJoinHelper.class.getSimpleName(), conf);
+ tezClient.start();
+ try {
+ SortMergeJoinHelper sortMergeJoinHelper = new SortMergeJoinHelper(tezClient);
+ sortMergeJoinHelper.setConf(conf);
+ String[] args = new String[] {
+ "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(),
+ inPath1.toString(), inPath2.toString(), "1", outPath.toString() };
+ assertEquals(0, sortMergeJoinHelper.run(conf, args, tezClient));
+ verifySortMergeJoinInput(outPath, expectedResults);
+
+ String joinerVertexName = "joiner";
+ String input1Name = "input1";
+ String input2Name = "input2";
+ String joinOutputName = "joinOutput";
+ Set<StatusGetOpts> statusOpts = new HashSet<StatusGetOpts>();
+ statusOpts.add(StatusGetOpts.GET_COUNTERS);
+ VertexStatus joinerVertexStatus =
+ sortMergeJoinHelper.dagClient.getVertexStatus(joinerVertexName, statusOpts);
+ final TezCounters joinerCounters = joinerVertexStatus.getVertexCounters();
+ final CounterGroup aggregatedGroup = joinerCounters.getGroup(TaskCounter.class.getCanonicalName());
+ final CounterGroup input1Group = joinerCounters.getGroup(
+ TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input1Name);
+ final CounterGroup input2Group = joinerCounters.getGroup(
+ TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input2Name);
+ assertTrue("aggregated counter group cannot be empty", aggregatedGroup.size() > 0);
+ assertTrue("per io group for input1 cannot be empty", input1Group.size() > 0);
+ assertTrue("per io group for input2 cannot be empty", input2Group.size() > 0);
+
+ List<TaskCounter> countersToVerifyAgg = Arrays.asList(
+ TaskCounter.ADDITIONAL_SPILLS_BYTES_READ,
+ TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN,
+ TaskCounter.COMBINE_INPUT_RECORDS,
+ TaskCounter.MERGED_MAP_OUTPUTS,
+ TaskCounter.NUM_DISK_TO_DISK_MERGES,
+ TaskCounter.NUM_FAILED_SHUFFLE_INPUTS,
+ TaskCounter.NUM_MEM_TO_DISK_MERGES,
+ TaskCounter.NUM_SHUFFLED_INPUTS,
+ TaskCounter.NUM_SKIPPED_INPUTS,
+ TaskCounter.REDUCE_INPUT_GROUPS,
+ TaskCounter.REDUCE_INPUT_RECORDS,
+ TaskCounter.SHUFFLE_BYTES,
+ TaskCounter.SHUFFLE_BYTES_DECOMPRESSED,
+ TaskCounter.SHUFFLE_BYTES_DISK_DIRECT,
+ TaskCounter.SHUFFLE_BYTES_TO_DISK,
+ TaskCounter.SHUFFLE_BYTES_TO_MEM,
+ TaskCounter.SPILLED_RECORDS
+ );
+
+ int nonZeroCounters = 0;
+ // verify that the sum of the counter values for edges add up to the aggregated counter value.
+ for(TaskCounter c : countersToVerifyAgg) {
+ TezCounter aggregatedCounter = aggregatedGroup.findCounter(c.name(), false);
+ TezCounter input1Counter = input1Group.findCounter(c.name(), false);
+ TezCounter input2Counter = input2Group.findCounter(c.name(), false);
+ assertNotNull("aggregated counter cannot be null " + c.name(), aggregatedCounter);
+ assertNotNull("input1 counter cannot be null " + c.name(), input1Counter);
+ assertNotNull("input2 counter cannot be null " + c.name(), input2Counter);
+
+ assertEquals("aggregated counter does not match sum of input counters " + c.name(),
+ input1Counter.getValue() + input2Counter.getValue(), aggregatedCounter.getValue());
+
+ if (aggregatedCounter.getValue() > 0) {
+ nonZeroCounters++;
+ }
+ }
+
+ // ensure that at least one of the counters tested above were non-zero.
+ assertTrue("At least one of the counter should be non-zero. invalid test ", nonZeroCounters > 0);
+
+ CounterGroup joinerOutputGroup = joinerCounters.getGroup(
+ TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_OUTPUT_" + joinOutputName);
+ String outputCounterName = TaskCounter.OUTPUT_RECORDS.name();
+ TezCounter aggregateCounter = aggregatedGroup.findCounter(outputCounterName, false);
+ TezCounter joinerOutputCounter = joinerOutputGroup.findCounter(outputCounterName, false);
+ assertNotNull("aggregated counter cannot be null " + outputCounterName, aggregateCounter);
+ assertNotNull("output counter cannot be null " + outputCounterName, joinerOutputCounter);
+ assertTrue("counter value is zero. test is invalid", aggregateCounter.getValue() > 0);
+ assertEquals("aggregated counter does not match sum of output counters " + outputCounterName,
+ joinerOutputCounter.getValue(), aggregateCounter.getValue());
+ } finally { // stop the client even if an assertion above fails, so it is not leaked
+ tezClient.stop();
+ }
+ }
+
+ @Test(timeout = 60000)
public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception {
SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
sortMergeJoinExample.setConf(conf);
@@ -982,4 +1087,83 @@ public class TestTezJobs {
}
}
}
+
+ /** Runs the example DAG on a caller-supplied TezClient and keeps its DAGClient for later status queries. */
+ private static class SortMergeJoinHelper extends SortMergeJoinExample {
+ private final TezClient tezClientInternal;
+ private DAGClient dagClient;
+
+ public SortMergeJoinHelper(TezClient tezClient) {
+ this.tezClientInternal = tezClient;
+ }
+
+ /** Submits the DAG, waits for it to finish, and returns 0 on success or -1 after logging diagnostics. */
+ @Override
+ public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException,
+ InterruptedException, IOException {
+ Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>();
+ if (printCounters) {
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ }
+
+ tezClientInternal.waitTillReady();
+ dagClient = tezClientInternal.submitDAG(dag);
+ DAGStatus finalStatus = dagClient.waitForCompletionWithStatusUpdates(statusOptions);
+ if (finalStatus.getState() == DAGStatus.State.SUCCEEDED) {
+ return 0;
+ }
+ logger.info("DAG diagnostics: " + finalStatus.getDiagnostics());
+ return -1;
+ }
+ }
+
+ /**
+ * Writes term0..term19 to inPath1/file and only the even-numbered terms to inPath2/file,
+ * returning the terms present in both inputs (the expected sort-merge join output).
+ */
+ private Set<String> generateSortMergeJoinInput(Path inPath1, Path inPath2) throws IOException {
+ remoteFs.mkdirs(inPath1);
+ remoteFs.mkdirs(inPath2);
+ Set<String> joinedTerms = new HashSet<String>();
+ FSDataOutputStream leftStream = remoteFs.create(new Path(inPath1, "file"));
+ FSDataOutputStream rightStream = remoteFs.create(new Path(inPath2, "file"));
+ BufferedWriter leftWriter = new BufferedWriter(new OutputStreamWriter(leftStream));
+ BufferedWriter rightWriter = new BufferedWriter(new OutputStreamWriter(rightStream));
+ for (int termId = 0; termId < 20; termId++) {
+ String term = "term" + termId;
+ leftWriter.write(term);
+ leftWriter.newLine();
+ if (termId % 2 == 0) { // only even terms appear in the second input
+ rightWriter.write(term);
+ rightWriter.newLine();
+ joinedTerms.add(term);
+ }
+ }
+ leftWriter.close();
+ rightWriter.close();
+ leftStream.close();
+ rightStream.close();
+ return joinedTerms;
+ }
+
+ /** Asserts outPath has one file not starting with '_' or '.', whose lines are exactly expectedResult (matched entries are removed). */
+ private void verifySortMergeJoinInput(Path outPath, Set<String> expectedResult) throws IOException {
+ PathFilter dataFilesOnly = new PathFilter() {
+ public boolean accept(Path p) {
+ String fileName = p.getName();
+ return !(fileName.startsWith("_") || fileName.startsWith("."));
+ }
+ };
+ FileStatus[] statuses = remoteFs.listStatus(outPath, dataFilesOnly);
+ assertEquals(1, statuses.length);
+ FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ assertTrue(expectedResult.remove(line));
+ }
+ reader.close();
+ inStream.close();
+ assertEquals(0, expectedResult.size());
+ }
+
}