Posted to commits@pig.apache.org by da...@apache.org on 2013/04/16 01:13:10 UTC
svn commit: r1468268 [1/3] - in /pig/trunk: ./
test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/ test/perf/
test/perf/pigmix/ test/perf/pigmix/bin/ test/perf/pigmix/conf/
test/perf/pigmix/src/ test/perf/pigmix/src/java/ test/perf/pigmix/src/jav...
Author: daijy
Date: Mon Apr 15 23:13:09 2013
New Revision: 1468268
URL: http://svn.apache.org/r1468268
Log:
PIG-200: Pig Performance Benchmarks
Added:
pig/trunk/test/perf/
pig/trunk/test/perf/pigmix/
pig/trunk/test/perf/pigmix/bin/
pig/trunk/test/perf/pigmix/bin/generate_data.sh
pig/trunk/test/perf/pigmix/bin/runpigmix.pl
pig/trunk/test/perf/pigmix/build.xml
pig/trunk/test/perf/pigmix/conf/
pig/trunk/test/perf/pigmix/conf/config.sh
pig/trunk/test/perf/pigmix/src/
pig/trunk/test/perf/pigmix/src/java/
pig/trunk/test/perf/pigmix/src/java/org/
pig/trunk/test/perf/pigmix/src/java/org/apache/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L12.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L13.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L14.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L15.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L16.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L17.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L2.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L3.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L4.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L5.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/
pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java
pig/trunk/test/perf/pigmix/src/pig/
pig/trunk/test/perf/pigmix/src/pig/L1.pig
pig/trunk/test/perf/pigmix/src/pig/L10.pig
pig/trunk/test/perf/pigmix/src/pig/L11.pig
pig/trunk/test/perf/pigmix/src/pig/L12.pig
pig/trunk/test/perf/pigmix/src/pig/L13.pig
pig/trunk/test/perf/pigmix/src/pig/L14.pig
pig/trunk/test/perf/pigmix/src/pig/L15.pig
pig/trunk/test/perf/pigmix/src/pig/L16.pig
pig/trunk/test/perf/pigmix/src/pig/L17.pig
pig/trunk/test/perf/pigmix/src/pig/L2.pig
pig/trunk/test/perf/pigmix/src/pig/L3.pig
pig/trunk/test/perf/pigmix/src/pig/L4.pig
pig/trunk/test/perf/pigmix/src/pig/L5.pig
pig/trunk/test/perf/pigmix/src/pig/L6.pig
pig/trunk/test/perf/pigmix/src/pig/L7.pig
pig/trunk/test/perf/pigmix/src/pig/L8.pig
pig/trunk/test/perf/pigmix/src/pig/L9.pig
Removed:
pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/PigPerformanceLoader.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1468268&r1=1468267&r2=1468268&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 15 23:13:09 2013
@@ -28,6 +28,8 @@ PIG-3174: Remove rpm and deb artifacts
IMPROVEMENTS
+PIG-200: Pig Performance Benchmarks (daijy)
+
PIG-3261: User set PIG_CLASSPATH entries must be prepended to the CLASSPATH, not
appended (qwertymaniac via daijy)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1468268&r1=1468267&r2=1468268&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Apr 15 23:13:09 2013
@@ -115,6 +115,9 @@
<!-- e2e test properties -->
<property name="test.e2e.dir" value="${basedir}/test/e2e/pig"/>
+ <!-- pigmix properties -->
+ <property name="pigmix.dir" value="${basedir}/test/perf/pigmix"/>
+
<!-- parser properties -->
<property name="src.gen.query.parser.dir" value="${src.gen.dir}/org/apache/pig/impl/logicalLayer/parser" />
<property name="src.gen.script.parser.dir" value="${src.gen.dir}/org/apache/pig/tools/pigscript/parser" />
@@ -518,7 +521,7 @@
<echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
<echo>*** Else, you will only be warned about deprecations ***</echo>
<compileSources sources="${test.src.dir};${src.shims.test.dir}"
- excludes="**/PigTestLoader.java **/resources/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
+ excludes="**/PigTestLoader.java **/resources/** perf/**" dist="${test.build.classes}" cp="test.classpath" warnings="${javac.args.warnings}" />
<copy file="${basedir}/test/hbase-site.xml" tofile="${test.build.classes}/hbase-site.xml"/>
<ivy:cachepath pathid="mr-apps-test-ivy.classpath" conf="test" />
@@ -885,6 +888,14 @@
<ant dir="${test.e2e.dir}" target="deploy-local"/>
</target>
+ <target name="pigmix-deploy" depends="jar, jar-withouthadoop" description="deploy end-to-end tests to existing cluster">
+ <ant dir="${pigmix.dir}" target="deploy"/>
+ </target>
+
+ <target name="pigmix" depends="jar, jar-withouthadoop, piggybank" description="run end-to-end tests">
+ <ant dir="${pigmix.dir}" target="test"/>
+ </target>
+
<!-- ================================================================== -->
<!-- Pigunit -->
<!-- ================================================================== -->
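With these targets in place, the benchmark can presumably be driven from the top-level build with something like "ant pigmix-deploy -Dharness.hadoop.home=/path/to/hadoop" to generate the input data, followed by "ant pigmix" to run the queries; the harness.hadoop.home property is required by the property-check target in the pigmix build file added below.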
Added: pig/trunk/test/perf/pigmix/bin/generate_data.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/generate_data.sh?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/generate_data.sh (added)
+++ pig/trunk/test/perf/pigmix/bin/generate_data.sh Mon Apr 15 23:13:09 2013
@@ -0,0 +1,240 @@
+#!/bin/sh
+
+if [[ -z $PIG_HOME ]]
+then
+ echo "Please set PIG_HOME environment variable to where pig-withouthadoop.jar located."
+ exit 1
+fi
+
+if [[ -z $PIG_BIN ]]
+then
+ echo "Please set PIG_BIN environment variable to the pig script (usually $PIG_HOME/bin/pig)"
+ exit 1
+fi
+
+if [[ -z $PIGMIX_HOME ]]
+then
+ export PIGMIX_HOME=$PIG_HOME/test/perf/pigmix
+fi
+
+if [[ -z $HADOOP_HOME ]]
+then
+ echo "Please set HADOOP_HOME environment variable, make sure $HADOOP_HOME/bin/hadoop exists"
+ exit 1
+fi
+
+source $PIGMIX_HOME/conf/config.sh
+
+pigjar=$PIG_HOME/pig-withouthadoop.jar
+pigmixjar=$PIGMIX_HOME/pigmix.jar
+
+classpath=$pigjar:$pigmixjar
+
+export HADOOP_CLASSPATH=$classpath
+
+export PIG_OPTS="-Xmx1024m"
+export HADOOP_CLIENT_OPTS="-Xmx1024m"
+
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+
+$HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
+
+if [ $? -ne 0 ]
+then
+ echo "Fail to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot, perhaps the destination already exist, if so, you need to run:"
+ echo "$HADOOP_HOME/bin/hadoop fs -rmr $hdfsroot"
+ exit 1
+fi
+
+mainclass=org.apache.pig.test.pigmix.datagen.DataGenerator
+
+user_field=s:20:1600000:z:7
+action_field=i:1:2:u:0
+os_field=i:1:20:z:0
+query_term_field=s:10:1800000:z:20
+ip_addr_field=l:1:1000000:z:0
+timestamp_field=l:1:86400:z:0
+estimated_revenue_field=d:1:100000:z:5
+page_info_field=m:10:1:z:0
+page_links_field=bm:10:1:z:20
+
+echo "Generating $pages"
+
+pages=$hdfsroot/page_views
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+ -m $mappers -r $rows -f $pages $user_field \
+ $action_field $os_field $query_term_field $ip_addr_field \
+ $timestamp_field $estimated_revenue_field $page_info_field \
+ $page_links_field
+
+
+# Skim off 1 in 10 records for the user table.
+# Note the file is in HDFS if the previous job ran as a Hadoop job;
+# in that case either copy the data to local disk before running the
+# following script, or run the trimming as a Hadoop job as well.
+
+protousers=$hdfsroot/protousers
+echo "Skimming users"
+$PIG_BIN << EOF
+register $pigmixjar;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $mappers;
+D = order C by \$0 parallel $mappers;
+store D into '$protousers';
+EOF
+
+
+# Create the users table by appending contact fields to the skimmed user names.
+phone_field=s:10:1600000:z:20
+address_field=s:20:1600000:z:20
+city_field=s:10:1600000:z:20
+state_field=s:2:1600:z:20
+zip_field=i:2:1600:z:20
+
+users=$hdfsroot/users
+
+echo "Generating $users"
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+ -m $mappers -i $protousers -f $users $phone_field \
+ $address_field $city_field $state_field $zip_field
+
+# Find unique keys for fragment replicate join testing
+# If file is in HDFS, extra steps are required
+numuniquekeys=500
+protopowerusers=$hdfsroot/proto_power_users
+echo "Skimming power users"
+
+$PIG_BIN << EOF
+register $pigmixjar;
+fs -rmr $protopowerusers;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $mappers;
+D = order C by \$0 parallel $mappers;
+E = limit D $numuniquekeys;
+store E into '$protopowerusers';
+EOF
+
+echo "Generating $powerusers"
+
+powerusers=$hdfsroot/power_users
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+ -m $mappers -i $protopowerusers -f $powerusers $phone_field \
+ $address_field $city_field $state_field $zip_field
+
+echo "Generating widerow"
+
+widerows=$hdfsroot/widerow
+user_field=s:20:10000:z:0
+int_field=i:1:10000:u:0
+
+$HADOOP_HOME/bin/hadoop jar $pigmixjar $mainclass \
+ -m $mappers -r $widerowcnt -f $widerows $user_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field \
+ $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field $int_field
+
+widegroupbydata=$hdfsroot/widegroupbydata
+
+$PIG_BIN << EOF
+register $pigmixjar;
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = order A by user parallel $mappers;
+store B into '${pages}_sorted' using PigStorage('\u0001');
+
+exec
+
+alpha = load '$users' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+a1 = order alpha by name parallel $mappers;
+store a1 into '${users}_sorted' using PigStorage('\u0001');
+
+exec
+
+a = load '$powerusers' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+b = sample a 0.5;
+store b into '${powerusers}_samples' using PigStorage('\u0001');
+
+exec
+
+A = load '$pages' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links,
+user as user1, action as action1, timespent as timespent1, query_term as query_term1, ip_addr as ip_addr1, timestamp as timestamp1, estimated_revenue as estimated_revenue1, page_info as page_info1, page_links as page_links1,
+user as user2, action as action2, timespent as timespent2, query_term as query_term2, ip_addr as ip_addr2, timestamp as timestamp2, estimated_revenue as estimated_revenue2, page_info as page_info2, page_links as page_links2;
+store B into '$widegroupbydata' using PigStorage('\u0001');
+EOF
+
+poweruserlocal=$localtmp/$powerusers
+poweruserlocalsingle=$localtmp/${powerusers}_single
+rm -fr $poweruserlocal
+rm -f $poweruserlocalsingle
+mkdir -p $poweruserlocal
+$PIG_BIN << EOF
+fs -copyToLocal ${powerusers}/* $poweruserlocal;
+EOF
+
+cat $poweruserlocal/* > $poweruserlocalsingle
+
+$PIG_BIN << EOF
+fs -rmr $protousers;
+fs -rmr $powerusers;
+fs -rmr $protopowerusers;
+fs -copyFromLocal $poweruserlocalsingle $hdfsroot/power_users;
+EOF
+
+rm -fr $poweruserlocal
+rm -f $poweruserlocalsingle
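The field specs above (user_field, action_field, and so on) use the colspec grammar documented in DataGenerator's usage() below: columntype:average_size:cardinality:distribution_type:percent_null. As a minimal sketch of how one such spec decodes, here is a hypothetical standalone class (not part of this commit) that splits the user_field value used above:

    // Hypothetical illustration of the DataGenerator colspec format;
    // ColSpecDemo is not part of the committed code.
    public class ColSpecDemo {
        public static void main(String[] args) {
            String[] p = "s:20:1600000:z:7".split(":");
            System.out.println("column type:  " + p[0]); // s = string
            System.out.println("average size: " + p[1]); // ~20 characters
            System.out.println("cardinality:  " + p[2]); // 1,600,000 distinct values
            System.out.println("distribution: " + p[3]); // z = zipf (u = uniform)
            System.out.println("percent null: " + p[4]); // 7% of values are null
        }
    }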
Added: pig/trunk/test/perf/pigmix/bin/runpigmix.pl
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/runpigmix.pl?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/runpigmix.pl (added)
+++ pig/trunk/test/perf/pigmix/bin/runpigmix.pl Mon Apr 15 23:13:09 2013
@@ -0,0 +1,110 @@
+#!/usr/local/bin/perl -w
+
+if(scalar(@ARGV) < 8 )
+{
+ print STDERR "Usage: $0 <pig_home> <pig_bin> <pigmix_jar> <hadoop_home> <hadoop_bin> <pig mix scripts dir> <hdfs_root> <pigmix_output> [parallel] [numruns] [runmapreduce] \n";
+ exit(-1);
+}
+my $pighome = shift;
+my $pigbin = shift;
+my $pigmixjar = shift;
+my $hadoophome = shift;
+my $hadoopbin = shift;
+my $scriptdir = shift;
+my $hdfsroot = shift;
+my $pigmixoutput = shift;
+my $parallel = shift;
+my $runs = shift;
+my $runmapreduce = shift;
+my $pigjar = "$pighome/pig-withouthadoop.jar";
+if(!defined($parallel)) {
+ $parallel = 40;
+}
+if(!defined($runs)) {
+ $runs = 3;
+}
+if(!defined($runmapreduce)) {
+ $runmapreduce = 1;
+}
+
+$ENV{'HADOOP_HOME'} = $hadoophome;
+$ENV{'HADOOP_CLIENT_OPTS'}="-Xmx1024m";
+
+my $cmd;
+my $total_pig_times = 0;
+my $total_mr_times = 0;
+
+print STDERR "Removing output dir $pigmixoutput \n";
+$cmd = "$hadoopbin fs -rmr $pigmixoutput";
+print STDERR "Going to run $cmd\n";
+print STDERR `$cmd 2>&1`;
+
+for(my $i = 1; $i <= 17; $i++) {
+ my $pig_times = 0;
+ for(my $j = 0; $j < $runs; $j++) {
+ print STDERR "Running Pig Query L".$i."\n";
+ print STDERR "L".$i.":";
+ print STDERR "Going to run $pigbin $scriptdir/L".$i.".pig\n";
+ my $s = time();
+ $cmd = "$pigbin -param PIGMIX_JAR=$pigmixjar -param HDFS_ROOT=$hdfsroot -param PIGMIX_OUTPUT=$pigmixoutput/pig -param PARALLEL=$parallel $scriptdir/L". $i.".pig" ;
+ print STDERR `$cmd 2>&1`;
+ my $e = time();
+ $pig_times += $e - $s;
+ cleanup($i);
+ }
+ # find avg
+ $pig_times = $pig_times/$runs;
+ # round to the nearest second
+ $pig_times = int($pig_times + 0.5);
+ $total_pig_times = $total_pig_times + $pig_times;
+
+ if ($runmapreduce==0) {
+ print "PigMix_$i pig run time: $pig_times\n";
+ }
+ else {
+ my $mr_times = 0;
+ for(my $j = 0; $j < $runs; $j++) {
+ print STDERR "Running Map-Reduce Query L".$i."\n";
+ print STDERR "L".$i.":";
+ print STDERR "Going to run $hadoopbin jar $pigmixjar org.apache.pig.test.pigmix.mapreduce.L"."$i $hdfsroot $pigmixoutput/mapreduce $parallel\n";
+ my $s = time();
+ $cmd = "$hadoopbin jar $pigmixjar org.apache.pig.test.pigmix.mapreduce.L$i $hdfsroot $pigmixoutput/mapreduce $parallel";
+ print STDERR `$cmd 2>&1`;
+ my $e = time();
+ $mr_times += $e - $s;
+ cleanup($i);
+ }
+ # find avg
+ $mr_times = $mr_times/$runs;
+ # round to the nearest second
+ $mr_times = int($mr_times + 0.5);
+ $total_mr_times = $total_mr_times + $mr_times;
+
+ my $multiplier = $pig_times/$mr_times;
+ print "PigMix_$i pig run time: $pig_times, java run time: $mr_times, multiplier: $multiplier\n";
+ }
+}
+
+if ($runmapreduce==0) {
+ print "Total pig run time: $total_pig_times\n";
+}
+else {
+ my $total_multiplier = $total_pig_times / $total_mr_times;
+ print "Total pig run time: $total_pig_times, total java time: $total_mr_times, multiplier: $total_multiplier\n";
+}
+
+sub cleanup {
+ my $suffix = shift;
+ my $cmd;
+ $cmd = "$pigbin -e rmf L".$suffix."out";
+ print STDERR `$cmd 2>&1`;
+ $cmd = "$pigbin -e rmf highest_value_page_per_user";
+ print STDERR `$cmd 2>&1`;
+ $cmd = "$pigbin -e rmf total_timespent_per_term";
+ print STDERR `$cmd 2>&1`;
+ $cmd = "$pigbin -e rmf queries_per_action";
+ print STDERR `$cmd 2>&1`;
+ $cmd = "$pigbin -e rmf tmp";
+ print STDERR `$cmd 2>&1`;
+}
+
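For example, with the default runs = 3, if the three timed runs of L1 take 100, 110, and 120 seconds, pig_times becomes int(330/3 + 0.5) = 110; if the corresponding map-reduce average is 55, the reported multiplier is 110/55 = 2.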
Added: pig/trunk/test/perf/pigmix/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/build.xml?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/build.xml (added)
+++ pig/trunk/test/perf/pigmix/build.xml Mon Apr 15 23:13:09 2013
@@ -0,0 +1,102 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="PigMix" default="jar">
+
+ <property name="pig.dir" value="${basedir}/../../.."/>
+ <property name="pig.jar" value="${pig.dir}/pig.jar"/>
+ <property name="ivy.dir" location="${pig.dir}/ivy" />
+ <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
+
+ <path id="classpath">
+ <fileset dir="${basedir}/lib">
+ <include name="*.jar"/>
+ </fileset>
+ <fileset file="${pig.jar}" />
+ </path>
+
+ <property name="java.dir" value="${basedir}/src/java"/>
+ <property name="pigmix.build.dir" value="${basedir}/build"/>
+ <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>
+
+ <condition property="harness.cluster.conf" value="${harness.hadoop.home}/conf">
+ <not>
+ <isset property="harness.cluster.conf"/>
+ </not>
+ </condition>
+
+ <!-- Build the UDFs -->
+ <target name="pig-jar-available">
+ <available property="pig.jar.available" file="${pig.jar}"/>
+ <fail message="You must build the main Pig jar before running these tests"
+ unless="pig.jar.available"/>
+ </target>
+
+ <target name="clean">
+ <delete dir="${pigmix.build.dir}" />
+ <delete file="${pigmix.jar}" />
+ </target>
+
+ <target name="compile" depends="pig-jar-available">
+ <mkdir dir="${pigmix.build.dir}" />
+ <javac srcdir="${java.dir}" destdir="${pigmix.build.dir}" debug="on">
+ <classpath refid="classpath" />
+ </javac>
+ </target>
+
+ <target name="jar" depends="compile">
+ <jar jarfile="${pigmix.jar}">
+ <fileset dir="build"/>
+ <zipfileset src="lib/sdsuLibJKD12.jar"/>
+ </jar>
+ </target>
+
+ <target name="property-check">
+ <fail message="Please set the property harness.hadoop.home to HADOOP_HOME, make sure ${harness.hadoop.home}/bin/hadoop exists."
+ unless="harness.hadoop.home"/>
+ </target>
+
+ <target name="deploy" depends="jar, property-check">
+ <exec executable="bash" failonerror="true">
+ <arg value="${basedir}/bin/generate_data.sh"/>
+ <env key="PIG_HOME" value="${pig.dir}"/>
+ <env key="PIG_BIN" value="${pig.dir}/bin/pig"/>
+ <env key="PIGMIX_HOME" value="${basedir}"/>
+ <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
+ <!--env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/-->
+ </exec>
+ </target>
+
+ <target name="test" depends="jar, property-check">
+ <exec executable="perl" failonerror="true">
+ <arg value="${basedir}/bin/runpigmix.pl"/>
+ <arg value="${pig.dir}"/>
+ <arg value="${pig.dir}/bin/pig"/>
+ <arg value="${basedir}/pigmix.jar"/>
+ <arg value="${harness.hadoop.home}"/>
+ <arg value="${harness.hadoop.home}/bin/hadoop"/>
+ <arg value="${basedir}/src/pig"/>
+ <arg value="/user/pig/tests/data/pigmix"/>
+ <arg value="output"/>
+ <!--parallel is hardcoded; it can be changed, but the L9/L10 map-reduce programs will still use 40 reducers-->
+ <arg value="40"/>
+ <!--number of runs-->
+ <arg value="1"/>
+ <!--env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/-->
+ </exec>
+ </target>
+</project>
Added: pig/trunk/test/perf/pigmix/conf/config.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/conf/config.sh?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/conf/config.sh (added)
+++ pig/trunk/test/perf/pigmix/conf/config.sh Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+hdfsroot=/user/pig/tests/data/pigmix
+localtmp=/tmp
+
+# configure the number of mappers for data generator
+mappers=90
+
+# ~1600 bytes per row for page_views (it is the base for most other inputs)
+rows=625000000
+
+# only used in L11 (widerow, ~2500 bytes per row)
+widerowcnt=10000000
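At these defaults the generated inputs are sizeable: page_views is roughly 625,000,000 rows x 1,600 bytes = 1 TB, and widerow roughly 10,000,000 rows x 2,500 bytes = 25 GB, before counting the derived tables, so plan HDFS capacity accordingly.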
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/datagen/DataGenerator.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.pigmix.datagen;
+
+import java.io.*;
+import java.lang.SecurityException;
+import java.text.ParseException;
+import java.util.*;
+
+import sdsu.algorithms.data.Zipf;
+
+import org.apache.pig.tools.cmdline.CmdLineParser;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+/**
+ * A tool to generate data for performance testing.
+ */
+public class DataGenerator extends Configured implements Tool {
+ ColSpec[] colSpecs;
+ long seed = -1;
+ long numRows = -1;
+ int numMappers = -1;
+ String outputFile;
+ String inFile;
+ char separator = '\u0001' ;
+ Random rand;
+
+ private String[] mapkey = { "a", "b", "c", "d", "e", "f", "g", "h", "i",
+ "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
+ "x", "y", "z"};
+
+ public static void main(String[] args) throws Exception {
+ DataGenerator dg = new DataGenerator();
+ try {
+ ToolRunner.run(new Configuration(), dg, args);
+ }catch(Exception e) {
+ throw new IOException (e);
+ }
+ dg.go();
+ }
+
+ protected DataGenerator(long seed) {
+ System.out.println("Using seed " + seed);
+ rand = new Random(seed);
+ }
+
+ protected DataGenerator() {
+
+ }
+
+ protected DataGenerator(String[] args) {
+
+ }
+
+ public int run(String[] args) throws Exception {
+ CmdLineParser opts = new CmdLineParser(args);
+ opts.registerOpt('e', "seed", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('r', "rows", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('s', "separator", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('i', "input", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('m', "mappers", CmdLineParser.ValueExpected.OPTIONAL);
+
+ char opt;
+ try {
+ while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
+ switch (opt) {
+ case 'e':
+ seed = Long.valueOf(opts.getValStr());
+ break;
+
+ case 'f':
+ outputFile = opts.getValStr();
+ break;
+
+ case 'i':
+ inFile = opts.getValStr();
+ break;
+
+ case 'r':
+ numRows = Long.valueOf(opts.getValStr());
+ break;
+
+ case 's':
+ separator = opts.getValStr().charAt(0);
+ break;
+
+ case 'm':
+ numMappers = Integer.valueOf(opts.getValStr());
+ break;
+
+ default:
+ usage();
+ break;
+ }
+ }
+ } catch (ParseException pe) {
+ System.err.println("Couldn't parse the command line arguments, " +
+ pe.getMessage());
+ usage();
+ }
+
+ if (numRows < 1 && inFile == null) usage();
+
+ if (numRows > 0 && inFile != null) usage();
+
+ if (numMappers > 0 && seed != -1) usage();
+
+ if (seed == -1){
+ seed = System.currentTimeMillis();
+ }
+
+ String remainders[] = opts.getRemainingArgs();
+ colSpecs = new ColSpec[remainders.length];
+ for (int i = 0; i < remainders.length; i++) {
+ colSpecs[i] = new ColSpec(remainders[i]);
+ }
+ System.err.println("Using seed " + seed);
+ rand = new Random(seed);
+
+ return 0;
+ }
+
+ private void go() throws IOException {
+ long t1 = System.currentTimeMillis();
+ if (numMappers <= 0) {
+ System.out.println("Generate data in local mode.");
+ goLocal();
+ }else{
+ System.out.println("Generate data in hadoop mode.");
+ HadoopRunner runner = new HadoopRunner();
+ runner.goHadoop();
+ }
+
+ long t2 = System.currentTimeMillis();
+ System.out.println("Job is successful! It took " + (t2-t1)/1000 + " seconds.");
+ }
+
+ public void goLocal() throws IOException {
+
+ PrintWriter out = null;
+ try {
+ out = new PrintWriter(outputFile);
+ } catch (FileNotFoundException fnfe) {
+ System.err.println("Could not find file " + outputFile +
+ ", " + fnfe.getMessage());
+ return;
+ } catch (SecurityException se) {
+ System.err.println("Could not write to file " + outputFile +
+ ", " + se.getMessage());
+ return;
+ }
+
+ BufferedReader in = null;
+ if (inFile != null) {
+ try {
+ in = new BufferedReader(new FileReader(inFile));
+ } catch (FileNotFoundException fnfe) {
+ System.err.println("Unable to find input file " + inFile);
+ return;
+ }
+ }
+
+ if (numRows > 0) {
+ for (int i = 0; i < numRows; i++) {
+ writeLine(out);
+ out.println();
+ }
+ } else if (in != null) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ out.print(line);
+ writeLine(out);
+ out.println();
+ }
+ }
+ out.close();
+ }
+
+ protected void writeLine(PrintWriter out) {
+ for (int j = 0; j < colSpecs.length; j++) {
+ if (j != 0) out.print(separator);
+ // First, decide if it's going to be null
+ if (rand.nextInt(100) < colSpecs[j].pctNull) {
+ continue;
+ }
+ writeCol(colSpecs[j], out);
+ }
+ }
+
+ private void writeCol(ColSpec colspec, PrintWriter out) {
+ switch (colspec.datatype) {
+ case INT:
+ out.print(colspec.nextInt());
+ break;
+
+ case LONG:
+ out.print(colspec.nextLong());
+ break;
+
+ case FLOAT:
+ out.print(colspec.nextFloat());
+ break;
+
+ case DOUBLE:
+ out.print(colspec.nextDouble());
+ break;
+
+ case STRING:
+ out.print(colspec.nextString());
+ break;
+
+ case MAP:
+ int len = rand.nextInt(20) + 6;
+ for (int k = 0; k < len; k++) {
+ if (k != 0) out.print('\u0003'); // Ctrl-C separates map entries
+ out.print(mapkey[k] + '\u0004'); // Ctrl-D separates key from value
+ out.print(colspec.gen.randomString());
+ }
+ break;
+
+ case BAG:
+ int numElements = rand.nextInt(5) + 5;
+ for (int i = 0; i < numElements; i++) {
+ if (i != 0) out.print('\u0002'); // Ctrl-B separates bag elements
+ switch(colspec.contained.datatype) {
+ case INT: out.print("i"); break;
+ case LONG: out.print("l"); break;
+ case FLOAT: out.print("f"); break;
+ case DOUBLE: out.print("d"); break;
+ case STRING: out.print("s"); break;
+ case MAP: out.print("m"); break;
+ case BAG: out.print("b"); break;
+ default: throw new RuntimeException("should never be here");
+ }
+ writeCol(colspec.contained, out);
+ }
+ }
+ }
+
+ private void usage() {
+ System.err.println("Usage: datagen -rows numrows [options] colspec ...");
+ System.err.println("\tOptions:");
+ System.err.println("\t-e -seed seed value for random numbers");
+ System.err.println("\t-f -file output file, default is stdout");
+ System.err.println("\t-i -input input file, lines will be read from");
+ System.err.println("\t\tthe file and additional columns appended.");
+ System.err.println("\t\tMutually exclusive with -r.");
+ System.err.println("\t-r -rows number of rows to output");
+ System.err.println("\t-s -separator character, default is ^A");
+ System.err.println("\t-m -number of mappers to run concurrently to generate data. " +
+ "If not specified, DataGenerator runs locally. This option can NOT be used with -e.");
+ System.err.println();
+ System.err.print("\tcolspec: columntype:average_size:cardinality:");
+ System.err.println("distribution_type:percent_null");
+ System.err.println("\tcolumntype:");
+ System.err.println("\t\ti = int");
+ System.err.println("\t\tl = long");
+ System.err.println("\t\tf = float");
+ System.err.println("\t\td = double");
+ System.err.println("\t\ts = string");
+ System.err.println("\t\tm = map");
+ System.err.println("\t\tbx = bag of x, where x is a columntype");
+ System.err.println("\tdistribution_type:");
+ System.err.println("\t\tu = uniform");
+ System.err.println("\t\tz = zipf");
+
+ throw new RuntimeException();
+ }
+
+
+
+ static enum Datatype { INT, LONG, FLOAT, DOUBLE, STRING, MAP, BAG };
+ static enum DistributionType { UNIFORM, ZIPF };
+ protected class ColSpec {
+ String arg;
+ Datatype datatype;
+ DistributionType distype;
+ int avgsz;
+ int card;
+ RandomGenerator gen;
+ int pctNull;
+ ColSpec contained;
+ String mapfile;
+ Map<Integer, Object> map;
+
+ public ColSpec(String arg) {
+ this.arg = arg;
+
+ String[] parts = arg.split(":");
+ if (parts.length != 5 && parts.length != 6) {
+ System.err.println("Colspec [" + arg + "] format incorrect");
+ usage();
+ }
+
+ switch (parts[0].charAt(0)) {
+ case 'i': datatype = Datatype.INT; break;
+ case 'l': datatype = Datatype.LONG; break;
+ case 'f': datatype = Datatype.FLOAT; break;
+ case 'd': datatype = Datatype.DOUBLE; break;
+ case 's': datatype = Datatype.STRING; break;
+ case 'm': datatype = Datatype.MAP; break;
+ case 'b':
+ datatype = Datatype.BAG;
+ contained = new ColSpec(arg.substring(1));
+ return;
+ default:
+ System.err.println("Don't know column type " +
+ parts[0].charAt(0));
+ usage();
+ break;
+ }
+ avgsz = Integer.valueOf(parts[1]);
+ card = Integer.valueOf(parts[2]);
+ switch (parts[3].charAt(0)) {
+ case 'u':
+ gen = new UniformRandomGenerator(avgsz, card);
+ distype = DistributionType.UNIFORM;
+ break;
+
+ case 'z':
+ gen = new ZipfRandomGenerator(avgsz, card);
+ distype = DistributionType.ZIPF;
+ break;
+
+ default:
+ System.err.println("Don't know generator type " +
+ parts[3].charAt(0));
+ usage();
+ break;
+ }
+
+ pctNull = Integer.valueOf(parts[4]);
+ if (pctNull < 0 || pctNull > 100) {
+ System.err.println("Percentage null must be between 0-100, "
+ + "you gave " + pctNull);
+ usage();
+ }
+ contained = null;
+
+ // if config has 6 columns, the last col is the file name
+ // of the mapping file from random number to field value
+ if (parts.length == 6) {
+ mapfile = parts[5];
+ gen.hasMapFile = true;
+ }
+
+ map = new HashMap<Integer, Object>();
+ }
+
+ public int nextInt() {
+ return gen.nextInt(map);
+ }
+
+ public long nextLong() {
+ return gen.nextLong(map);
+ }
+
+ public double nextDouble() {
+ return gen.nextDouble(map);
+ }
+
+ public float nextFloat() {
+ return gen.nextFloat(map);
+ }
+
+ public String nextString() {
+ return gen.nextString(map);
+ }
+ }
+
+ abstract class RandomGenerator {
+
+ protected int avgsz;
+ protected boolean hasMapFile; // indicating whether a map file from
+ // random number to the field value is pre-defined
+
+ abstract public int nextInt(Map<Integer, Object> map);
+ abstract public long nextLong(Map<Integer, Object> map);
+ abstract public float nextFloat(Map<Integer, Object> map);
+ abstract public double nextDouble(Map<Integer, Object> map);
+ abstract public String nextString(Map<Integer, Object> map);
+
+ public String randomString() {
+ int var = (int)((double)avgsz * 0.3);
+ if (var < 1) var = 1;
+ StringBuffer sb = new StringBuffer(avgsz + var);
+ int len = rand.nextInt(2 * var) + avgsz - var;
+ for (int i = 0; i < len; i++) {
+ int n = rand.nextInt(122 - 65) + 65;
+ sb.append(Character.toChars(n));
+ }
+ return sb.toString();
+ }
+
+ public float randomFloat() {
+ return rand.nextFloat() * rand.nextInt();
+ }
+
+ public double randomDouble() {
+ return rand.nextDouble() * rand.nextInt();
+ }
+ }
+
+ class UniformRandomGenerator extends RandomGenerator {
+ int card;
+
+ public UniformRandomGenerator(int a, int c) {
+ avgsz = a;
+ card = c;
+ }
+
+ public int nextInt(Map<Integer, Object> map) {
+ return rand.nextInt(card);
+ }
+
+ public long nextLong(Map<Integer, Object> map) {
+ // constrain to [0, card); a bare % of nextLong() can be negative
+ long l = rand.nextLong() % card;
+ return l < 0 ? l + card : l;
+ }
+
+ public float nextFloat(Map<Integer, Object> map) {
+ int seed = rand.nextInt(card);
+ Float f = (Float)map.get(seed);
+ if (f == null) {
+ if (!hasMapFile) {
+ f = randomFloat();
+ map.put(seed, f);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return f;
+ }
+
+ public double nextDouble(Map<Integer, Object> map) {
+ int seed = rand.nextInt(card);
+ Double d = (Double)map.get(seed);
+ if (d == null) {
+ if (!hasMapFile) {
+ d = randomDouble();
+ map.put(seed, d);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return d;
+ }
+
+ public String nextString(Map<Integer, Object> map) {
+ int seed = rand.nextInt(card);
+ String s = (String)map.get(seed);
+ if (s == null) {
+ if (!hasMapFile) {
+ s = randomString();
+ map.put(seed, s);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return s;
+ }
+
+ }
+
+ class ZipfRandomGenerator extends RandomGenerator {
+ Zipf z;
+
+ public ZipfRandomGenerator(int a, int c) {
+ avgsz = a;
+ z = new Zipf(c);
+ }
+
+
+ // The Zipf library returns a random number in [1..cardinality], so we
+ // subtract 1 to get [0..cardinality). The random number returned by the
+ // Zipf library is an integer, but it is converted into a double.
+ private double next() {
+ return z.nextElement()-1;
+ }
+
+ public int nextInt(Map<Integer, Object> map) {
+ return (int)next();
+ }
+
+ public long nextLong(Map<Integer, Object> map) {
+ return (long)next();
+ }
+
+ public float nextFloat(Map<Integer, Object> map) {
+ int seed = (int)next();
+ Float d = (Float)map.get(seed);
+ if (d == null) {
+ if (!hasMapFile) {
+ d = randomFloat();
+ map.put(seed, d);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return d;
+ }
+
+ public double nextDouble(Map<Integer, Object> map) {
+ int seed = (int)next();
+ Double d = (Double)map.get(seed);
+ if (d == null) {
+ if (!hasMapFile) {
+ d = randomDouble();
+ map.put(seed, d);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return d;
+ }
+
+ public String nextString(Map<Integer, Object> map) {
+ int seed = (int)next();
+ String s = (String)map.get(seed);
+ if (s == null) {
+ if (!hasMapFile) {
+ s = randomString();
+ map.put(seed, s);
+ }else{
+ throw new IllegalStateException("Number " + seed + " is not found in map file");
+ }
+ }
+ return s;
+ }
+ }
+
+// launch hadoop job
+ class HadoopRunner {
+ Random r;
+ FileSystem fs;
+ Path tmpHome;
+
+ public HadoopRunner() {
+ r = new Random();
+ }
+
+ public void goHadoop() throws IOException {
+ // Configuration processed by ToolRunner
+ Configuration conf = getConf();
+
+ // Create a JobConf using the processed conf
+ JobConf job = new JobConf(conf);
+ fs = FileSystem.get(job);
+
+ tmpHome = createTempDir(null);
+
+ String config = genMapFiles().toUri().getRawPath();
+ // set config properties into job conf
+ job.set("fieldconfig", config);
+ job.set("separator", String.valueOf((int)separator));
+
+
+ job.setJobName("data-gen");
+ job.setNumMapTasks(numMappers);
+ job.setNumReduceTasks(0);
+ job.setMapperClass(DataGenMapper.class);
+ job.setJarByClass(DataGenMapper.class);
+
+ // if inFile is specified, use it as input
+ if (inFile != null) {
+ FileInputFormat.setInputPaths(job, inFile);
+ job.set("hasinput", "true");
+ } else {
+ job.set("hasinput", "false");
+ Path input = genInputFiles();
+ FileInputFormat.setInputPaths(job, input);
+ }
+ FileOutputFormat.setOutputPath(job, new Path(outputFile));
+
+ // Submit the job, then poll for progress until the job is complete
+ System.out.println("Submit hadoop job...");
+ RunningJob j = JobClient.runJob(job);
+ if (!j.isSuccessful()) {
+ throw new IOException("Job failed");
+ }
+
+ if (fs.exists(tmpHome)) {
+ fs.delete(tmpHome, true);
+ }
+ }
+
+ private Path genInputFiles() throws IOException {
+ long avgRows = numRows/numMappers;
+
+ // create a temp directory as mappers input
+ Path input = createTempDir(tmpHome);
+ System.out.println("Generating input files into " + input.toString());
+
+ long rowsLeft = numRows;
+
+ // create one input file per mapper, which contains
+ // the number of rows
+ for(int i=0; i<numMappers; i++) {
+ Object[] tmp = createTempFile(input, false);
+ PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+
+ if (i < numMappers-1) {
+ pw.println(avgRows);
+ }else{
+ // last mapper takes all the rows left
+ pw.println(rowsLeft);
+ }
+
+ pw.close();
+ rowsLeft -= avgRows;
+ }
+
+ return input;
+ }
+
+ // generate map files for all the fields that need to pre-generate map files
+ // return a config file which contains config info for each field, including
+ // the path to their map file
+ private Path genMapFiles() throws IOException {
+ Object[] tmp = createTempFile(tmpHome, false);
+
+ System.out.println("Generating column config file in " + tmp[0].toString());
+ PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+ for(int i=0; i<colSpecs.length; i++) {
+ DataGenerator.Datatype datatype = colSpecs[i].datatype;
+ pw.print(colSpecs[i].arg);
+
+ if ( datatype == DataGenerator.Datatype.FLOAT || datatype == DataGenerator.Datatype.DOUBLE ||
+ datatype == DataGenerator.Datatype.STRING) {
+ Path p = genMapFile(colSpecs[i]);
+ pw.print(':');
+ pw.print(p.toUri().getRawPath());
+ }
+
+ pw.println();
+ }
+
+ pw.close();
+
+ return (Path)tmp[0];
+ }
+
+ // genereate a map file between random number to field value
+ // return the path of the map file
+ private Path genMapFile(DataGenerator.ColSpec col) throws IOException {
+ int card = col.card;
+ Object[] tmp = createTempFile(tmpHome, false);
+
+ System.out.println("Generating mapping file for column " + col.arg + " into " + tmp[0].toString());
+ PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
+ HashSet<Object> hash = new HashSet<Object>(card);
+ for(int i=0; i<card; i++) {
+ pw.print(i);
+ pw.print("\t");
+ Object next = null;
+ do {
+ if (col.datatype == DataGenerator.Datatype.DOUBLE) {
+ next = col.gen.randomDouble();
+ }else if (col.datatype == DataGenerator.Datatype.FLOAT) {
+ next = col.gen.randomFloat();
+ }else if (col.datatype == DataGenerator.Datatype.STRING) {
+ next = col.gen.randomString();
+ }
+ }while(hash.contains(next));
+
+ hash.add(next);
+
+ pw.println(next);
+
+ if ( (i>0 && i%300000 == 0) || i == card-1 ) {
+ System.out.println("processed " + i*100/card + "%." );
+ pw.flush();
+ }
+ }
+
+ pw.close();
+
+ return (Path)tmp[0];
+
+ }
+
+ private Path createTempDir(Path parentDir) throws IOException {
+ Object[] obj = createTempFile(parentDir, true);
+ return (Path)obj[0];
+ }
+
+ private Object[] createTempFile(Path parentDir, boolean isDir) throws IOException {
+ Path tmp_home = parentDir;
+
+ if (tmp_home == null) {
+ tmp_home = new Path(fs.getHomeDirectory(), "tmp");
+ }
+
+ if (!fs.exists(tmp_home)) {
+ fs.mkdirs(tmp_home);
+ }
+
+ int id = r.nextInt();
+ Path f = new Path(tmp_home, "tmp" + id);
+ while (fs.exists(f)) {
+ id = r.nextInt();
+ f = new Path(tmp_home, "tmp" + id);
+ }
+
+ // Return a 2-element array: first element is the Path,
+ // second element is the OutputStream (null for directories).
+ Object[] ret = new Object[2];
+ ret[0] = f;
+ if (!isDir) {
+ ret[1] = fs.create(f);
+ }else{
+ fs.mkdirs(f);
+ }
+
+ return ret;
+ }
+ }
+
+ public static class DataGenMapper extends MapReduceBase implements Mapper<LongWritable, Text, String, String> {
+ private JobConf jobConf;
+ private DataGenerator dg;
+ private boolean hasInput;
+
+ public void configure(JobConf jobconf) {
+ this.jobConf = jobconf;
+
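+ // Derive a distinct seed per mapper from the wall clock and the
+ // task partition id, so concurrent mappers emit decorrelated streams.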
+ int id = Integer.parseInt(jobconf.get("mapred.task.partition"));
+ long time = System.currentTimeMillis() - id*3600L*24*1000;
+
+ dg = new DataGenerator((time - id*3600L*24*1000) | ((long)id << 48));
+
+ dg.separator = (char)Integer.parseInt(jobConf.get("separator"));
+
+ if (jobConf.get("hasinput").equals("true")) {
+ hasInput = true;
+ }
+
+ String config = jobConf.get("fieldconfig");
+
+ try {
+ FileSystem fs = FileSystem.get(jobconf);
+
+ // load in config file for each column
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(config))));
+ String line = null;
+ List<DataGenerator.ColSpec> cols = new ArrayList<DataGenerator.ColSpec>();
+ while((line = reader.readLine()) != null) {
+ cols.add(dg.new ColSpec(line));
+ }
+ reader.close();
+ dg.colSpecs = cols.toArray(new DataGenerator.ColSpec[0]);
+
+ // load in mapping files
+ for(int i=0; i<dg.colSpecs.length; i++) {
+ DataGenerator.ColSpec col = dg.colSpecs[i];
+ if (col.mapfile != null) {
+ reader = new BufferedReader(new InputStreamReader(fs.open(new Path(col.mapfile))));
+ Map<Integer, Object> map = dg.colSpecs[i].map;
+ while((line = reader.readLine()) != null) {
+ String[] fields = line.split("\t");
+ int key = Integer.parseInt(fields[0]);
+ if (col.datatype == DataGenerator.Datatype.DOUBLE) {
+ map.put(key, Double.parseDouble(fields[1]));
+ }else if (col.datatype == DataGenerator.Datatype.FLOAT) {
+ map.put(key, Float.parseFloat(fields[1]));
+ }else {
+ map.put(key, fields[1]);
+ }
+ }
+
+ reader.close();
+ }
+ }
+ }catch(IOException e) {
+ throw new RuntimeException("Failed to load config file. " + e);
+ }
+ }
+
+ public void map(LongWritable key, Text value, OutputCollector<String, String> output, Reporter reporter) throws IOException {
+ int initialsz = dg.colSpecs.length * 50;
+
+ if (!hasInput) {
+ long numRows = Long.parseLong(value.toString().trim());
+ dg.numRows = numRows;
+
+ for (int i = 0; i < numRows; i++) {
+ StringWriter str = new StringWriter(initialsz);
+ PrintWriter pw = new PrintWriter(str);
+ dg.writeLine(pw);
+ output.collect(null, str.toString());
+
+ if ((i+1) % 10000 == 0) {
+ reporter.progress();
+ reporter.setStatus("" + (i+1) + " tuples generated.");
+ }
+ }
+ } else {
+ StringWriter str = new StringWriter(initialsz);
+ PrintWriter pw = new PrintWriter(str);
+ pw.write(value.toString());
+ dg.writeLine(pw);
+ output.collect(null, str.toString());
+ }
+ }
+ }
+}
+
+
+
+
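As a quick local sanity check of the generator (no -m flag, so no Hadoop job is launched), an invocation along the lines of "java org.apache.pig.test.pigmix.datagen.DataGenerator -e 1 -r 10 -f /tmp/sample.txt s:20:100:z:0 i:1:10:u:0", with pigmix.jar, the Pig jar, the Hadoop jars, and the bundled sdsu Zipf library on the classpath, should write ten Ctrl-A separated two-column rows to /tmp/sample.txt; -e fixes the seed for reproducibility and is only accepted without -m. The exact classpath here is an assumption, not something the commit documents.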
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L1.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,163 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L1 {
+
+ public static class ReadPageViews extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<Text, IntWritable> oc,
+ Reporter reporter) throws IOException {
+
+ // Split the line on Ctrl-A field separators
+ List<Text> fields = Library.splitLine(val, '\u0001');
+ if (fields.size() != 9) return;
+
+ int cnt = 0;
+ if (fields.get(1).toString() == "1") {
+ Text throwAway = Library.mapLookup(fields.get(7), new Text("a"));
+ cnt++;
+ } else {
+ List<Text> le = Library.splitLine(fields.get(8), '\u0002');
+ for (Text e : le) {
+ Text throwAway = Library.mapLookup(e, new Text("b"));
+ cnt++;
+ }
+ }
+ oc.collect(fields.get(0), new IntWritable(cnt));
+ }
+ }
+
+ public static class Group extends MapReduceBase
+ implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce(
+ Text key,
+ Iterator<IntWritable> iter,
+ OutputCollector<Text, IntWritable> oc,
+ Reporter reporter) throws IOException {
+ int cnt = 0;
+ while (iter.hasNext()) {
+ cnt += iter.next().get();
+ }
+ oc.collect(key, new IntWritable(cnt));
+ reporter.setStatus("OK");
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ if (args.length!=3) {
+ System.out.println("Parameters: inputDir outputDir parallel");
+ System.exit(1);
+ }
+ String inputDir = args[0];
+ String outputDir = args[1];
+ String parallel = args[2];
+ JobConf lp = new JobConf(L1.class);
+ lp.setJobName("L1 Load Page Views");
+ lp.setInputFormat(TextInputFormat.class);
+ lp.setOutputKeyClass(Text.class);
+ lp.setOutputValueClass(IntWritable.class);
+ lp.setMapperClass(ReadPageViews.class);
+ lp.setCombinerClass(Group.class);
+ lp.setReducerClass(Group.class);
+ Properties props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lp.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+ FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L1out"));
+ lp.setNumReduceTasks(Integer.parseInt(parallel));
+ Job group = new Job(lp);
+
+ JobControl jc = new JobControl("L1 join");
+ jc.addJob(group);
+
+ new Thread(jc).start();
+
+ int i = 0;
+ while(!jc.allFinished()){
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+
+ if (i % 10000 == 0) {
+ System.out.println("Running jobs");
+ ArrayList<Job> running = jc.getRunningJobs();
+ if (running != null && running.size() > 0) {
+ for (Job r : running) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Ready jobs");
+ ArrayList<Job> ready = jc.getReadyJobs();
+ if (ready != null && ready.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Waiting jobs");
+ ArrayList<Job> waiting = jc.getWaitingJobs();
+ if (waiting != null && waiting.size() > 0) {
+ for (Job r : waiting) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Successful jobs");
+ ArrayList<Job> success = jc.getSuccessfulJobs();
+ if (success != null && success.size() > 0) {
+ for (Job r : success) {
+ System.out.println(r.getJobName());
+ }
+ }
+ }
+ i++;
+ }
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ }
+ jc.stop();
+ }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L10.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,286 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L10 {
+
+ public static class MyType implements WritableComparable<MyType> {
+
+ public String query_term;
+ int timespent;
+ double estimated_revenue;
+
+ public MyType() {
+ query_term = null;
+ timespent = 0;
+ estimated_revenue = 0.0;
+ }
+
+ public MyType(Text qt, Text ts, Text er) {
+ query_term = qt.toString();
+ try {
+ timespent = Integer.valueOf(ts.toString());
+ } catch (NumberFormatException nfe) {
+ timespent = 0;
+ }
+ try {
+ estimated_revenue = Double.valueOf(er.toString());
+ } catch (NumberFormatException nfe) {
+ estimated_revenue = 0.0;
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(timespent);
+ out.writeDouble(estimated_revenue);
+ out.writeInt(query_term.length());
+ out.writeBytes(query_term);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ timespent = in.readInt();
+ estimated_revenue = in.readDouble();
+ int len = in.readInt();
+ byte[] b = new byte[len];
+ in.readFully(b);
+ query_term = new String(b);
+ }
+
+ public int compareTo(MyType other) {
+ int rc = query_term.compareTo(other.query_term);
+ if (rc != 0) return rc;
+ if (estimated_revenue < other.estimated_revenue) return 1;
+ else if (estimated_revenue > other.estimated_revenue) return -1;
+ if (timespent < other.timespent) return -1;
+ else if (timespent > other.timespent) return 1;
+ return 0;
+ }
+ }
+
+ public static class ReadPageViews extends MapReduceBase
+ implements Mapper<LongWritable, Text, MyType, Text> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<MyType, Text> oc,
+ Reporter reporter) throws IOException {
+
+ // Split the line
+ List<Text> fields = Library.splitLine(val, '\u0001'); // Ctrl-A field separator
+ if (fields.size() != 9) return;
+
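+ // Key on (query_term, timespent, estimated_revenue) -- fields 3, 2
+ // and 6 of the row, assuming the standard PigMix page_views schema
+ // (user, action, timespent, query_term, ip_addr, timestamp,
+ // estimated_revenue, page_info, page_links).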
+ oc.collect(new MyType(fields.get(3), fields.get(2), fields.get(6)),
+ val);
+ }
+ }
+
+ public static class MyPartitioner implements Partitioner<MyType, Text> {
+
+ public Map<Character, Integer> map;
+
+ public int getPartition(MyType key, Text value, int numPartitions) {
+ int rc = 0;
+ if (key.query_term == null || key.query_term.length() < 2) return 39;
+ if (key.query_term.charAt(0) > ']') rc += 20;
+ // Guard against characters outside the lookup table, which would
+ // otherwise unbox a null Integer and throw a NullPointerException.
+ Integer offset = map.get(key.query_term.charAt(1));
+ rc += (offset == null) ? 0 : offset;
+ return rc;
+ }
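+ // Worked example (hypothetical term "hello"): 'h' > ']' gives
+ // rc = 20, and map.get('e') = 16, so the record lands in
+ // partition 36 of the 40 the table defines (0-19 plus the +20
+ // band), with 39 reserved for short or null terms.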
+
+ public void configure(JobConf conf) {
+ // No configuration to read here; build the partition lookup
+ // table instead, since configure() is guaranteed to run once
+ // per task before any call to getPartition().
+ map = new HashMap<Character, Integer>(57);
+ map.put('A', 0);
+ map.put('B', 1);
+ map.put('C', 2);
+ map.put('D', 3);
+ map.put('E', 4);
+ map.put('F', 5);
+ map.put('G', 6);
+ map.put('I', 7);
+ map.put('H', 8);
+ map.put('J', 9);
+ map.put('K', 10);
+ map.put('L', 11);
+ map.put('M', 12);
+ map.put('N', 13);
+ map.put('O', 14);
+ map.put('P', 15);
+ map.put('Q', 16);
+ map.put('R', 17);
+ map.put('S', 18);
+ map.put('T', 19);
+ map.put('U', 0);
+ map.put('V', 1);
+ map.put('W', 2);
+ map.put('X', 3);
+ map.put('Y', 4);
+ map.put('Z', 5);
+ map.put('[', 6);
+ map.put('\\', 7);
+ map.put(']', 8);
+ map.put('^', 9);
+ map.put('_', 10);
+ map.put('`', 11);
+ map.put('a', 12);
+ map.put('b', 13);
+ map.put('c', 14);
+ map.put('d', 15);
+ map.put('e', 16);
+ map.put('f', 17);
+ map.put('g', 18);
+ map.put('h', 19);
+ map.put('i', 0);
+ map.put('j', 1);
+ map.put('k', 2);
+ map.put('l', 3);
+ map.put('m', 4);
+ map.put('n', 5);
+ map.put('o', 6);
+ map.put('p', 7);
+ map.put('q', 8);
+ map.put('r', 9);
+ map.put('s', 10);
+ map.put('t', 11);
+ map.put('u', 12);
+ map.put('v', 13);
+ map.put('w', 14);
+ map.put('x', 15);
+ map.put('y', 16);
+ map.put('z', 17);
+ }
+ }
+
+ public static class Group extends MapReduceBase
+ implements Reducer<MyType, Text, MyType, Text> {
+
+ public void reduce(
+ MyType key,
+ Iterator<Text> iter,
+ OutputCollector<MyType, Text> oc,
+ Reporter reporter) throws IOException {
+ while (iter.hasNext()) {
+ oc.collect(null, iter.next());
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ if (args.length != 3) {
+ System.out.println("Parameters: inputDir outputDir parallel");
+ System.exit(1);
+ }
+ String inputDir = args[0];
+ String outputDir = args[1];
+ String parallel = args[2];
+ JobConf lp = new JobConf(L10.class);
+ lp.setJobName("L10 Load Page Views");
+ lp.setInputFormat(TextInputFormat.class);
+ lp.setOutputKeyClass(MyType.class);
+ lp.setOutputValueClass(Text.class);
+ lp.setMapperClass(ReadPageViews.class);
+ lp.setReducerClass(Group.class);
+ lp.setPartitionerClass(MyPartitioner.class);
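+ // Copy all Java system properties into the job conf so that -D
+ // options given on the command line are visible to the MR job.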
+ Properties props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lp.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+ FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L10out"));
+ // Hard-code 40 reduce tasks (ignoring the parallel argument),
+ // since MyPartitioner assumes exactly 40 partitions.
+ lp.setNumReduceTasks(40);
+ Job group = new Job(lp);
+
+ JobControl jc = new JobControl("L10 join");
+ jc.addJob(group);
+
+ new Thread(jc).start();
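+ // JobControl schedules the jobs on the thread started above; the
+ // loop below only polls for completion and prints status.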
+
+ int i = 0;
+ while (!jc.allFinished()) {
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+
+ if (i % 10000 == 0) {
+ System.out.println("Running jobs");
+ ArrayList<Job> running = jc.getRunningJobs();
+ if (running != null && running.size() > 0) {
+ for (Job r : running) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Ready jobs");
+ ArrayList<Job> ready = jc.getReadyJobs();
+ if (ready != null && ready.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Waiting jobs");
+ ArrayList<Job> waiting = jc.getWaitingJobs();
+ if (waiting != null && waiting.size() > 0) {
+ for (Job r : waiting) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Successful jobs");
+ ArrayList<Job> success = jc.getSuccessfulJobs();
+ if (success != null && success.size() > 0) {
+ for (Job r : success) {
+ System.out.println(r.getJobName());
+ }
+ }
+ }
+ i++;
+ }
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ }
+ jc.stop();
+ }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L11.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,217 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L11 {
+
+ public static class ReadPageViews extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text>,
+ Reducer<Text, Text, Text, Text> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ List<Text> fields = Library.splitLine(val, '\u0001'); // Ctrl-A field separator
+ oc.collect(fields.get(0), new Text());
+ }
+
+ public void reduce(
+ Text key,
+ Iterator<Text> iter,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+
+ // Just take the key and the first value.
+ oc.collect(key, iter.next());
+ }
+ }
+
+ public static class ReadWideRow extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text>,
+ Reducer<Text, Text, Text, Text> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ List<Text> fields = Library.splitLine(val, '\u0001'); // Ctrl-A field separator
+ oc.collect(fields.get(0), new Text());
+ }
+
+ public void reduce(
+ Text key,
+ Iterator<Text> iter,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ // Just take the key and the first value.
+ oc.collect(key, iter.next());
+ }
+ }
+
+ public static class Union extends MapReduceBase
+ implements Reducer<Text, Text, Text, Text> {
+
+ public void reduce(
+ Text key,
+ Iterator<Text> iter,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ // Just take the key and the first value.
+ oc.collect(key, iter.next());
+ }
+ }
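+ // Pipeline sketch: the two load jobs each project their input down
+ // to the first column and keep one row per key (a DISTINCT); this
+ // reducer then merges the two outputs, again keeping one value per
+ // key -- the UNION plus DISTINCT of the corresponding L11.pig script.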
+
+ public static void main(String[] args) throws IOException {
+
+ if (args.length != 3) {
+ System.out.println("Parameters: inputDir outputDir parallel");
+ System.exit(1);
+ }
+ String inputDir = args[0];
+ String outputDir = args[1];
+ String parallel = args[2];
+ String user = System.getProperty("user.name");
+ JobConf lp = new JobConf(L11.class);
+ lp.setJobName("L11 Load Page Views");
+ lp.setInputFormat(TextInputFormat.class);
+ lp.setOutputKeyClass(Text.class);
+ lp.setOutputValueClass(Text.class);
+ lp.setMapperClass(ReadPageViews.class);
+ lp.setCombinerClass(ReadPageViews.class);
+ lp.setReducerClass(ReadPageViews.class);
+ Properties props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lp.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+ FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/p"));
+ lp.setNumReduceTasks(Integer.parseInt(parallel));
+ Job loadPages = new Job(lp);
+
+ JobConf lu = new JobConf(L11.class);
+ lu.setJobName("L11 Load Widerow");
+ lu.setInputFormat(TextInputFormat.class);
+ lu.setOutputKeyClass(Text.class);
+ lu.setOutputValueClass(Text.class);
+ lu.setMapperClass(ReadWideRow.class);
+ lu.setCombinerClass(ReadWideRow.class);
+ lu.setReducerClass(ReadWideRow.class);
+ props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lu.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lu, new Path(inputDir + "/widerow"));
+ FileOutputFormat.setOutputPath(lu, new Path(outputDir + "/wr"));
+ lu.setNumReduceTasks(Integer.parseInt(parallel));
+ Job loadWideRow = new Job(lu);
+
+ JobConf join = new JobConf(L11.class);
+ join.setJobName("L11 Union WideRow and Pages");
+ join.setInputFormat(KeyValueTextInputFormat.class);
+ join.setOutputKeyClass(Text.class);
+ join.setOutputValueClass(Text.class);
+ join.setMapperClass(IdentityMapper.class);
+ join.setCombinerClass(Union.class);
+ join.setReducerClass(Union.class);
+ props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ join.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(join, new Path(outputDir + "/p"));
+ FileInputFormat.addInputPath(join, new Path(outputDir + "/wr"));
+ FileOutputFormat.setOutputPath(join, new Path(outputDir + "/L11out"));
+ join.setNumReduceTasks(Integer.parseInt(parallel));
+ Job joinJob = new Job(join);
+ joinJob.addDependingJob(loadPages);
+ joinJob.addDependingJob(loadWideRow);
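+ // The union job reads the output directories of both load jobs,
+ // so it must be declared dependent on them.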
+
+ JobControl jc = new JobControl("L11 join");
+ jc.addJob(loadPages);
+ jc.addJob(loadWideRow);
+ jc.addJob(joinJob);
+
+ new Thread(jc).start();
+
+ int i = 0;
+ while (!jc.allFinished()) {
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+
+ if (i % 10000 == 0) {
+ System.out.println("Running jobs");
+ ArrayList<Job> running = jc.getRunningJobs();
+ if (running != null && running.size() > 0) {
+ for (Job r : running) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Ready jobs");
+ ArrayList<Job> ready = jc.getReadyJobs();
+ if (ready != null && ready.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Waiting jobs");
+ ArrayList<Job> waiting = jc.getWaitingJobs();
+ if (waiting != null && waiting.size() > 0) {
+ for (Job r : waiting) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Successful jobs");
+ ArrayList<Job> success = jc.getSuccessfulJobs();
+ if (success != null && success.size() > 0) {
+ for (Job r : success) {
+ System.out.println(r.getJobName());
+ }
+ }
+ }
+ i++;
+ }
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ }
+ jc.stop();
+ }
+
+}