Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/02 02:08:55 UTC

svn commit: r917830 - in /hadoop/pig/branches/branch-0.6/contrib/zebra: CHANGES.txt build.xml src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java src/test/smoke/ src/test/smoke/zebra_smoke_run.pl

Author: yanz
Date: Tue Mar  2 01:08:55 2010
New Revision: 917830

URL: http://svn.apache.org/viewvc?rev=917830&view=rev
Log:
PIG-1164 Addition of smoke tests (gauravj via yanz)

Added:
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl
Modified:
    hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=917830&r1=917829&r2=917830&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Tue Mar  2 01:08:55 2010
@@ -4,6 +4,8 @@
 
   INCOMPATIBLE CHANGES
 
+    PIG-1164 Addition of smoke tests (gauravj via yanz)
+
   IMPROVEMENTS
 
     PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz)

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml?rev=917830&r1=917829&r2=917830&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml Tue Mar  2 01:08:55 2010
@@ -183,6 +183,9 @@
           <not>
             <filename name="**/TestColumnSecurity.java"/>
           </not>
+          <not>
+            <filename name="**/TestSmoke*.java"/>
+          </not>
 
         </fileset> 
       </batchtest>
@@ -292,4 +295,30 @@
          	</current>
        </clover-report>
   </target> 
+  <target name="smoke-jar" depends="compile,test">
+    <jar destfile="${build.test}/zebra_smoke.jar"
+         basedir="${build.test}"
+         includes="**/TestSmokeMR*.class, **/TestTableLoaderP*.class"
+    />
+  </target>
+  <target name="package-tests" depends="smoke-jar">
+    <tar longfile="gnu" destfile="${build.test}/zebra_smoke.tar">
+    <tarfileset dir="${src.test}/smoke"
+             fullpath="bin/zebra_smoke_run.pl"
+             preserveLeadingSlashes="true">
+      <include name="zebra_smoke_run.pl"/>
+    </tarfileset>
+    <tarfileset dir="${build.test}/../../../ivy/lib/Pig"
+             fullpath="lib/junit-4.5.jar"
+             preserveLeadingSlashes="true">
+      <include name="junit-4.5.jar"/>
+    </tarfileset>
+    <tarfileset dir="${build.test}"
+             fullpath="lib/zebra_smoke.jar"
+             preserveLeadingSlashes="true">
+      <include name="zebra_smoke.jar"/>
+    </tarfileset>
+   </tar>
+  </target>
 </project>
+
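
For reference, a minimal way to drive the new targets (assuming the standard
Pig build layout, running from contrib/zebra):

  ant smoke-jar       # builds ${build.test}/zebra_smoke.jar
  ant package-tests   # also bundles zebra_smoke_run.pl and junit-4.5.jar
                      # into ${build.test}/zebra_smoke.tar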

Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java?rev=917830&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java Tue Mar  2 01:08:55 2010
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * A complete sample MR program that writes a zebra Table. It doesn't contain
+ * the 'read' part, but that should be similar and easier to write; refer to
+ * the test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * this 2
+ * is 1
+ * a 4 
+ * test 2 
+ * hello 1 
+ * world 3
+ * </pre>
+ * 
+ */
+public class TestSmokeMR extends Configured implements Tool{
+  static String inputPath;
+  static String outputPath;
+  static String inputFileName = "smoke.txt";
+  static String outputTableName ="smokeTable";
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Configuration conf;
+  public static String sortKey = null;
+
+  private static FileSystem fs;
+
+  private static String zebraJar;
+  private static String whichCluster;
+ 
+  static class MapClass implements
+      Mapper<LongWritable, Text, BytesWritable, Tuple> {
+    private BytesWritable bytesKey;
+    private Tuple tupleRow;
+    private Object javaObj;
+
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException {
+        // value should contain "word count"
+        String[] wdct = value.toString().split(" ");
+        if (wdct.length != 2) {
+          // LOG the error
+          return;
+        }
+
+        byte[] word = wdct[0].getBytes();
+        bytesKey.set(word, 0, word.length);
+        tupleRow.set(0, new String(word));
+        tupleRow.set(1, Integer.parseInt(wdct[1]));
+
+        // This key has to be created by user
+        Tuple userKey = new ZebraTuple();
+        userKey.append(new String(word));
+        userKey.append(Integer.parseInt(wdct[1]));
+        try {
+          /* New M/R Interface */
+          /* Converts the user key to a zebra BytesWritable key */
+          /* using the sort key expression tree. */
+          /* Done for each user key. */
+          bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey);
+        } catch (Exception e) {
+          // don't swallow the failure; fail the map task instead
+          throw new IOException("getSortKey failed", e);
+        }
+
+        output.collect(bytesKey, tupleRow);
+    }
+
+    @Override
+    public void configure(JobConf job) {
+      bytesKey = new BytesWritable();
+      try {
+        Schema outSchema = BasicTableOutputFormat.getSchema(job);
+        tupleRow = TypesUtils.createTuple(outSchema);
+        
+        /* New M/R Interface */
+        /* returns an expression tree for sort keys */
+        /* Returns a java base object */
+        /* Done once per table */
+        javaObj = BasicTableOutputFormat.getSortKeyGenerator(job);
+        
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (ParseException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+  }
+
+  static class ReduceClass implements
+      Reducer<BytesWritable, Tuple, BytesWritable, Tuple> {
+
+    @Override
+    public void configure(JobConf job) {
+      // no-op
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+
+    @Override
+    public void reduce(BytesWritable key, Iterator<Tuple> values,
+        OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
+        throws IOException {
+      // Identity reduce: pass every row through under its zebra sort key.
+      while (values.hasNext()) {
+        output.collect(key, values.next());
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath();
+      System.setProperty("hadoop.log.dir", new Path(base, "logs").toString());
+    }
+
+    if (System.getProperty("whichCluster") == null) {
+      System.setProperty("whichCluster", "realCluster");
+    }
+    whichCluster = System.getProperty("whichCluster");
+
+    System.out.println("cluster: " + whichCluster);
+    System.out.println("HADOOP_HOME: " + System.getenv("HADOOP_HOME"));
+    System.out.println("USER: " + System.getenv("USER"));
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(1);
+    }
+
+    if (conf == null) {
+      conf = new Configuration();
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(1);
+    }
+    zebraJar = System.getenv("ZEBRA_JAR");
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && (zebraJar == null || !new File(zebraJar).exists())) {
+      System.out.println("Please set ZEBRA_JAR to an existing zebra.jar");
+    }
+
+    // set inputPath and outputPath
+    String workingDir = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      inputPath = "/user/" + System.getenv("USER") + "/" + inputFileName;
+      outputPath = "/user/" + System.getenv("USER") + "/" + outputTableName;
+      System.out.println("inputPath: " + inputPath);
+      fs = new Path(inputPath).getFileSystem(conf);
+    } else {
+      RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+      fs = new LocalFileSystem(rawLFS);
+      workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+      inputPath = workingDir + "/" + inputFileName;
+      outputPath = workingDir + "/" + outputTableName;
+      System.out.println("inputPath: " + inputPath);
+    }
+    writeToFile(inputPath);
+    // check that the input file exists (on HDFS for the real cluster)
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && !fs.exists(new Path(inputPath))) {
+      System.out.println("Please put the input file in hdfs: " + inputPath);
+    }
+    if (whichCluster.equalsIgnoreCase("miniCluster")
+        && !new File(inputPath).exists()) {
+      System.out.println("Please put the input file under the working dir: "
+          + workingDir);
+      System.exit(1);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+  }
+
+  public static void writeToFile(String inputFile) throws IOException {
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      FileWriter fstream = new FileWriter(inputFile);
+      BufferedWriter out = new BufferedWriter(fstream);
+      out.write("us 2\n");
+      out.write("japan 2\n");
+      out.write("india 4\n");
+      out.write("us 2\n");
+      out.write("japan 1\n");
+      out.write("india 3\n");
+      out.write("nouse 5\n");
+      out.write("nowhere 4\n");
+      out.close();
+    }
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      FSDataOutputStream fout = fs.create(new Path(inputFile));
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 2\n");
+      fout.writeBytes("india 4\n");
+      fout.writeBytes("us 2\n");
+      fout.writeBytes("japan 1\n");
+      fout.writeBytes("india 3\n");
+      fout.writeBytes("nouse 5\n");
+      fout.writeBytes("nowhere 4\n");
+      fout.close();
+    }
+  }
+  public static void checkTableExists(boolean expected, String strDir)
+      throws IOException {
+
+    File theDir = null;
+    boolean actual = false;
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      theDir = new File(strDir.split(":")[1]);
+      actual = theDir.exists();
+
+    }
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      theDir = new File(strDir.split(":")[0]);
+      actual = fs.exists(new Path(theDir.toString()));
+    }
+    System.out.println("the dir : " + theDir.toString());
+
+    if (actual != expected) {
+      Assert.fail("dir exists or not is different from what expected.");
+    }
+  }
+  public static void removeDir(Path outPath) throws IOException {
+    String command = null;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+          + outPath.toString();
+    } else {
+      StringTokenizer st = new StringTokenizer(outPath.toString(), ":");
+      int count = 0;
+      String file = null;
+      while (st.hasMoreElements()) {
+        count++;
+        String token = st.nextElement().toString();
+        if (count == 2)
+          file = token;
+      }
+      command = "rm -rf " + file;
+    }
+    Runtime runtime = Runtime.getRuntime();
+    Process proc = runtime.exec(command);
+    try {
+      // the directory may not exist yet, so a non-zero exit is not fatal
+      proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+    }
+  }
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TestSmokeMR(), args);
+    System.out.println("res: " + res);
+    checkTableExists(true, outputPath);
+    if (res == 0) {
+      System.out.println("TEST PASSED!");
+    } else {
+      System.out.println("TEST FAILED");
+      throw new IOException("Zebra MR Smoke Test Failed");
+    }
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    conf = getConf();
+    TestSmokeMR.setUpOnce();
+    removeDir(new Path(outputPath));
+    JobConf jobConf = new JobConf(conf,TestSmokeMR.class);
+
+    jobConf.setJobName("TableMRSortedTableZebraKeyGenerator");
+    jobConf.set("table.output.tfile.compression", "gz");
+    jobConf.setJarByClass(TestSmokeMR.class);
+    // input settings
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setMapperClass(TestSmokeMR.MapClass.class);
+    jobConf.setReducerClass(TestSmokeMR.ReduceClass.class);
+    jobConf.setMapOutputKeyClass(BytesWritable.class);
+    jobConf.setMapOutputValueClass(ZebraTuple.class);
+    FileInputFormat.setInputPaths(jobConf, inputPath);
+    jobConf.setNumMapTasks(1);
+
+    // output settings
+    jobConf.setOutputFormat(BasicTableOutputFormat.class);
+    BasicTableOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+    // set the logical schema with 2 columns
+    BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
+    // for demo purposes, create 2 physical column groups
+    BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
+    
+    /* New M/R Interface */
+    /* Set sort columns in a comma separated string */
+    /* Each sort column should belong to schema columns */
+    BasicTableOutputFormat.setSortInfo(jobConf, "word, count");
+
+    // use a single reducer so the output table is written globally sorted
+    jobConf.setNumReduceTasks(1);
+    JobClient.runJob(jobConf);
+    BasicTableOutputFormat.close(jobConf);
+    return 0;
+  }
+}
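
For completeness, since the class comment above notes that the 'read' part is
omitted: below is a minimal, untested sketch of a map-only job that reads the
table back through zebra's TableInputFormat (same package). The projection
string and the /user/me paths are illustrative assumptions, not part of this
commit.

package org.apache.hadoop.zebra.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class ReadSmokeTable {
  // TableInputFormat hands the mapper <BytesWritable, Tuple> pairs,
  // one Tuple per row of the projected columns.
  static class ReadMapper extends MapReduceBase
      implements Mapper<BytesWritable, Tuple, Text, Text> {
    public void map(BytesWritable key, Tuple row,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      // dump each projected row as text
      output.collect(new Text(row.toString()), new Text(""));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(ReadSmokeTable.class);
    jobConf.setJobName("ReadSmokeTable");

    // input: the table written by TestSmokeMR (path is an assumption)
    jobConf.setInputFormat(TableInputFormat.class);
    TableInputFormat.setInputPaths(jobConf, new Path("/user/me/smokeTable"));
    // read back only the columns we need
    TableInputFormat.setProjection(jobConf, "word,count");

    jobConf.setMapperClass(ReadMapper.class);
    jobConf.setNumReduceTasks(0); // map-only
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(Text.class);
    jobConf.setOutputFormat(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(jobConf, new Path("/user/me/smokeTableDump"));

    JobClient.runJob(jobConf);
  }
}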

Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl?rev=917830&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl (added)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl Tue Mar  2 01:08:55 2010
@@ -0,0 +1,76 @@
+#!/usr/local/bin/perl
+use Getopt::Long;
+
+my %options;
+if (@ARGV > 0) {
+  GetOptions( \%options,
+    'cmd:s',
+    'smoke:s',
+    'help:i'
+  );
+}
+if (defined $options{'help'} or !defined $options{'smoke'}){
+  print ("perl zebra_smoke_run.pl --smoke\n");
+  print ("*************** Environment Variables Needed ***************\n");
+  print ("HADOOP_HOME\n");
+  print ("USER\n");
+  print ("ZEBRA_JAR\n");
+  print ("PIG_JAR\n");
+  print ("ZEBRA_SMOKE_JUNIT_JAR\n");
+  print ("CLASSPATH\n");
+  print ("HADOOP_CLASSPATH\n");
+  print ("ZEBRA_SMOKE_PIG_CLASS\n");
+  print ("ZEBRA_SMOKE_MAPRED_CLASS\n");
+  print ("ZEBRA_SMOKE_DIR\n");
+  exit;
+}
+
+#make an output directory
+$zebraqa = $ENV{ZEBRA_SMOKE_DIR};
+defined($zebraqa) || die("ZEBRA_SMOKE_DIR not defined");
+mkdir("$zebraqa/output");
+$logfile = "$zebraqa/output/zebra.out";
+
+if (defined $options{'smoke'}){
+  $my_hadoop_home=$ENV{HADOOP_HOME};
+  defined($my_hadoop_home) || die("HADOOP_HOME not defined");
+  $my_user=$ENV{USER};
+  defined($my_user) || die("USER not defined");
+  $my_zebra_jar=$ENV{ZEBRA_JAR};
+  defined($my_zebra_jar) || die("ZEBRA_JAR not defined");
+  $my_pig_jar=$ENV{PIG_JAR};
+  defined($my_pig_jar) || die("PIG_JAR not defined");
+  $my_junit_jar=$ENV{ZEBRA_SMOKE_JUNIT_JAR};
+  defined($my_junit_jar) || die("ZEBRA_SMOKE_JUNIT_JAR not defined");
+  $my_classpath=$ENV{CLASSPATH};
+  defined($my_classpath) || die("CLASSPATH not defined");
+  $my_smoke_pig_class=$ENV{ZEBRA_SMOKE_PIG_CLASS};
+  defined($my_smoke_pig_class) || die("ZEBRA_SMOKE_PIG_CLASS not defined");
+  $my_mapred_class=$ENV{ZEBRA_SMOKE_MAPRED_CLASS};
+  defined($my_mapred_class) || die("ZEBRA_SMOKE_MAPRED_CLASS not defined");
+  $my_hadoop_classpath=$ENV{HADOOP_CLASSPATH};
+  defined($my_hadoop_classpath) || die("HADOOP_CLASSPATH not defined");
+
+  #execute pig job
+  write_to_log("..... STARTING ZEBRA PIG JOB TESTING .....\n");
+  $cmd="java -cp $my_classpath -DwhichCluster=\"realCluster\" -DHADOOP_HOME=$my_hadoop_home -DUSER=$my_user org.junit.runner.JUnitCore $my_smoke_pig_class > $zebraqa/output/zebra.out 2>&1";
+  exec_cmd($cmd);
+
+  #execute mapred job
+  write_to_log("..... STARTING ZEBRA MAP REDUCE JOB TESTING .....\n");
+  $cmd="$my_hadoop_home/bin/hadoop jar $zebraqa/lib/zebra_smoke.jar $my_mapred_class -libjars $my_pig_jar,$my_zebra_jar,$my_junit_jar >> $zebraqa/output/zebra.out 2>&1";
+  exec_cmd($cmd);
+}
+
+sub exec_cmd {
+  my $cmd = shift or die "exec_cmd: Command not supplied!\n";
+  print ($cmd."\n");
+  my $rc = system($cmd);
+  !$rc or die "ERROR($rc): command failed: $cmd\n";
+  return $rc;
+}
+sub write_to_log {
+  my ($whatToWrite) = @_;
+  open(MYLOG, ">>$logfile") or die "can't open $logfile : $!\n";
+  print MYLOG $whatToWrite."\n";
+  close MYLOG;
+}
+
+
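
For illustration, a typical run after unpacking zebra_smoke.tar could look
like this (every path and class name below is an assumption about a specific
deployment, not part of this commit):

  cd zebra_smoke
  export ZEBRA_SMOKE_DIR=`pwd`
  export ZEBRA_SMOKE_JUNIT_JAR=$ZEBRA_SMOKE_DIR/lib/junit-4.5.jar
  export ZEBRA_SMOKE_MAPRED_CLASS=org.apache.hadoop.zebra.mapred.TestSmokeMR
  # ... plus HADOOP_HOME, USER, ZEBRA_JAR, PIG_JAR, CLASSPATH,
  # HADOOP_CLASSPATH and ZEBRA_SMOKE_PIG_CLASS, as listed by --help
  perl bin/zebra_smoke_run.pl --smoke 1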