You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/20 22:56:43 UTC

git commit: TEZ-1139. Add a test for IntersectDataGen and IntersectValidate. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 60905cd25 -> 2e664e76c


TEZ-1139. Add a test for IntersectDataGen and IntersectValidate. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2e664e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2e664e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2e664e76

Branch: refs/heads/master
Commit: 2e664e76cfe07bed6cc024ec474c6135f681919a
Parents: 60905cd
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 20 13:56:24 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 20 13:56:24 2014 -0700

----------------------------------------------------------------------
 .../mapreduce/examples/IntersectDataGen.java    | 110 ++++++++++++++-----
 .../mapreduce/examples/IntersectExample.java    |  87 ++++++++++-----
 .../mapreduce/examples/IntersectValidate.java   | 105 ++++++++++++------
 .../java/org/apache/tez/test/TestTezJobs.java   |  48 ++++++++
 4 files changed, 262 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index 318c708..d977701 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.mapreduce.examples;
 
 import java.io.ByteArrayInputStream;
@@ -69,18 +87,63 @@ public class IntersectDataGen extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  }
+  
+  private int validateArgs(String[] otherArgs) {
     if (otherArgs.length != 6) {
       printUsage();
       return 2;
     }
-    return execute(otherArgs);
+    return 0;
   }
 
-  private int execute(String[] args) throws IOException, TezException, InterruptedException {
+  private int execute(String [] args) throws TezException, IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezSession tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = new TezSession("IntersectDataGenSession", sessionConfiguration);
+    tezSession.start();
+    return tezSession;
+  }
+  
+  private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+      throws IOException, TezException, InterruptedException {
     LOG.info("Running IntersectDataGen");
 
-    TezConfiguration tezConf = new TezConfiguration(getConf());
     UserGroupInformation.setConfiguration(tezConf);
 
     String outDir1 = args[0];
@@ -115,36 +178,27 @@ public class IntersectDataGen extends Configured implements Tool {
     res = checkOutputDirectory(fs, largeOutPath) + checkOutputDirectory(fs, smallOutPath)
         + checkOutputDirectory(fs, expectedOutputPath);
     if (res != 0) {
-      return 2;
+      return 3;
     }
 
     if (numTasks <= 0) {
       System.err.println("NumTasks must be > 0");
-      return 2;
+      return 4;
     }
 
-    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
-    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
-        tezConf);
-    TezSession tezSession = new TezSession("IntersectDataGenSession", sessionConfiguration);
-    try {
-      tezSession.start();
-
-      DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
-          largeOutSize, smallOutSize);
-      setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
-
-      tezSession.waitTillReady();
-      DAGClient dagClient = tezSession.submitDAG(dag);
-      DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
-      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-        LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-        return -1;
-      }
-      return 0;
-    } finally {
-      tezSession.stop();
+    DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
+        largeOutSize, smallOutSize);
+    setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
+
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
     }
+    return 0;
+
   }
 
   private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
@@ -154,7 +208,7 @@ public class IntersectDataGen extends Configured implements Tool {
     long largeOutSizePerTask = largeOutSize / numTasks;
     long smallOutSizePerTask = smallOutSize / numTasks;
 
-    DAG dag = new DAG("IntersectExample");
+    DAG dag = new DAG("IntersectDataGen");
 
     byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
     byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index aa9b9a2..45885f2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -96,17 +96,63 @@ public class IntersectExample extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  }
+  
+  private int validateArgs(String[] otherArgs) {
     if (otherArgs.length != 4) {
       printUsage();
       return 2;
     }
-    return execute(otherArgs);
+    return 0;
   }
 
-  private int execute(String[] args) throws IOException, TezException, InterruptedException {
-    LOG.info("Running IntersectExample");
+  private int execute(String[] args) throws TezException, IOException, InterruptedException {
     TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezSession tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
+    tezSession.start();
+    return tezSession;
+  }
+  
+  private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+      throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectExample");
+
     UserGroupInformation.setConfiguration(tezConf);
 
     String streamInputDir = args[0];
@@ -122,34 +168,25 @@ public class IntersectExample extends Configured implements Tool {
     FileSystem fs = FileSystem.get(tezConf);
     if (fs.exists(outputPath)) {
       System.err.println("Output directory: " + outputDir + " already exists");
-      return 2;
+      return 3;
     }
     if (numPartitions <= 0) {
       System.err.println("NumPartitions must be > 0");
-      return 2;
+      return 4;
     }
 
-    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
-    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
-        tezConf);
-    TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
-    try {
-      tezSession.start();
+    DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
+    setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
 
-      DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
-      setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
-
-      tezSession.waitTillReady();
-      DAGClient dagClient = tezSession.submitDAG(dag);
-      DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
-      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-        LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-        return -1;
-      }
-      return 0;
-    } finally {
-      tezSession.stop();
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
     }
+    return 0;
+
   }
 
   private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 537b5cd..537eb91 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -97,17 +97,62 @@ public class IntersectValidate extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezSession tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  } 
 
+  private int validateArgs(String[] otherArgs) {
     if (otherArgs.length != 3 && otherArgs.length != 2) {
       printUsage();
       return 2;
     }
-    return execute(otherArgs);
+    return 0;
   }
 
-  private int execute(String[] args) throws IOException, TezException, InterruptedException {
-    LOG.info("Running IntersectValidate");
+  private int execute(String[] args) throws TezException, IOException, InterruptedException {
     TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezSession tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezSession tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezSession createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = new TezSession("IntersectValidateSession", sessionConfiguration);
+    tezSession.start();
+    return tezSession;
+  }
+
+  private int execute(String[] args, TezConfiguration tezConf, TezSession tezSession)
+      throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectValidate");
     UserGroupInformation.setConfiguration(tezConf);
 
     String lhsDir = args[0];
@@ -119,47 +164,37 @@ public class IntersectValidate extends Configured implements Tool {
 
     if (numPartitions <= 0) {
       System.err.println("NumPartitions must be > 0");
-      return 2;
+      return 4;
     }
 
     Path lhsPath = new Path(lhsDir);
     Path rhsPath = new Path(rhsDir);
 
-    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
-    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
-        tezConf);
-    TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
-    try {
-      tezSession.start();
-
-      DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
-      setupURIsForCredentials(dag, lhsPath, rhsPath);
-
-      tezSession.waitTillReady();
-      DAGClient dagClient = tezSession.submitDAG(dag);
-      DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
-      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-        LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-        return -1;
+    DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
+    setupURIsForCredentials(dag, lhsPath, rhsPath);
+
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    } else {
+      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+      TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
+          MISSING_KEY_COUNTER_NAME);
+      if (counter == null) {
+        LOG.info("Unable to determing equality");
+        return -2;
       } else {
-        dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-        TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
-            MISSING_KEY_COUNTER_NAME);
-        if (counter == null) {
-          LOG.info("Unable to determing equality");
-          return -1;
+        if (counter.getValue() != 0) {
+          LOG.info("Validate failed. The two sides are not equivalent");
+          return -3;
         } else {
-          if (counter.getValue() != 0) {
-            LOG.info("Validate failed. The two sides are not equivalent");
-            return -1;
-          } else {
-            LOG.info("Vlidation successful. The two sides are equivalent");
-            return 0;
-          }
+          LOG.info("Vlidation successful. The two sides are equivalent");
+          return 0;
         }
       }
-    } finally {
-      tezSession.stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e664e76/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 94cdfdc..171458f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -61,7 +61,9 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
+import org.apache.tez.mapreduce.examples.IntersectValidate;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
@@ -272,6 +274,52 @@ public class TestTezJobs {
     assertEquals(0, expectedResult.size());
   }
 
+  @Test(timeout = 120000)
+  public void testIntersect2() throws Exception {
+
+    Path testDir = new Path("/tmp/testIntersect2");
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = null;
+    try {
+      tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
+      tezSession.start();
+
+      IntersectDataGen dataGen = new IntersectDataGen();
+      String[] dataGenArgs = new String[] {
+          dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+          expectedOutputPath.toString(), "2" };
+      assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession));
+
+      IntersectExample intersect = new IntersectExample();
+      String[] intersectArgs = new String[] {
+          dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() };
+      assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession));
+
+      IntersectValidate intersectValidate = new IntersectValidate();
+      String[] intersectValidateArgs = new String[] {
+          expectedOutputPath.toString(), outPath.toString(), "3" };
+      assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));
+
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
   @Test
   public void testNonDefaultFSStagingDir() throws Exception {
     SleepProcessorConfig spConf = new SleepProcessorConfig(1);