You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/20 22:56:43 UTC
git commit: TEZ-1139. Add a test for IntersectDataGen and IntersectValidate. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 60905cd25 -> 2e664e76c
TEZ-1139. Add a test for IntersectDataGen and IntersectValidate. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2e664e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2e664e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2e664e76
Branch: refs/heads/master
Commit: 2e664e76cfe07bed6cc024ec474c6135f681919a
Parents: 60905cd
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 20 13:56:24 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 20 13:56:24 2014 -0700
----------------------------------------------------------------------
.../mapreduce/examples/IntersectDataGen.java | 110 ++++++++++++++-----
.../mapreduce/examples/IntersectExample.java | 87 ++++++++++-----
.../mapreduce/examples/IntersectValidate.java | 105 ++++++++++++------
.../java/org/apache/tez/test/TestTezJobs.java | 48 ++++++++
4 files changed, 262 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index 318c708..d977701 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.mapreduce.examples;
import java.io.ByteArrayInputStream;
@@ -69,18 +87,63 @@ public class IntersectDataGen extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs);
+ }
+
+ public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+ setConf(conf);
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs, tezSession);
+ }
+
+ private int validateArgs(String[] otherArgs) {
if (otherArgs.length != 6) {
printUsage();
return 2;
}
- return execute(otherArgs);
+ return 0;
}
- private int execute(String[] args) throws IOException, TezException, InterruptedException {
+ private int execute(String [] args) throws TezException, IOException, InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ TezSession tezSession = null;
+ try {
+ tezSession = createTezSession(tezConf);
+ return execute(args, tezConf, tezSession);
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
+ private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+ InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ return execute(args, tezConf, tezSession);
+ }
+
+ private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+ AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+ TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+ tezConf);
+ TezSession tezSession = new TezSession("IntersectDataGenSession", sessionConfiguration);
+ tezSession.start();
+ return tezSession;
+ }
+
+ private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+ throws IOException, TezException, InterruptedException {
LOG.info("Running IntersectDataGen");
- TezConfiguration tezConf = new TezConfiguration(getConf());
UserGroupInformation.setConfiguration(tezConf);
String outDir1 = args[0];
@@ -115,36 +178,27 @@ public class IntersectDataGen extends Configured implements Tool {
res = checkOutputDirectory(fs, largeOutPath) + checkOutputDirectory(fs, smallOutPath)
+ checkOutputDirectory(fs, expectedOutputPath);
if (res != 0) {
- return 2;
+ return 3;
}
if (numTasks <= 0) {
System.err.println("NumTasks must be > 0");
- return 2;
+ return 4;
}
- AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
- TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
- tezConf);
- TezSession tezSession = new TezSession("IntersectDataGenSession", sessionConfiguration);
- try {
- tezSession.start();
-
- DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
- largeOutSize, smallOutSize);
- setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
-
- tezSession.waitTillReady();
- DAGClient dagClient = tezSession.submitDAG(dag);
- DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
- return -1;
- }
- return 0;
- } finally {
- tezSession.stop();
+ DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
+ largeOutSize, smallOutSize);
+ setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
+
+ tezSession.waitTillReady();
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ return -1;
}
+ return 0;
+
}
private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
@@ -154,7 +208,7 @@ public class IntersectDataGen extends Configured implements Tool {
long largeOutSizePerTask = largeOutSize / numTasks;
long smallOutSizePerTask = smallOutSize / numTasks;
- DAG dag = new DAG("IntersectExample");
+ DAG dag = new DAG("IntersectDataGen");
byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index aa9b9a2..45885f2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -96,17 +96,63 @@ public class IntersectExample extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs);
+ }
+
+ public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+ setConf(conf);
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs, tezSession);
+ }
+
+ private int validateArgs(String[] otherArgs) {
if (otherArgs.length != 4) {
printUsage();
return 2;
}
- return execute(otherArgs);
+ return 0;
}
- private int execute(String[] args) throws IOException, TezException, InterruptedException {
- LOG.info("Running IntersectExample");
+ private int execute(String[] args) throws TezException, IOException, InterruptedException {
TezConfiguration tezConf = new TezConfiguration(getConf());
+ TezSession tezSession = null;
+ try {
+ tezSession = createTezSession(tezConf);
+ return execute(args, tezConf, tezSession);
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
+ private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+ InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ return execute(args, tezConf, tezSession);
+ }
+
+ private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+ AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+ TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+ tezConf);
+ TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
+ tezSession.start();
+ return tezSession;
+ }
+
+ private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+ throws IOException, TezException, InterruptedException {
+ LOG.info("Running IntersectExample");
+
UserGroupInformation.setConfiguration(tezConf);
String streamInputDir = args[0];
@@ -122,34 +168,25 @@ public class IntersectExample extends Configured implements Tool {
FileSystem fs = FileSystem.get(tezConf);
if (fs.exists(outputPath)) {
System.err.println("Output directory: " + outputDir + " already exists");
- return 2;
+ return 3;
}
if (numPartitions <= 0) {
System.err.println("NumPartitions must be > 0");
- return 2;
+ return 4;
}
- AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
- TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
- tezConf);
- TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
- try {
- tezSession.start();
+ DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
+ setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
- DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
- setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
-
- tezSession.waitTillReady();
- DAGClient dagClient = tezSession.submitDAG(dag);
- DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
- return -1;
- }
- return 0;
- } finally {
- tezSession.stop();
+ tezSession.waitTillReady();
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ return -1;
}
+ return 0;
+
}
private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 537b5cd..537eb91 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -97,17 +97,62 @@ public class IntersectValidate extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs);
+ }
+
+ public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+ setConf(conf);
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ int result = validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+ return execute(otherArgs, tezSession);
+ }
+ private int validateArgs(String[] otherArgs) {
if (otherArgs.length != 3 && otherArgs.length != 2) {
printUsage();
return 2;
}
- return execute(otherArgs);
+ return 0;
}
- private int execute(String[] args) throws IOException, TezException, InterruptedException {
- LOG.info("Running IntersectValidate");
+ private int execute(String[] args) throws TezException, IOException, InterruptedException {
TezConfiguration tezConf = new TezConfiguration(getConf());
+ TezSession tezSession = null;
+ try {
+ tezSession = createTezSession(tezConf);
+ return execute(args, tezConf, tezSession);
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
+ private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+ InterruptedException {
+ TezConfiguration tezConf = new TezConfiguration(getConf());
+ return execute(args, tezConf, tezSession);
+ }
+
+ private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+ AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+ TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+ tezConf);
+ TezSession tezSession = new TezSession("IntersectValidateSession", sessionConfiguration);
+ tezSession.start();
+ return tezSession;
+ }
+
+ private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+ throws IOException, TezException, InterruptedException {
+ LOG.info("Running IntersectValidate");
UserGroupInformation.setConfiguration(tezConf);
String lhsDir = args[0];
@@ -119,47 +164,37 @@ public class IntersectValidate extends Configured implements Tool {
if (numPartitions <= 0) {
System.err.println("NumPartitions must be > 0");
- return 2;
+ return 4;
}
Path lhsPath = new Path(lhsDir);
Path rhsPath = new Path(rhsDir);
- AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
- TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
- tezConf);
- TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
- try {
- tezSession.start();
-
- DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
- setupURIsForCredentials(dag, lhsPath, rhsPath);
-
- tezSession.waitTillReady();
- DAGClient dagClient = tezSession.submitDAG(dag);
- DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
- return -1;
+ DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
+ setupURIsForCredentials(dag, lhsPath, rhsPath);
+
+ tezSession.waitTillReady();
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+ return -1;
+ } else {
+ dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+ TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
+ MISSING_KEY_COUNTER_NAME);
+ if (counter == null) {
+ LOG.info("Unable to determing equality");
+ return -2;
} else {
- dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
- TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
- MISSING_KEY_COUNTER_NAME);
- if (counter == null) {
- LOG.info("Unable to determing equality");
- return -1;
+ if (counter.getValue() != 0) {
+ LOG.info("Validate failed. The two sides are not equivalent");
+ return -3;
} else {
- if (counter.getValue() != 0) {
- LOG.info("Validate failed. The two sides are not equivalent");
- return -1;
- } else {
- LOG.info("Vlidation successful. The two sides are equivalent");
- return 0;
- }
+ LOG.info("Vlidation successful. The two sides are equivalent");
+ return 0;
}
}
- } finally {
- tezSession.stop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 94cdfdc..171458f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -61,7 +61,9 @@ import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.mapreduce.examples.IntersectDataGen;
import org.apache.tez.mapreduce.examples.IntersectExample;
+import org.apache.tez.mapreduce.examples.IntersectValidate;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
import org.junit.AfterClass;
@@ -272,6 +274,52 @@ public class TestTezJobs {
assertEquals(0, expectedResult.size());
}
+ @Test(timeout = 120000)
+ public void testIntersect2() throws Exception {
+
+ Path testDir = new Path("/tmp/testIntersect2");
+ Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+ remoteFs.mkdirs(testDir);
+
+ Path dataPath1 = new Path(testDir, "inPath1");
+ Path dataPath2 = new Path(testDir, "inPath2");
+ Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+ Path outPath = new Path(testDir, "outPath");
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+ TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+ tezConf);
+ TezSession tezSession = null;
+ try {
+ tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
+ tezSession.start();
+
+ IntersectDataGen dataGen = new IntersectDataGen();
+ String[] dataGenArgs = new String[] {
+ dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+ expectedOutputPath.toString(), "2" };
+ assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+ IntersectExample intersect = new IntersectExample();
+ String[] intersectArgs = new String[] {
+ dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
+ assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession));
+
+ IntersectValidate intersectValidate = new IntersectValidate();
+ String[] intersectValidateArgs = new String[] {
+ expectedOutputPath.toString(), outPath.toString(), "3" };
+ assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));
+
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+ }
+
@Test
public void testNonDefaultFSStagingDir() throws Exception {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);