Posted to commits@pig.apache.org by da...@apache.org on 2013/04/16 01:13:10 UTC

svn commit: r1468268 [1/3] - in /pig/trunk: ./ test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/ test/perf/ test/perf/pigmix/ test/perf/pigmix/bin/ test/perf/pigmix/conf/ test/perf/pigmix/src/ test/perf/pigmix/src/java/ test/perf/pigmix/src/jav...

Author: daijy
Date: Mon Apr 15 23:13:09 2013
New Revision: 1468268

URL: http://svn.apache.org/r1468268
Log:
PIG-200: Pig Performance Benchmarks

Added:
    pig/trunk/test/perf/
    pig/trunk/test/perf/pigmix/
    pig/trunk/test/perf/pigmix/bin/
    pig/trunk/test/perf/pigmix/bin/generate_data.sh
    pig/trunk/test/perf/pigmix/bin/runpigmix.pl
    pig/trunk/test/perf/pigmix/build.xml
    pig/trunk/test/perf/pigmix/conf/
    pig/trunk/test/perf/pigmix/conf/config.sh
    pig/trunk/test/perf/pigmix/src/
    pig/trunk/test/perf/pigmix/src/java/
    pig/trunk/test/perf/pigmix/src/java/org/
    pig/trunk/test/perf/pigmix/src/java/org/apache/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L12.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L13.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L14.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L15.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L16.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L17.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L2.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L3.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L4.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L5.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/
    pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java
    pig/trunk/test/perf/pigmix/src/pig/
    pig/trunk/test/perf/pigmix/src/pig/L1.pig
    pig/trunk/test/perf/pigmix/src/pig/L10.pig
    pig/trunk/test/perf/pigmix/src/pig/L11.pig
    pig/trunk/test/perf/pigmix/src/pig/L12.pig
    pig/trunk/test/perf/pigmix/src/pig/L13.pig
    pig/trunk/test/perf/pigmix/src/pig/L14.pig
    pig/trunk/test/perf/pigmix/src/pig/L15.pig
    pig/trunk/test/perf/pigmix/src/pig/L16.pig
    pig/trunk/test/perf/pigmix/src/pig/L17.pig
    pig/trunk/test/perf/pigmix/src/pig/L2.pig
    pig/trunk/test/perf/pigmix/src/pig/L3.pig
    pig/trunk/test/perf/pigmix/src/pig/L4.pig
    pig/trunk/test/perf/pigmix/src/pig/L5.pig
    pig/trunk/test/perf/pigmix/src/pig/L6.pig
    pig/trunk/test/perf/pigmix/src/pig/L7.pig
    pig/trunk/test/perf/pigmix/src/pig/L8.pig
    pig/trunk/test/perf/pigmix/src/pig/L9.pig
Removed:
    pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1468268&r1=1468267&r2=1468268&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 15 23:13:09 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-200: Pig Performance Benchmarks (daijy)
+
 PIG-3261: User set PIG_CLASSPATH entries must be prepended to the CLASSPATH, not 
 appended (qwertymaniac via daijy)
 

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1468268&r1=1468267&r2=1468268&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Apr 15 23:13:09 2013
@@ -115,6 +115,9 @@
     <!-- e2e test properties -->
     <property name="test.e2e.dir" value="${basedir}/test/e2e/pig"/>
 
+    <!-- pigmix properties -->
+    <property name="pigmix.dir" value="${basedir}/test/perf/pigmix"/>
+
     <!-- parser properties -->
     <property name="src.gen.query.parser.dir" value="${src.gen.dir}/org/apache/pig/impl/logicalLayer/parser" />
     <property name="src.gen.script.parser.dir" value="${src.gen.dir}/org/apache/pig/tools/pigscript/parser" />
@@ -518,7 +521,7 @@
         <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
         <echo>*** Else, you will only be warned about deprecations ***</echo>
         <compileSources sources="${test.src.dir};${src.shims.test.dir}"
-            excludes="**/PigTestLoader.java **/resources/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
+            excludes="**/PigTestLoader.java **/resources/** perf/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
         <copy file="${basedir}/test/hbase-site.xml" tofile="${test.build.classes}/hbase-site.xml"/>
 
         <ivy:cachepath pathid="mr-apps-test-ivy.classpath" conf="test" />
@@ -885,6 +888,14 @@
         <ant dir="${test.e2e.dir}" target="deploy-local"/>
     </target>
 
+    <target name="pigmix-deploy" depends="jar, jar-withouthadoop" description="deploy end-to-end tests to existing cluster">
+        <ant dir="${pigmix.dir}" target="deploy"/>
+    </target>
+
+    <target name="pigmix" depends="jar, jar-withouthadoop, piggybank" description="run end-to-end tests">
+        <ant dir="${pigmix.dir}" target="test"/>
+    </target>
+
     <!-- ================================================================== -->
     <!-- Pigunit                                                            -->
     <!-- ================================================================== -->
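
A minimal sketch of how these targets are meant to be driven from the trunk root, assuming the cluster location is passed in through the harness.hadoop.home property that the PigMix build file (added below) checks for:

    # generate the PigMix input data on the cluster, then run the benchmark
    ant -Dharness.hadoop.home=/path/to/hadoop pigmix-deploy
    ant -Dharness.hadoop.home=/path/to/hadoop pigmix

Properties supplied on the command line with -D are inherited by the nested <ant> call, so the setting should reach test/perf/pigmix/build.xml.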

Added: pig/trunk/test/perf/pigmix/bin/generate_data.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/generate_data.sh?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/generate_data.sh (added)
+++ pig/trunk/test/perf/pigmix/bin/generate_data.sh Mon Apr 15 23:13:09 2013
@@ -0,0 +1,240 @@
+#!/bin/sh
+
+if [[ -z $PIG_HOME ]] 
+then
+    echo "Please set PIG_HOME environment variable to where pig-withouthadoop.jar located."
+    exit 1
+fi
+
+if [[ -z $PIG_BIN ]]
+then
+    echo "Please set PIG_BIN environment variable to the pig script (usually $PIG_HOME/bin/pig)"
+    exit 1
+fi
+
+if [[ -z $PIGMIX_HOME ]]
+then
+    export PIGMIX_HOME=$PIG_HOME/test/perf/pigmix
+fi
+
+if [[ -z $HADOOP_HOME ]]
+then
+    echo "Please set HADOOP_HOME environment variable, make sure $HADOOP_HOME/bin/hadoop exists"
+    exit 1
+fi
+
+source $PIGMIX_HOME/conf/config.sh
+
+pigjar=$PIG_HOME/pig-withouthadoop.jar
+pigmixjar=$PIGMIX_HOME/pigmix.jar
+
+classpath=$pigjar:$pigmixjar
+
+export HADOOP_CLASSPATH=$classpath
+
+export PIG_OPTS="-Xmx1024m"
+export HADOOP_CLIENT_OPTS="-Xmx1024m"
+
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+
+$HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
+
+if [ $? -ne 0 ]
+then
+    echo "Fail to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot, perhaps the destination already exist, if so, you need to run:"
+    echo "$HADOOP_HOME/bin/hadoop fs -rmr $hdfsroot"
+    exit 1
+fi
+
+mainclass=org.apache.pig.test.pigmix.datagen.DataGenerator 
+
+user_field=s:20:1600000:z:7
+action_field=i:1:2:u:0
+os_field=i:1:20:z:0
+query_term_field=s:10:1800000:z:20
+ip_addr_field=l:1:1000000:z:0
+timestamp_field=l:1:86400:z:0
+estimated_revenue_field=d:1:100000:z:5
+page_info_field=m:10:1:z:0
+page_links_field=bm:10:1:z:20
+
+echo "Generating $pages"
+
+pages=$hdfsroot/page_views
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+    -m $mappers -r $rows -f $pages $user_field \
+    $action_field $os_field $query_term_field $ip_addr_field \
+    $timestamp_field $estimated_revenue_field $page_info_field \
+    $page_links_field
+
+
+# Skim off 1 in 10 records for the user table.
+# Be careful: the file is in HDFS if the previous job was run as a Hadoop job;
+# in that case you should either copy the data to local disk before running
+# the following script, or run a Hadoop job to trim the data.
+
+protousers=$hdfsroot/protousers
+echo "Skimming users"
+$PIG_BIN << EOF
+register $pigmixjar;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $mappers;
+D = order C by \$0 parallel $mappers;
+store D into '$protousers';
+EOF
+
+
+# Create the users table by appending the new fields below to the user names skimmed above.
+phone_field=s:10:1600000:z:20
+address_field=s:20:1600000:z:20
+city_field=s:10:1600000:z:20
+state_field=s:2:1600:z:20
+zip_field=i:2:1600:z:20
+
+users=$hdfsroot/users
+
+echo "Generating $users"
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+    -m $mappers -i $protousers -f $users $phone_field \
+    $address_field $city_field $state_field $zip_field
+
+# Find unique keys for fragment replicate join testing
+# If file is in HDFS, extra steps are required
+numuniquekeys=500
+protopowerusers=$hdfsroot/proto_power_users
+echo "Skimming power users"
+
+$PIG_BIN << EOF
+register $pigmixjar;
+fs -rmr $protopowerusers;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $mappers;
+D = order C by \$0 parallel $mappers;
+E = limit D $numuniquekeys;
+store E into '$protopowerusers';
+EOF
+
+echo "Generating $powerusers"
+
+powerusers=$hdfsroot/power_users
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+    -m $mappers -i $protopowerusers -f $powerusers $phone_field \
+    $address_field $city_field $state_field $zip_field
+
+echo "Generating widerow"
+
+widerows=$hdfsroot/widerow
+user_field=s:20:10000:z:0
+int_field=i:1:10000:u:0 
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+    -m $mappers -r $widerowcnt -f $widerows $user_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+    $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field
+
+widegroupbydata=$hdfsroot/widegroupbydata
+
+$PIG_BIN << EOF
+register $pigmixjar;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = order A by user parallel $mappers;
+store B into '${pages}_sorted' using PigStorage('\u0001');
+
+exec
+
+alpha = load '$users' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+a1 = order alpha by name parallel $mappers;
+store a1 into '${users}_sorted' using PigStorage('\u0001');
+
+exec
+
+a = load '$powerusers' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+b = sample a 0.5;
+store b into '${powerusers}_samples' using PigStorage('\u0001');
+
+exec
+
+A = load '$pages' as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links) using org.apache.pig.test.pigmix.udf.PigPerformanceLoader();
+B = foreach A generate user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links,
+user as user1, action as action1, timespent as timespent1, query_term as query_term1, ip_addr as ip_addr1, timestamp as timestamp1, estimated_revenue as estimated_revenue1, page_info as page_info1, page_links as page_links1,
+user as user2, action as action2, timespent as timespent2, query_term as query_term2, ip_addr as ip_addr2, timestamp as timestamp2, estimated_revenue as estimated_revenue2, page_info as page_info2, page_links as page_links2;
+store B into '$widegroupbydata' using PigStorage('\u0001');
+EOF
+
+poweruserlocal=$localtmp/$powerusers
+poweruserlocalsingle=$localtmp/${powerusers}_single
+rm -fr $poweruserlocal
+rm $poweruserlocalsingle
+mkdir -p $poweruserlocal
+$PIG_BIN << EOF
+fs -copyToLocal ${powerusers}/* $poweruserlocal;
+EOF
+
+cat $poweruserlocal/* > $poweruserlocalsingle
+
+$PIG_BIN << EOF
+fs -rmr $protousers;
+fs -rmr $powerusers;
+fs -rmr $protopowerusers;
+fs -copyFromLocal $poweruserlocalsingle $hdfsroot/power_users;
+EOF
+
+rm -fr $poweruserlocal
+rm $poweruserlocalsingle
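
The colspec arguments passed to the DataGenerator above follow the format documented in the generator's usage text later in this commit: columntype:average_size:cardinality:distribution_type:percent_null. An illustrative reading of three of the specs used in this script:

    # s:20:1600000:z:7  string, avg 20 chars, 1,600,000 distinct values, zipf, 7% null   (user)
    # i:1:2:u:0         int, avg size 1, cardinality 2, uniform, 0% null                 (action)
    # bm:10:1:z:20      bag of maps, avg size 10, cardinality 1, zipf, 20% null          (page_links)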

Added: pig/trunk/test/perf/pigmix/bin/runpigmix.pl
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/runpigmix.pl?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/runpigmix.pl (added)
+++ pig/trunk/test/perf/pigmix/bin/runpigmix.pl Mon Apr 15 23:13:09 2013
@@ -0,0 +1,110 @@
+#!/usr/local/bin/perl -w
+
+if(scalar(@ARGV) < 8 )
+{
+    print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> <hdfs_root> <pigmix_output> [parallel] [numruns] [runmapreduce] \n";
+    exit(-1);
+}
+my $pighome = shift;
+my $pigbin = shift;
+my $pigmixjar = shift;
+my $hadoophome = shift;
+my $hadoopbin = shift;
+my $scriptdir = shift;
+my $hdfsroot = shift;
+my $pigmixoutput = shift;
+my $parallel = shift;
+my $runs = shift;
+my $runmapreduce = shift;
+my $pigjar = "$pighome/pig-withouthadoop.jar";
+if(!defined($parallel)) {
+    $parallel = 40;
+}
+if(!defined($runs)) {
+    $runs = 3;
+}
+if(!defined($runmapreduce)) {
+    $runmapreduce = 1;
+}
+
+$ENV{'HADOOP_HOME'} = $hadoophome;
+$ENV{'HADOOP_CLIENT_OPTS'}="-Xmx1024m";
+
+my $cmd;
+my $total_pig_times = 0;
+my $total_mr_times = 0;
+
+print STDERR "Removing output dir $pigmixoutput \n";
+$cmd = "$hadoopbin fs -rmr $pigmixoutput";
+print STDERR "Going to run $cmd\n";
+print STDERR `$cmd 2>&1`;
+
+for(my $i = 1; $i <= 17; $i++) {
+    my $pig_times = 0;
+    for(my $j = 0; $j < $runs; $j++) {
+        print STDERR "Running Pig Query L".$i."\n";
+        print STDERR "L".$i.":";
+        print STDERR "Going to run $pigbin $scriptdir/L".$i.".pig\n";
+        my $s = time();
+        $cmd = "$pigbin -param PIGMIX_JAR=$pigmixjar -param HDFS_ROOT=$hdfsroot -param PIGMIX_OUTPUT=$pigmixoutput/pig -param PARALLEL=$parallel $scriptdir/L". $i.".pig" ;
+        print STDERR `$cmd 2>&1`;
+        my $e = time();
+        $pig_times += $e - $s;
+        cleanup($i);
+    }
+    # find avg
+    $pig_times = $pig_times/$runs;
+    # round to the nearest second
+    $pig_times = int($pig_times + 0.5);
+    $total_pig_times = $total_pig_times + $pig_times;
+
+    if ($runmapreduce==0) {
+        print "PigMix_$i pig run time: $pig_times\n";
+    }
+    else {
+        my $mr_times = 0;
+        for(my $j = 0; $j < $runs; $j++) {
+            print STDERR "Running Map-Reduce Query L".$i."\n";
+            print STDERR "L".$i.":";
+            print STDERR "Going to run $hadoopbin jar $pigmixjar org.apache.pig.test.pigmix.mapreduce.L"."$i $hdfsroot $pigmixoutput/mapreduce $parallel\n";
+            my $s = time();
+            $cmd = "$hadoopbin jar $pigmixjar org.apache.pig.test.pigmix.mapreduce.L$i $hdfsroot $pigmixoutput/mapreduce $parallel";
+            print STDERR `$cmd 2>&1`;
+            my $e = time();
+            $mr_times += $e - $s;
+            cleanup($i);
+        }
+        # find avg
+        $mr_times = $mr_times/$runs;
+        # round to the nearest second
+        $mr_times = int($mr_times + 0.5);
+        $total_mr_times = $total_mr_times + $mr_times;
+
+        my $multiplier = $pig_times/$mr_times;
+        print "PigMix_$i pig run time: $pig_times, java run time: $mr_times, multiplier: $multiplier\n";
+    }
+}
+
+if ($runmapreduce==0) {
+    print "Total pig run time: $total_pig_times\n";
+}
+else {
+    my $total_multiplier = $total_pig_times / $total_mr_times;
+    print "Total pig run time: $total_pig_times, total java time: $total_mr_times, multiplier: $total_multiplier\n";
+}
+
+sub cleanup {
+    my $suffix = shift;
+    my $cmd;
+    $cmd = "$pigbin -e rmf L".$suffix."out";
+    print STDERR `$cmd 2>&1`;
+    $cmd = "$pigbin -e rmf highest_value_page_per_user";
+    print STDERR `$cmd 2>&1`;
+    $cmd = "$pigbin -e rmf total_timespent_per_term";
+    print STDERR `$cmd 2>&1`;
+    $cmd = "$pigbin -e rmf queries_per_action";
+    print STDERR `$cmd 2>&1`;
+    $cmd = "$pigbin -e rmf tmp";
+    print STDERR `$cmd 2>&1`;
+}
+
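
For reference, the ant "test" target added below drives this script with arguments along these lines (a sketch only; the exact paths depend on the checkout and cluster):

    perl test/perf/pigmix/bin/runpigmix.pl $PIG_HOME $PIG_HOME/bin/pig \
        test/perf/pigmix/pigmix.jar $HADOOP_HOME $HADOOP_HOME/bin/hadoop \
        test/perf/pigmix/src/pig /user/pig/tests/data/pigmix output 40 1

The trailing 40 and 1 are the parallelism and the number of runs; runmapreduce is left at its default of 1, so each L*.pig script is timed against the equivalent hand-written map-reduce job and the multiplier is reported.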

Added: pig/trunk/test/perf/pigmix/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/build.xml?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/build.xml (added)
+++ pig/trunk/test/perf/pigmix/build.xml Mon Apr 15 23:13:09 2013
@@ -0,0 +1,102 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="PigMix" default="jar">
+
+  <property name="pig.dir" value="${basedir}/../../.."/>
+  <property name="pig.jar" value="${pig.dir}/pig.jar"/>
+  <property name="ivy.dir" location="${pig.dir}/ivy" />
+  <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
+
+  <path id="classpath">
+    <fileset dir="${basedir}/lib">
+      <include name="*.jar"/>
+    </fileset>
+    <fileset file="${pig.jar}" />
+  </path>
+
+  <property name="java.dir" value="${basedir}/src/java"/>
+  <property name="pigmix.build.dir" value="${basedir}/build"/>
+  <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>
+
+  <condition property="harness.cluster.conf" value="${harness.hadoop.home}/conf">
+    <not>
+      <isset property="harness.cluster.conf"/>
+    </not>
+  </condition>
+
+  <!-- Build the UDFs -->
+  <target name="pig-jar-available">
+    <available property="pig.jar.available" file="${pig.jar}"/>
+    <fail message="You must build the main Pig jar before running these tests"
+        unless="pig.jar.available"/>
+  </target>
+
+  <target name="clean">
+    <delete dir="${pigmix.build.dir}" />
+    <delete file="${pigmix.jar}" />
+  </target>
+
+  <target name="compile" depends="pig-jar-available">
+    <mkdir dir="${pigmix.build.dir}" />
+    <javac srcdir="${java.dir}" destdir="${pigmix.build.dir}" debug="on">
+      <classpath refid="classpath" />
+    </javac>
+  </target>
+
+  <target name="jar" depends="compile">
+    <jar jarfile="${pigmix.jar}">
+      <fileset dir="build"/>
+      <zipfileset src="lib/sdsuLibJKD12.jar"/>
+    </jar>
+  </target>
+
+  <target name="property-check">
+    <fail message="Please set the property harness.hadoop.home to HADOOP_HOME, make sure ${harness.hadoop.home}/bin/hadoop exists."
+      unless="harness.hadoop.home"/>
+  </target>
+
+  <target name="deploy" depends="jar, property-check">
+    <exec executable="bash" failonerror="true">
+      <arg value="${basedir}/bin/generate_data.sh"/>
+      <env key="PIG_HOME" value="${pig.dir}"/>
+      <env key="PIG_BIN" value="${pig.dir}/bin/pig"/>
+      <env key="PIGMIX_HOME" value="${basedir}"/>
+      <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
+      <!--env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/-->
+    </exec>
+  </target>
+
+  <target name="test" depends="jar, property-check">
+    <exec executable="perl" failonerror="true">
+      <arg value="${basedir}/bin/runpigmix.pl"/>
+      <arg value="${pig.dir}"/>
+      <arg value="${pig.dir}/bin/pig"/>
+      <arg value="${basedir}/pigmix.jar"/>
+      <arg value="${harness.hadoop.home}"/>
+      <arg value="${harness.hadoop.home}/bin/hadoop"/>
+      <arg value="${basedir}/src/pig"/>
+      <arg value="/user/pig/tests/data/pigmix"/>
+      <arg value="output"/>
+      <!--parallel is hardcoded here; it can be changed, but the L9/L10 map-reduce programs will still use 40 reducers-->
+      <arg value="40"/>
+      <!--number of runs-->
+      <arg value="1"/>
+      <!--env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/-->
+    </exec>
+  </target>
+</project>

Added: pig/trunk/test/perf/pigmix/conf/config.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/conf/config.sh?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/conf/config.sh (added)
+++ pig/trunk/test/perf/pigmix/conf/config.sh Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+hdfsroot=/user/pig/tests/data/pigmix
+localtmp=/tmp
+
+# configure the number of mappers for data generator
+mappers=90
+
+# ~1600 bytes per row for page_views (it is the base for most other inputs)
+rows=625000000
+
+# only used in L11 (widerow, ~2500 bytes per row)
+widerowcnt=10000000
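
Taken together with the per-row estimates in the comments above, the defaults imply input data on roughly this scale:

    625,000,000 rows x ~1600 bytes/row  ~= 1 TB    (page_views)
     10,000,000 rows x ~2500 bytes/row  ~= 25 GB   (widerow, used only by L11)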

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.pigmix.datagen;
+
+import java.io.*;
+import java.lang.SecurityException;
+import java.text.ParseException;
+import java.util.*;
+
+import sdsu.algorithms.data.Zipf;
+
+import org.apache.pig.tools.cmdline.CmdLineParser;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+/**
+ * A tool to generate data for performance testing.
+ */
+public class DataGenerator extends Configured implements Tool {
+    ColSpec[] colSpecs;
+    long seed = -1;
+    long numRows = -1;
+    int numMappers = -1;
+    String outputFile;
+    String inFile;
+    char separator = '\u0001' ;
+    Random rand;
+
+    private String[] mapkey = { "a", "b", "c", "d", "e", "f", "g", "h", "i",
+        "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
+        "x", "y", "z"};
+
+    public static void main(String[] args) throws Exception {    	
+    	DataGenerator dg = new DataGenerator();    	 
+    	try {
+    		ToolRunner.run(new Configuration(), dg, args);    			
+   		}catch(Exception e) {
+    		throw new IOException (e);
+    	}    	
+        dg.go();
+    }
+    
+    protected DataGenerator(long seed) {    
+    	System.out.println("Using seed " + seed);
+        rand = new Random(seed);
+    }
+
+    protected DataGenerator() {
+    	
+    }
+    
+    protected DataGenerator(String[] args) {
+    	
+    }
+    
+    public int run(String[] args) throws Exception {    	
+        CmdLineParser opts = new CmdLineParser(args);
+        opts.registerOpt('e', "seed", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('r', "rows", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('s', "separator", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('i', "input", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('m', "mappers", CmdLineParser.ValueExpected.OPTIONAL);
+
+        char opt;
+        try {
+            while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
+                switch (opt) {
+                case 'e':
+                    seed = Long.valueOf(opts.getValStr());
+                    break;
+
+                case 'f':
+                    outputFile = opts.getValStr();                   
+                    break;
+
+                case 'i':
+                    inFile = opts.getValStr();                   
+                    break;
+
+                case 'r':
+                    numRows = Long.valueOf(opts.getValStr());
+                    break;
+    
+                case 's':
+                    separator = opts.getValStr().charAt(0);
+                    break;
+                    
+                case 'm':
+                	numMappers = Integer.valueOf(opts.getValStr());
+                	break;
+
+                default:
+                    usage();
+                    break;
+                }
+            }
+        } catch (ParseException pe) {
+            System.err.println("Couldn't parse the command line arguments, " +
+                pe.getMessage());
+            usage();
+        }
+
+        if (numRows < 1 && inFile == null) usage();
+
+        if (numRows > 0 && inFile != null) usage();              
+        
+        if (numMappers > 0 && seed != -1) usage();
+        
+        if (seed == -1){
+        	seed = System.currentTimeMillis();
+        }
+        
+        String remainders[] = opts.getRemainingArgs();
+        colSpecs = new ColSpec[remainders.length];
+        for (int i = 0; i < remainders.length; i++) {
+            colSpecs[i] = new ColSpec(remainders[i]);
+        }
+        System.err.println("Using seed " + seed);
+        rand = new Random(seed); 
+        
+        return 0;
+    }
+
+    private void go() throws IOException {    	    
+    	long t1 = System.currentTimeMillis();
+    	if (numMappers <= 0) {
+    		System.out.println("Generate data in local mode.");
+        	goLocal();
+        }else{
+        	System.out.println("Generate data in hadoop mode.");        
+        	HadoopRunner runner = new HadoopRunner();
+        	runner.goHadoop();
+        }
+    	   	
+    	long t2 = System.currentTimeMillis();
+    	System.out.println("Job is successful! It took " + (t2-t1)/1000 + " seconds.");
+    }
+    
+    public void goLocal() throws IOException {
+    	
+    	PrintWriter out = null;
+		try {
+            out = new PrintWriter(outputFile);
+        } catch (FileNotFoundException fnfe) {
+            System.err.println("Could not find file " + outputFile +
+                ", " + fnfe.getMessage());
+            return;
+        } catch (SecurityException se) {
+            System.err.println("Could not write to file " + outputFile +
+                ", " + se.getMessage());
+            return;
+        }
+        
+        BufferedReader in = null;
+        if (inFile != null) {
+        	try {
+      	  		in = new BufferedReader(new FileReader(inFile));
+        	} catch (FileNotFoundException fnfe) {
+        		System.err.println("Unable to find input file " + inFile);
+        		return;
+        	}
+        }
+        
+        if (numRows > 0) {
+        	for (int i = 0; i < numRows; i++) {
+        		writeLine(out);
+        		out.println();
+        	}
+        } else if (in != null) {
+        	String line;
+        	while ((line = in.readLine()) != null) {
+        		out.print(line);
+        		writeLine(out);
+        		out.println();
+        	}
+        }
+        out.close();
+    }      
+    
+    protected void writeLine(PrintWriter out) {
+        for (int j = 0; j < colSpecs.length; j++) {
+            if (j != 0) out.print(separator);
+            // First, decide if it's going to be null            
+            if (rand.nextInt(100) < colSpecs[j].pctNull) {
+            	continue;
+            }
+            writeCol(colSpecs[j], out);
+        }
+    }
+
+    private void writeCol(ColSpec colspec, PrintWriter out) {
+        switch (colspec.datatype) {
+        case INT:
+            out.print(colspec.nextInt());
+            break;
+
+        case LONG:
+            out.print(colspec.nextLong());
+            break;
+
+        case FLOAT:
+            out.print(colspec.nextFloat());
+            break;
+
+        case DOUBLE:
+            out.print(colspec.nextDouble());
+            break;
+
+        case STRING:
+            out.print(colspec.nextString());
+            break;
+
+        case MAP:
+            int len = rand.nextInt(20) + 6;
+            for (int k = 0; k < len; k++) {
+                if (k != 0) out.print('');
+                out.print(mapkey[k] + '');
+                out.print(colspec.gen.randomString());
+            }
+            break;
+
+        case BAG:
+            int numElements = rand.nextInt(5) + 5;
+            for (int i = 0; i < numElements; i++) {
+                if (i != 0) out.print('');
+                switch(colspec.contained.datatype) {
+                    case INT: out.print("i"); break;
+                    case LONG: out.print("l"); break;
+                    case FLOAT: out.print("f"); break;
+                    case DOUBLE: out.print("d"); break;
+                    case STRING: out.print("s"); break;
+                    case MAP: out.print("m"); break;
+                    case BAG: out.print("b"); break;
+                    default: throw new RuntimeException("should never be here");
+                }
+                writeCol(colspec.contained, out);
+            }
+        }
+    }
+
+    private void usage() {
+        System.err.println("Usage: datagen -rows numrows [options] colspec ...");
+        System.err.println("\tOptions:");
+        System.err.println("\t-e -seed seed value for random numbers");
+        System.err.println("\t-f -file output file, default is stdout");
+        System.err.println("\t-i -input input file, lines will be read from");
+        System.err.println("\t\tthe file and additional columns appended.");
+        System.err.println("\t\tMutually exclusive with -r.");
+        System.err.println("\t-r -rows number of rows to output");
+        System.err.println("\t-s -separator character, default is ^A");
+        System.err.println("\t-m -number of mappers to run concurrently to generate data. " +
+        		"If not specified, DataGenerator runs locally. This option can NOT be used with -e.");
+        System.err.println();
+        System.err.print("\tcolspec: columntype:average_size:cardinality:");
+        System.err.println("distribution_type:percent_null");
+        System.err.println("\tcolumntype:");
+        System.err.println("\t\ti = int");
+        System.err.println("\t\tl = long");
+        System.err.println("\t\tf = float");
+        System.err.println("\t\td = double");
+        System.err.println("\t\ts = string");
+        System.err.println("\t\tm = map");
+        System.err.println("\t\tbx = bag of x, where x is a columntype");
+        System.err.println("\tdistribution_type:");
+        System.err.println("\t\tu = uniform");
+        System.err.println("\t\tz = zipf");
+
+        throw new RuntimeException();
+    }
+
+
+ 
+    static enum Datatype { INT, LONG, FLOAT, DOUBLE, STRING, MAP, BAG };
+    static enum DistributionType { UNIFORM, ZIPF };
+    protected class ColSpec {
+    	String arg;
+        Datatype datatype;
+        DistributionType distype;
+        int avgsz;
+        int card;
+        RandomGenerator gen;
+        int pctNull;
+        ColSpec contained;
+        String mapfile;
+        Map<Integer, Object> map;
+
+        public ColSpec(String arg) {
+        	this.arg = arg;
+        	
+            String[] parts = arg.split(":");
+            if (parts.length != 5 && parts.length != 6) {
+                System.err.println("Colspec [" + arg + "] format incorrect"); 
+                usage();
+            }
+
+            switch (parts[0].charAt(0)) {
+                case 'i': datatype = Datatype.INT; break;
+                case 'l': datatype = Datatype.LONG; break;
+                case 'f': datatype = Datatype.FLOAT; break;
+                case 'd': datatype = Datatype.DOUBLE; break;
+                case 's': datatype = Datatype.STRING; break;
+                case 'm': datatype = Datatype.MAP; break;
+                case 'b':
+                    datatype = Datatype.BAG;
+                    contained = new ColSpec(arg.substring(1));
+                    return;
+                default: 
+                    System.err.println("Don't know column type " +
+                        parts[0].charAt(0));
+                    usage();
+                    break;
+            }
+            avgsz = Integer.valueOf(parts[1]);
+            card = Integer.valueOf(parts[2]);
+            switch (parts[3].charAt(0)) {
+                case 'u': 
+                    gen = new UniformRandomGenerator(avgsz, card);
+                    distype = DistributionType.UNIFORM;
+                    break;
+
+                case 'z': 
+                    gen = new ZipfRandomGenerator(avgsz, card);
+                    distype = DistributionType.ZIPF;
+                    break;
+
+                default:
+                    System.err.println("Don't know generator type " +
+                        parts[3].charAt(0));
+                    usage();
+                    break;
+            }
+
+            pctNull = Integer.valueOf(parts[4]);
+            if (pctNull > 100) {
+                System.err.println("Percentage null must be between 0-100, "
+                    + "you gave" + pctNull);
+                usage();
+            }
+            contained = null;
+
+            // if config has 6 columns, the last col is the file name 
+            // of the mapping file from random number to field value
+            if (parts.length == 6) {
+            	mapfile = parts[5];
+            	gen.hasMapFile = true;
+            }
+            
+            map = new HashMap<Integer, Object>();
+        }
+        
+        public int nextInt() {
+        	return gen.nextInt(map);
+        }
+        
+        public long nextLong() {
+            return gen.nextLong(map);
+        }
+        
+        public double nextDouble() {
+        	return gen.nextDouble(map);
+        }
+        
+        public float nextFloat() {
+        	return gen.nextFloat(map);
+        }
+        
+        public String nextString() {
+        	return gen.nextString(map);
+        }
+    }
+
+    abstract class RandomGenerator {
+
+        protected int avgsz;
+        protected boolean hasMapFile; // indicating whether a map file from 
+                                      // random number to the field value is pre-defined
+
+        abstract public int nextInt(Map<Integer, Object> map);
+        abstract public long nextLong(Map<Integer, Object> map);
+        abstract public float nextFloat(Map<Integer, Object> map);
+        abstract public double nextDouble(Map<Integer, Object> map);
+        abstract public String nextString(Map<Integer, Object> map);
+
+        public String randomString() {
+            int var = (int)((double)avgsz * 0.3);
+            StringBuffer sb = new StringBuffer(avgsz + var);
+            if (var < 1) var = 1;
+            int len = rand.nextInt(2 * var) + avgsz - var;
+            for (int i = 0; i < len; i++) {
+                int n = rand.nextInt(122 - 65) + 65;
+                sb.append(Character.toChars(n));
+            }
+            return sb.toString();
+        }
+        
+        public float randomFloat() {
+        	return rand.nextFloat() * rand.nextInt();
+        }
+        
+        public double randomDouble() {
+        	return rand.nextDouble() * rand.nextInt();
+        }
+    }
+
+    class UniformRandomGenerator extends RandomGenerator {
+        int card;       
+        
+        public UniformRandomGenerator(int a, int c) {
+            avgsz = a;
+            card = c;            
+        }
+
+        public int nextInt(Map<Integer, Object> map) {
+            return rand.nextInt(card);
+        }
+
+        public long nextLong(Map<Integer, Object> map) {
+            return rand.nextLong() % card;
+        }
+
+        public float nextFloat(Map<Integer, Object> map) {
+            int seed = rand.nextInt(card);
+            Float f = (Float)map.get(seed);
+            if (f == null) {
+            	if (!hasMapFile) {
+            		f = randomFloat();
+            		map.put(seed, f);
+            	}else{
+            		throw new IllegalStateException("Number " + seed + " is not found in map file");
+            	}
+            }
+            return f;
+        }
+        
+        public double nextDouble(Map<Integer, Object> map) {
+            int seed = rand.nextInt(card);
+            Double d = (Double)map.get(seed);
+            if (d == null) {
+            	if (!hasMapFile) {
+            		d = randomDouble();
+            		map.put(seed, d);
+            	}else{
+            		throw new IllegalStateException("Number " + seed + " is not found in map file");
+            	}
+            }
+            return d;
+        }
+
+        public String nextString(Map<Integer, Object> map) {
+            int seed = rand.nextInt(card);
+            String s = (String)map.get(seed);
+            if (s == null) {
+            	if (!hasMapFile) {
+            		s = randomString();
+            		map.put(seed, s);
+            	}else{
+            		throw new IllegalStateException("Number " + seed + " is not found in map file");
+            	}
+            }
+            return s;
+        }
+        
+    }
+
+    class ZipfRandomGenerator extends RandomGenerator {
+        Zipf z;
+
+        public ZipfRandomGenerator(int a, int c) {
+            avgsz = a;
+            z = new Zipf(c);         
+        }
+        
+        
+        // The Zipf library returns a random number in [1..cardinality], so we subtract 1
+        // to get [0..cardinality). The random number returned by the Zipf library is an
+        // integer, but it is converted into a double.
+        private double next() {
+        	return z.nextElement()-1;
+        }
+
+        public int nextInt(Map<Integer, Object> map) {
+            return (int)next();
+        }
+
+        public long nextLong(Map<Integer, Object> map) {
+            return (long)next();
+        }
+
+        public float nextFloat(Map<Integer, Object> map) {
+        	int seed = (int)next();
+            Float d = (Float)map.get(seed);
+            if (d == null) {
+            	if (!hasMapFile) {
+            		d = randomFloat();
+            		map.put(seed, d);
+            	}else{
+            		throw new IllegalStateException("Number " + seed + " is not found in map file");
+            	}
+            }
+            return d;
+        }
+
+        public double nextDouble(Map<Integer, Object> map) {
+        	 int seed = (int)next();
+             Double d = (Double)map.get(seed);
+             if (d == null) {
+             	if (!hasMapFile) {
+             		d = randomDouble();
+             		map.put(seed, d);
+             	}else{
+             		throw new IllegalStateException("Number " + seed + " is not found in map file");
+             	}
+             }
+             return d;
+        }
+        
+        public String nextString(Map<Integer, Object> map) {
+            int seed = (int)next();
+            String s = (String)map.get(seed);
+            if (s == null) {
+            	if (!hasMapFile) {
+            		s = randomString();
+            		map.put(seed, s);
+            	}else{
+            		throw new IllegalStateException("Number " + seed + " is not found in map file");
+            	}
+            }
+            return s;
+        }
+    }
+    
+//  launch hadoop job
+    class HadoopRunner {
+    	Random r;
+    	FileSystem fs;
+    	Path tmpHome;
+    	
+    	public HadoopRunner() {
+    		r = new Random();
+    	}
+    	
+    	public void goHadoop() throws IOException {
+            // Configuration processed by ToolRunner
+            Configuration conf = getConf();
+            
+            // Create a JobConf using the processed conf            
+            JobConf job = new JobConf(conf);     
+            fs = FileSystem.get(job);           
+            
+            tmpHome = createTempDir(null);                          
+            
+            String config = genMapFiles().toUri().getRawPath();
+            // set config properties into job conf
+            job.set("fieldconfig", config);      
+            job.set("separator", String.valueOf((int)separator));
+            
+            
+            job.setJobName("data-gen");
+            job.setNumMapTasks(numMappers);
+            job.setNumReduceTasks(0);          
+            job.setMapperClass(DataGenMapper.class);   
+            job.setJarByClass(DataGenMapper.class);
+            
+            // if inFile is specified, use it as input
+            if (inFile != null) {
+           	 FileInputFormat.setInputPaths(job, inFile);
+           	 job.set("hasinput", "true");
+           } else {
+        	   job.set("hasinput", "false");
+        	   Path input = genInputFiles();        
+        	   FileInputFormat.setInputPaths(job, input);
+           }
+           FileOutputFormat.setOutputPath(job, new Path(outputFile));
+                               
+            // Submit the job, then poll for progress until the job is complete
+            System.out.println("Submit hadoop job...");
+            RunningJob j = JobClient.runJob(job);
+            if (!j.isSuccessful()) {
+            	throw new IOException("Job failed");
+            }
+                        
+            if (fs.exists(tmpHome)) {            	
+            	fs.delete(tmpHome, true);
+            }
+         }
+    	
+    	 private Path genInputFiles() throws IOException {
+    		 long avgRows = numRows/numMappers;
+    	        
+    		 // create a temp directory as mappers input
+    	     Path input = createTempDir(tmpHome);
+    	     System.out.println("Generating input files into " + input.toString());
+    	     
+    	     long rowsLeft = numRows;
+    	     
+    	     // create one input file per mapper, which contains
+    	     // the number of rows 
+    	     for(int i=0; i<numMappers; i++) {
+    	    	 Object[] tmp = createTempFile(input, false);
+    	    	 PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+    	    	 
+    	    	 if (i < numMappers-1) {
+    	    		 pw.println(avgRows);
+    	    	 }else{
+    	    		 // last mapper takes all the rows left
+    	    		 pw.println(rowsLeft);
+    	    	 }
+    	    
+    	    	 pw.close();
+    	    	 rowsLeft -= avgRows;
+    	     }
+    	     
+    	     return input;
+    	 }
+    	
+        // generate map files for all the fields that need pre-generated map files;
+        // return a config file that contains the config info for each field,
+        // including the path to its map file
+    	 private Path genMapFiles() throws IOException {
+    		 Object[] tmp = createTempFile(tmpHome, false);
+    		 
+    		 System.out.println("Generating column config file in " + tmp[0].toString());
+    		 PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+    		 for(int i=0; i<colSpecs.length; i++) {
+    			 DataGenerator.Datatype datatype = colSpecs[i].datatype;
+    			 pw.print(colSpecs[i].arg);
+    			 
+    			 if ( datatype == DataGenerator.Datatype.FLOAT || datatype == DataGenerator.Datatype.DOUBLE ||
+    					 datatype == DataGenerator.Datatype.STRING) 	 {
+    				 Path p = genMapFile(colSpecs[i]);
+    				 pw.print(':');    				
+    				 pw.print(p.toUri().getRawPath());				 
+    			 } 
+    			 
+    			 pw.println();
+    		 }
+    		 
+    		 pw.close();
+    		 
+    		 return (Path)tmp[0];
+    	 }
+    	 
+         // generate a map file mapping random numbers to field values
+    	 // return the path of the map file
+    	 private Path genMapFile(DataGenerator.ColSpec col) throws IOException {
+    		 int card = col.card;
+    		 Object[] tmp = createTempFile(tmpHome, false);
+    		 
+    		 System.out.println("Generating mapping file for column " + col.arg + " into " + tmp[0].toString());
+    		 PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+    		 HashSet<Object> hash = new HashSet<Object>(card);
+    		 for(int i=0; i<card; i++) {
+    			 pw.print(i);
+    			 pw.print("\t");
+    			 Object next = null;
+    			 do {
+    				 if (col.datatype == DataGenerator.Datatype.DOUBLE) {
+        				 next = col.gen.randomDouble();
+        			 }else if (col.datatype == DataGenerator.Datatype.FLOAT) {
+        				 next = col.gen.randomFloat();
+        			 }else if (col.datatype == DataGenerator.Datatype.STRING) {
+        				 next = col.gen.randomString();
+        			 }
+    			 }while(hash.contains(next));
+    			 
+    			 hash.add(next);
+    			 
+    			 pw.println(next);
+    			 
+    			 if ( (i>0 && i%300000 == 0) || i == card-1 ) {
+    				 System.out.println("processed " + i*100/card + "%." );
+    				 pw.flush();
+    			 }    			 
+    		 }    		 
+    		 
+    		 pw.close();
+    		 
+    		 return (Path)tmp[0];
+        	 
+    	 }
+    	 
+    	 private Path createTempDir(Path parentDir) throws IOException {
+    		 Object[] obj = createTempFile(parentDir, true);
+    		 return (Path)obj[0];
+    	 }
+    	 
+    	 private Object[] createTempFile(Path parentDir, boolean isDir) throws IOException {		
+    		 Path tmp_home = parentDir;
+    		 
+    		 if (tmp_home == null) {
+    			 tmp_home = new Path(fs.getHomeDirectory(), "tmp");
+    		 }
+    		 
+    		 if (!fs.exists(tmp_home)) {
+    			 fs.mkdirs(tmp_home);
+    		 }
+    		 
+    		 int id = r.nextInt();
+    		 Path f = new Path(tmp_home, "tmp" + id);
+    		 while (fs.exists(f)) {
+    			 id = r.nextInt();
+    			 f = new Path(tmp_home, "tmp" + id);
+    		 }
+    		 
+             // return a 2-element array: the first element is the Path,
+             // the second element is the OutputStream (null when a directory was created)
+             Object[] result = new Object[2];
+             result[0] = f;
+             if (!isDir) {
+                 result[1] = fs.create(f);
+             } else {
+                 fs.mkdirs(f);
+             }
+
+             return result;
+    	 }    	
+    }
+
+	 public static class DataGenMapper extends MapReduceBase implements Mapper<LongWritable, Text, String, String> {
+ 		private JobConf jobConf;
+ 		private DataGenerator dg; 		
+ 		private boolean hasInput;
+ 	   
+ 		public void configure(JobConf jobconf) {
+ 			this.jobConf = jobconf;
+ 						 			
+ 			int id = Integer.parseInt(jobconf.get("mapred.task.partition"));
+ 			long time = System.currentTimeMillis() - id*3600*24*1000;
+ 			 			
+ 			dg = new DataGenerator( ((time-id*3600*24*1000) | (id << 48)));
+ 			
+ 		    dg.separator = (char)Integer.parseInt(jobConf.get("separator"));
+ 		     		    
+ 		    if (jobConf.get("hasinput").equals("true")) {
+ 		    	hasInput = true;
+ 		    }
+ 		    
+ 		    String config = jobConf.get("fieldconfig");
+ 		    		    
+ 		    try {			    
+ 			    FileSystem fs = FileSystem.get(jobconf);
+ 			    
+ 			    // load in config file for each column
+ 			    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(config))));
+ 			    String line = null;
+ 			    List<DataGenerator.ColSpec> cols = new ArrayList<DataGenerator.ColSpec>();
+ 			    while((line = reader.readLine()) != null) { 			    	
+ 			    	cols.add(dg.new ColSpec(line));		    	
+ 			    }
+ 			    reader.close();
+ 			    dg.colSpecs = cols.toArray(new DataGenerator.ColSpec[0]); 			    
+ 			    
+ 			    // load in mapping files
+ 			    for(int i=0; i<dg.colSpecs.length; i++) {
+ 			    	DataGenerator.ColSpec col = dg.colSpecs[i];
+ 			    	if (col.mapfile != null) { 			    		
+ 			    		reader = new BufferedReader(new InputStreamReader(fs.open(new Path(col.mapfile))));
+ 			    		Map<Integer, Object> map = dg.colSpecs[i].map;
+ 			    		while((line = reader.readLine()) != null) {
+ 					    	String[] fields = line.split("\t");
+ 					    	int key = Integer.parseInt(fields[0]);
+ 					    	if (col.datatype == DataGenerator.Datatype.DOUBLE) {
+ 					    		map.put(key, Double.parseDouble(fields[1]));
+ 					    	}else if (col.datatype == DataGenerator.Datatype.FLOAT) {
+ 					    		map.put(key, Float.parseFloat(fields[1]));
+ 					    	}else {
+ 					    		map.put(key, fields[1]);
+ 					    	}
+ 					    }
+ 			    		
+ 			    		reader.close();
+ 			    	}
+ 			    }
+ 		    }catch(IOException e) {
+ 		    	throw new RuntimeException("Failed to load config file. " + e);
+ 		    }
+  		}
+ 		
+ 		public void map(LongWritable key, Text value, OutputCollector<String, String> output, Reporter reporter) throws IOException {
+            int initialsz = dg.colSpecs.length * 50;
+ 			
+ 			if (!hasInput) {
+ 				long numRows = Long.parseLong(value.toString().trim()); 		
+ 	 	        dg.numRows = numRows; 	 	         	       
+ 	 	        
+ 	 	        for (int i = 0; i < numRows; i++) {
+                StringWriter str = new StringWriter(initialsz);
+ 	 	        	PrintWriter pw = new PrintWriter(str);
+ 	 	        	dg.writeLine(pw); 	    
+ 	 	        	output.collect(null, str.toString()); 	        	 	        
+ 	 	        	
+ 	 	        	if ((i+1) % 10000 == 0) {
+ 	 	        		reporter.progress();
+ 	 	        		reporter.setStatus("" + (i+1) + " tuples generated.");  	        	
+ 	 	        	}
+ 	 	        }	       
+ 			} else {
+ 				StringWriter str = new StringWriter(intialsz);
+	 	        PrintWriter pw = new PrintWriter(str);
+	 	        pw.write(value.toString());
+	 	        dg.writeLine(pw); 	    
+	 	        output.collect(null, str.toString()); 	        	 	        	 	        	
+ 			}
+ 		}
+ 	}
+}
+
+
+
+
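For context, the mapping files that DataGenMapper.configure() reads above are plain tab-separated lines: an integer key, a tab, and a value whose type depends on the column's datatype. A minimal local sketch of that parsing, not part of the commit (the file name "sample.map" is purely illustrative):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class MapfileSketch {
        public static void main(String[] args) throws IOException {
            Map<Integer, Object> map = new HashMap<Integer, Object>();
            // "sample.map" is a hypothetical local copy of one mapping file.
            BufferedReader reader = new BufferedReader(new FileReader("sample.map"));
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");
                // The real code parses fields[1] as Double or Float for numeric
                // columns; string values are stored as-is.
                map.put(Integer.parseInt(fields[0]), fields[1]);
            }
            reader.close();
            System.out.println(map.size() + " mappings loaded");
        }
    }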

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,163 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L1 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, IntWritable> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            int cnt = 0;
+            if (fields.get(1).toString().equals("1")) {
+                Text throwAway = Library.mapLookup(fields.get(7), new Text("a"));
+                cnt++;
+            } else {
+                List<Text> le = Library.splitLine(fields.get(8), '\u0002');
+                for (Text e : le) {
+                    Text throwAway = Library.mapLookup(e, new Text("b"));
+                    cnt++;
+                }
+            }
+            oc.collect(fields.get(0), new IntWritable(cnt));
+        }
+    }
+
+    public static class Group extends MapReduceBase
+        implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+        public void reduce(
+                Text key,
+                Iterator<IntWritable> iter, 
+                OutputCollector<Text, IntWritable> oc,
+                Reporter reporter) throws IOException {
+            int cnt = 0;
+            while (iter.hasNext()) {
+                cnt += iter.next().get();
+            }
+            oc.collect(key, new IntWritable(cnt));
+            reporter.setStatus("OK");
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L1.class);
+        lp.setJobName("L1 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(IntWritable.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(Group.class);
+        lp.setReducerClass(Group.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L1out"));
+        lp.setNumReduceTasks(Integer.parseInt(parallel));
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L1 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}
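As the argument check in main() indicates, each of these drivers takes an input directory, an output directory, and a parallelism value; L1 reads page_views under the input directory and writes to L1out under the output directory. A hypothetical invocation, assuming the classes are packaged into a jar (jar name and paths are placeholders, not part of this commit):

    hadoop jar pigmix-mapreduce.jar org.apache.pig.test.pigmix.mapreduce.L1 /pigmix/input /pigmix/output 40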

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,286 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L10 {
+
+    public static class MyType implements WritableComparable<MyType> {
+
+        public String query_term;
+        int timespent;
+        double estimated_revenue;
+
+        public MyType() {
+            query_term = null;
+            timespent = 0;
+            estimated_revenue = 0.0;
+        }
+
+        public MyType(Text qt, Text ts, Text er) {
+            query_term = qt.toString();
+            try {
+                timespent = Integer.valueOf(ts.toString());
+            } catch (NumberFormatException nfe) {
+                timespent = 0;
+            }
+            try {
+                estimated_revenue = Double.valueOf(er.toString());
+            } catch (NumberFormatException nfe) {
+                estimated_revenue = 0.0;
+            }
+        }
+
+        public void write(DataOutput out) throws IOException {
+            out.writeInt(timespent);
+            out.writeDouble(estimated_revenue);
+            out.writeInt(query_term.length());
+            out.writeBytes(query_term);
+        }
+
+        public void readFields(DataInput in) throws IOException {
+            timespent = in.readInt();
+            estimated_revenue = in.readDouble();
+            int len = in.readInt();
+            byte[] b = new byte[len];
+            in.readFully(b);
+            query_term = new String(b);
+        }
+
+        public int compareTo(MyType other) {
+            int rc = query_term.compareTo(other.query_term);
+            if (rc != 0) return rc;
+            if (estimated_revenue < other.estimated_revenue) return 1;
+            else if (estimated_revenue > other.estimated_revenue) return -1;
+            if (timespent < other.timespent) return -1;
+            else if (timespent > other.timespent) return 1;
+            return 0;
+        }
+    }
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, MyType, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<MyType, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            oc.collect(new MyType(fields.get(3), fields.get(2), fields.get(6)),
+                val);
+        }
+    }
+
+    public static class MyPartitioner implements Partitioner<MyType, Text> {
+
+        public Map<Character, Integer> map;
+
+        public int getPartition(MyType key, Text value, int numPartitions) {
+            int rc = 0;
+            if (key.query_term == null || key.query_term.length() < 2) return 39;
+            if (key.query_term.charAt(0) > ']') rc += 20;
+            rc += map.get(key.query_term.charAt(1));
+            return rc;
+        }
+
+        public void configure(JobConf conf) {
+            // No configuration is read from the JobConf here; configure() is
+            // simply a convenient hook that is guaranteed to run whenever a
+            // MyPartitioner instance is set up, so build the lookup table now.
+            map = new HashMap<Character, Integer>(57);
+            map.put('A', 0);
+            map.put('B', 1);
+            map.put('C', 2);
+            map.put('D', 3);
+            map.put('E', 4);
+            map.put('F', 5);
+            map.put('G', 6);
+            map.put('I', 7);
+            map.put('H', 8);
+            map.put('J', 9);
+            map.put('K', 10);
+            map.put('L', 11);
+            map.put('M', 12);
+            map.put('N', 13);
+            map.put('O', 14);
+            map.put('P', 15);
+            map.put('Q', 16);
+            map.put('R', 17);
+            map.put('S', 18);
+            map.put('T', 19);
+            map.put('U', 0);
+            map.put('V', 1);
+            map.put('W', 2);
+            map.put('X', 3);
+            map.put('Y', 4);
+            map.put('Z', 5);
+            map.put('[', 6);
+            map.put('\\', 7);
+            map.put(']', 8);
+            map.put('^', 9);
+            map.put('_', 10);
+            map.put('`', 11);
+            map.put('a', 12);
+            map.put('b', 13);
+            map.put('c', 14);
+            map.put('d', 15);
+            map.put('e', 16);
+            map.put('f', 17);
+            map.put('g', 18);
+            map.put('h', 19);
+            map.put('i', 0);
+            map.put('j', 1);
+            map.put('k', 2);
+            map.put('l', 3);
+            map.put('m', 4);
+            map.put('n', 5);
+            map.put('o', 6);
+            map.put('p', 7);
+            map.put('q', 8);
+            map.put('r', 9);
+            map.put('s', 10);
+            map.put('t', 11);
+            map.put('u', 12);
+            map.put('v', 13);
+            map.put('w', 14);
+            map.put('x', 15);
+            map.put('y', 16);
+            map.put('z', 17);
+        }
+    }
+
+    public static class Group extends MapReduceBase
+        implements Reducer<MyType, Text, MyType, Text> {
+
+        public void reduce(
+                MyType key,
+                Iterator<Text> iter, 
+                OutputCollector<MyType, Text> oc,
+                Reporter reporter) throws IOException {
+            while (iter.hasNext()) {
+                oc.collect(null, iter.next());
+            }
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L10.class);
+        lp.setJobName("L10 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(MyType.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setReducerClass(Group.class);
+        lp.setPartitionerClass(MyPartitioner.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L10out"));
+        // Hardcode the parallel to 40 since MyPartitioner assumes it
+        lp.setNumReduceTasks(40);
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L10 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}
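To make the hard-coded 40-way partitioning above concrete, here is a small, self-contained sketch (not part of the commit) of how a sample query term is bucketed; the value 7 for 'p' is taken from the table built in configure(), and the term itself is hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionSketch {
        public static void main(String[] args) {
            // Only the entry needed for this example; configure() builds the full table.
            Map<Character, Integer> map = new HashMap<Character, Integer>();
            map.put('p', 7);
            String term = "apple";                  // hypothetical query term
            int rc;
            if (term.length() < 2) {
                rc = 39;                            // short/null terms go to the last reducer
            } else {
                rc = 0;
                if (term.charAt(0) > ']') rc += 20; // 'a' (97) > ']' (93), so rc = 20
                rc += map.get(term.charAt(1));      // 'p' -> 7
            }
            System.out.println(rc);                 // prints 27: one of the 40 hard-coded partitions
        }
    }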

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,217 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L11 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, Text>,
+        Reducer<Text, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            oc.collect(fields.get(0), new Text());
+        }
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Just take the key and the first value.
+            oc.collect(key, iter.next());
+        }
+    }
+
+    public static class ReadWideRow extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, Text>,
+        Reducer<Text, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            oc.collect(fields.get(0), new Text());
+        }
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            // Just take the key and the first value.
+            oc.collect(key, iter.next());
+        }
+    }
+
+    public static class Union extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            // Just take the key and the first value.
+            oc.collect(key, iter.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        String user = System.getProperty("user.name");
+        JobConf lp = new JobConf(L11.class);
+        lp.setJobName("L11 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(ReadPageViews.class);
+        lp.setReducerClass(ReadPageViews.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/p"));
+        lp.setNumReduceTasks(Integer.parseInt(parallel));
+        Job loadPages = new Job(lp);
+
+        JobConf lu = new JobConf(L11.class);
+        lu.setJobName("L11 Load Widerow");
+        lu.setInputFormat(TextInputFormat.class);
+        lu.setOutputKeyClass(Text.class);
+        lu.setOutputValueClass(Text.class);
+        lu.setMapperClass(ReadWideRow.class);
+        lu.setCombinerClass(ReadWideRow.class);
+        lu.setReducerClass(ReadWideRow.class);
+        props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lu.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lu, new Path(inputDir + "/widerow"));
+        FileOutputFormat.setOutputPath(lu, new Path(outputDir + "/wr"));
+        lu.setNumReduceTasks(Integer.parseInt(parallel));
+        Job loadWideRow = new Job(lu);
+
+        JobConf join = new JobConf(L11.class);
+        join.setJobName("L11 Union WideRow and Pages");
+        join.setInputFormat(KeyValueTextInputFormat.class);
+        join.setOutputKeyClass(Text.class);
+        join.setOutputValueClass(Text.class);
+        join.setMapperClass(IdentityMapper.class);
+        join.setCombinerClass(Union.class);
+        join.setReducerClass(Union.class);
+        props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            join.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(join, new Path(outputDir + "/p"));
+        FileInputFormat.addInputPath(join, new Path(outputDir + "/wr"));
+        FileOutputFormat.setOutputPath(join, new Path(outputDir + "/L11out"));
+        join.setNumReduceTasks(Integer.parseInt(parallel));
+        Job joinJob = new Job(join);
+        joinJob.addDependingJob(loadPages);
+        joinJob.addDependingJob(loadWideRow);
+
+        JobControl jc = new JobControl("L11 join");
+        jc.addJob(loadPages);
+        jc.addJob(loadWideRow);
+        jc.addJob(joinJob);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}