Posted to commits@pig.apache.org by da...@apache.org on 2013/04/16 01:13:10 UTC

svn commit: r1468268 [3/3] - in /pig/trunk: ./ test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/ test/perf/ test/perf/pigmix/ test/perf/pigmix/bin/ test/perf/pigmix/conf/ test/perf/pigmix/src/ test/perf/pigmix/src/java/ test/perf/pigmix/src/jav...

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,166 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L6 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, IntWritable> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            // Build the composite group key (user, query_term, ip_addr, timestamp),
+            // joined with ^A, mirroring the group key used in L6.pig.
+            StringBuffer sb = new StringBuffer();
+            sb.append(fields.get(0).toString());
+            sb.append("\u0001");
+            sb.append(fields.get(3).toString());
+            sb.append("\u0001");
+            sb.append(fields.get(4).toString());
+            sb.append("\u0001");
+            sb.append(fields.get(5).toString());
+            Text key = new Text(sb.toString());
+
+            try {
+                oc.collect(key,
+                    new IntWritable(Integer.valueOf(fields.get(2).toString())));
+            } catch (NumberFormatException nfe) {
+                // Skip records whose timespent field is not a number.
+            }
+        }
+    }
+
+    public static class Group extends MapReduceBase
+        implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+        public void reduce(
+                Text key,
+                Iterator<IntWritable> iter, 
+                OutputCollector<Text, IntWritable> oc,
+                Reporter reporter) throws IOException {
+            int sum = 0;
+            while (iter.hasNext()) {
+                sum += iter.next().get();
+            }
+            oc.collect(key, new IntWritable(sum));
+            reporter.setStatus("OK");
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L6.class);
+        lp.setJobName("L6 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(IntWritable.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(Group.class);
+        lp.setReducerClass(Group.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L6out"));
+        lp.setNumReduceTasks(Integer.parseInt(parallel));
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L6 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
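+        // JobControl runs in its own thread; poll until all jobs finish, printing
+        // job status periodically and bailing out on the first failure.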
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,186 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L7 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            oc.collect(fields.get(0), fields.get(5));
+        }
+    }
+
+    public static class Combiner extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            int morning = 0, afternoon = 0;
+            while (iter.hasNext()) {
+                try {
+                    if (Integer.valueOf(iter.next().toString()) < 43200) morning++;
+                    else afternoon++;
+                } catch (NumberFormatException nfe) {
+                }
+            }
+            StringBuffer sb = new StringBuffer();
+            sb.append((new Integer(morning)).toString());
+            sb.append("\u0001");
+            sb.append((new Integer(afternoon)).toString());
+            oc.collect(key, new Text(sb.toString()));
+            reporter.setStatus("OK");
+        }
+    }
+
+    public static class Group extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            int morning = 0, afternoon = 0;
+            while (iter.hasNext()) {
+                List<Text> vals = Library.splitLine(iter.next(), '\u0001');
+                try {
+                    morning += Integer.valueOf(vals.get(0).toString());
+                    if (vals.size() > 1) afternoon += Integer.valueOf(vals.get(1).toString());
+                } catch (NumberFormatException nfe) {
+                }
+            }
+            StringBuffer sb = new StringBuffer();
+            sb.append((new Integer(morning)).toString());
+            sb.append("\u0001");
+            sb.append((new Integer(afternoon)).toString());
+            oc.collect(key, new Text(sb.toString()));
+            reporter.setStatus("OK");
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L7.class);
+        lp.setJobName("L7 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(Combiner.class);
+        lp.setReducerClass(Group.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L7out"));
+        lp.setNumReduceTasks(Integer.parseInt(parallel));
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L7 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,197 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L8 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            StringBuffer sb = new StringBuffer();
+            sb.append(fields.get(2).toString());
+            sb.append("\u0001");
+            sb.append(fields.get(6).toString());
+            oc.collect(new Text("all"), new Text(sb.toString()));
+        }
+    }
+
+    public static class Combiner extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            int tsSum = 0, erCnt = 0;
+            double erSum = 0.0;
+            while (iter.hasNext()) {
+                List<Text> vals = Library.splitLine(iter.next(), '\u0001');
+                try {
+                    tsSum += Integer.valueOf(vals.get(0).toString());
+                    erSum += Double.valueOf(vals.get(1).toString());
+                    erCnt++;
+                } catch (NumberFormatException nfe) {
+                }
+            }
+            StringBuffer sb = new StringBuffer();
+            sb.append((new Integer(tsSum)).toString());
+            sb.append("\u0001");
+            sb.append((new Double(erSum)).toString());
+            sb.append("\u0001");
+            sb.append((new Integer(erCnt)).toString());
+            oc.collect(key, new Text(sb.toString()));
+            reporter.setStatus("OK");
+        }
+    }
+    public static class Group extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            int tsSum = 0, erCnt = 0;
+            double erSum = 0.0;
+            while (iter.hasNext()) {
+                List<Text> vals = Library.splitLine(iter.next(), '\u0001');
+                try {
+                    tsSum += Integer.valueOf(vals.get(0).toString());
+                    erSum += Double.valueOf(vals.get(1).toString());
+                    // The combiner emits a pre-aggregated count as the third field;
+                    // fall back to counting single records if it is absent.
+                    if (vals.size() > 2) erCnt += Integer.valueOf(vals.get(2).toString());
+                    else erCnt++;
+                } catch (NumberFormatException nfe) {
+                }
+            }
+            double erAvg = erSum / erCnt;
+            StringBuffer sb = new StringBuffer();
+            sb.append((new Integer(tsSum)).toString());
+            sb.append("\u0001");
+            sb.append((new Double(erAvg)).toString());
+            oc.collect(key, new Text(sb.toString()));
+            reporter.setStatus("OK");
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L8.class);
+        lp.setJobName("L8 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(Combiner.class);
+        lp.setReducerClass(Group.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L8out"));
+        lp.setNumReduceTasks(1);
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L8 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,230 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L9 {
+
+    public static class ReadPageViews extends MapReduceBase
+        implements Mapper<LongWritable, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line
+            List<Text> fields = Library.splitLine(val, '\u0001');
+            if (fields.size() != 9) return;
+
+            oc.collect(fields.get(0), val);
+        }
+    }
+
+    public static class MyPartitioner implements Partitioner<Text, Text> {
+
+        public Map<Character, Integer> map;
+
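+        // Spread keys across 40 reducers: the second character of the key indexes
+        // into the table built in configure() (0-19), and 20 is added when the key's
+        // first character sorts after ']'. Keys shorter than two characters all go
+        // to partition 39.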
+        public int getPartition(Text key, Text value, int numPartitions) {
+            int rc = 0;
+            String s = key.toString();
+            if (s == null || s.length() < 2) return 39;
+            if (s.charAt(0) > ']') rc += 20;
+            rc += map.get(s.charAt(1));
+            return rc;
+        }
+
+        public void configure(JobConf conf) {
+            // There is no real configuration to do; build the lookup table here
+            // because configure() is guaranteed to be called whenever a
+            // MyPartitioner instance is created.
+            map = new HashMap<Character, Integer>(57);
+            map.put('A', 0);
+            map.put('B', 1);
+            map.put('C', 2);
+            map.put('D', 3);
+            map.put('E', 4);
+            map.put('F', 5);
+            map.put('G', 6);
+            map.put('I', 7);
+            map.put('H', 8);
+            map.put('J', 9);
+            map.put('K', 10);
+            map.put('L', 11);
+            map.put('M', 12);
+            map.put('N', 13);
+            map.put('O', 14);
+            map.put('P', 15);
+            map.put('Q', 16);
+            map.put('R', 17);
+            map.put('S', 18);
+            map.put('T', 19);
+            map.put('U', 0);
+            map.put('V', 1);
+            map.put('W', 2);
+            map.put('X', 3);
+            map.put('Y', 4);
+            map.put('Z', 5);
+            map.put('[', 6);
+            map.put('\\', 7);
+            map.put(']', 8);
+            map.put('^', 9);
+            map.put('_', 10);
+            map.put('`', 11);
+            map.put('a', 12);
+            map.put('b', 13);
+            map.put('c', 14);
+            map.put('d', 15);
+            map.put('e', 16);
+            map.put('f', 17);
+            map.put('g', 18);
+            map.put('h', 19);
+            map.put('i', 0);
+            map.put('j', 1);
+            map.put('k', 2);
+            map.put('l', 3);
+            map.put('m', 4);
+            map.put('n', 5);
+            map.put('o', 6);
+            map.put('p', 7);
+            map.put('q', 8);
+            map.put('r', 9);
+            map.put('s', 10);
+            map.put('t', 11);
+            map.put('u', 12);
+            map.put('v', 13);
+            map.put('w', 14);
+            map.put('x', 15);
+            map.put('y', 16);
+            map.put('z', 17);
+        }
+    }
+
+    public static class Group extends MapReduceBase
+        implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter, 
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            while (iter.hasNext()) {
+                oc.collect(null, iter.next());
+            }
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        if (args.length!=3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+        JobConf lp = new JobConf(L9.class);
+        lp.setJobName("L9 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setReducerClass(Group.class);
+        lp.setPartitionerClass(MyPartitioner.class);
+        Properties props = System.getProperties();
+        for (Map.Entry<Object,Object> entry : props.entrySet()) {
+            lp.set((String)entry.getKey(), (String)entry.getValue());
+        }
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L9out"));
+        // Hardcode the parallel to 40 since MyPartitioner assumes it
+        lp.setNumReduceTasks(40);
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L9 join");
+        jc.addJob(group);
+
+        new Thread(jc).start();
+   
+        int i = 0;
+        while(!jc.allFinished()){
+            ArrayList<Job> failures = jc.getFailedJobs();
+            if (failures != null && failures.size() > 0) {
+                for (Job failure : failures) {
+                    System.err.println(failure.getMessage());
+                }
+                break;
+            }
+
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {}
+
+            if (i % 10000 == 0) {
+                System.out.println("Running jobs");
+                ArrayList<Job> running = jc.getRunningJobs();
+                if (running != null && running.size() > 0) {
+                    for (Job r : running) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Ready jobs");
+                ArrayList<Job> ready = jc.getReadyJobs();
+                if (ready != null && ready.size() > 0) {
+                    for (Job r : ready) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Waiting jobs");
+                ArrayList<Job> waiting = jc.getWaitingJobs();
+                if (waiting != null && waiting.size() > 0) {
+                    for (Job r : waiting) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+                System.out.println("Successful jobs");
+                ArrayList<Job> success = jc.getSuccessfulJobs();
+                if (success != null && success.size() > 0) {
+                    for (Job r : success) {
+                        System.out.println(r.getJobName());
+                    }
+                }
+            }
+            i++;
+        }
+        ArrayList<Job> failures = jc.getFailedJobs();
+        if (failures != null && failures.size() > 0) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+}

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,45 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * A collection of static functions for use by the pigmix map reduce tasks.
+ */
+public class Library {
+
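+    /**
+     * Splits a line of text on the given single-character delimiter,
+     * returning one Text per field.
+     */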
+    public static List<Text> splitLine(Text line, char delimiter) {
+        String s = line.toString();
+        List<Text> cols = new ArrayList<Text>();
+        int start = 0;
+        for (int i = 0; i < s.length(); i++) {
+            if (s.charAt(i) == delimiter) {
+                if (start == i) cols.add(new Text()); // null case
+                else cols.add(new Text(s.substring(start, i)));
+                start = i + 1;
+            }
+        }
+        // Grab the last one.
+        if (start != s.length() - 1) cols.add(new Text(s.substring(start)));
+
+        return cols;
+    }
+
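+    /**
+     * Looks up a key in a serialized map column, where entries are separated
+     * by ^C and keys are separated from values by ^D. Returns the matching
+     * value, or null.
+     */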
+    public static Text mapLookup(Text mapCol, Text key) {
+        List<Text> kvps = splitLine(mapCol, '\u0003');
+
+        for (Text potential : kvps) {
+            // Split potential on ^D
+            List<Text> kv = splitLine(potential, '\u0004');
+            if (kv.size() != 2) return null;
+            if (kv.get(0).equals(key)) return kv.get(1);
+        }
+
+        return null;
+    }
+
+}

Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.pigmix.udf;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.LoadCaster;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+/**
+ * A load function for the performance tests.
+ */
+public class PigPerformanceLoader extends PigStorage {
+
+    BagFactory bagFactory;
+    TupleFactory tupleFactory;
+
+    public PigPerformanceLoader() {
+        // Assume ^A as a delimiter
+        super("\u0001");
+        bagFactory = BagFactory.getInstance();
+        tupleFactory = TupleFactory.getInstance();
+    }
+
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return new Caster();
+    }
+        
+    class Caster implements LoadCaster {
+        
+        Utf8StorageConverter helper = new Utf8StorageConverter();
+        public Caster() {
+        }
+        
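+        /**
+         * Deserializes a bag: elements are separated by ^B and each element is
+         * prefixed with a single type byte ('i', 'l', 'f', 'd', 's', 'm', or 'b').
+         */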
+        public DataBag bytesToBag(byte[] b, ResourceFieldSchema fs) throws IOException {
+            if (b == null) return null;
+
+            DataBag bag = bagFactory.newDefaultBag();
+
+            int pos = 0;
+            while (pos < b.length) {
+                Tuple t = tupleFactory.newTuple(1);
+
+                // Figure out how long until the next element in the list.
+                int start = pos;
+                while (pos < b.length && b[pos] != 2) pos++; // 2 is ^B
+
+                byte[] copy = new byte[pos - start];
+                int i, j;
+                for (i = start + 1, j = 0; i < pos; i++, j++) copy[j] = b[i];
+
+                // The first byte will tell us what type the field is.
+                try {
+                    switch (b[start]) {
+                        case 105: t.set(0, bytesToInteger(copy)); break;
+                        case 108: t.set(0, bytesToLong(copy)); break;
+                        case 102: t.set(0, bytesToFloat(copy)); break;
+                        case 100: t.set(0, bytesToDouble(copy)); break;
+                        case 115: t.set(0, bytesToCharArray(copy)); break;
+                        case 109: t.set(0, bytesToMap(copy)); break;
+                        case 98: t.set(0, bytesToBag(copy, null)); break;
+                        default: throw new RuntimeException("unknown type " + b[start]);
+                    }
+                } catch (ExecException ee) {
+                    throw new IOException(ee);
+                }
+                pos++; // move past the separator
+                bag.add(t);
+            }
+
+            return bag;
+        }
+
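+        /**
+         * Deserializes a map: each entry is a single-character key, a one-byte
+         * separator, and a value terminated by ^C.
+         */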
+        public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+            if (b == null) return null;
+
+            Map<String, Object> m = new HashMap<String, Object>(26);
+
+            int pos = 0;
+            while (pos < b.length) {
+
+                // The key is always one character at the moment.
+                byte[] k = new byte[1];
+                k[0] = b[pos];
+                String key = new String(k);
+                pos += 2;
+                int start = pos;
+                while (pos < b.length && b[pos] != 3) pos++; // 3 is ^C
+
+                byte[] copy = new byte[pos - start];
+                int i, j;
+                for (i = start + 1, j = 0; i < pos; i++, j++) copy[j] = b[i];
+                String val = bytesToCharArray(copy);
+                m.put(key, val);
+                pos++; // move past ^C
+            }
+            return m; 
+        }
+
+        @Override
+        public String bytesToCharArray(byte[] arg0) throws IOException {
+            return helper.bytesToCharArray(arg0);
+        }
+
+        @Override
+        public Double bytesToDouble(byte[] arg0) throws IOException {
+            return helper.bytesToDouble(arg0);
+        }
+
+        @Override
+        public Float bytesToFloat(byte[] arg0) throws IOException {
+            return helper.bytesToFloat(arg0);
+        }
+
+        @Override
+        public Integer bytesToInteger(byte[] arg0) throws IOException {
+            return helper.bytesToInteger(arg0);
+        }
+
+        @Override
+        public Long bytesToLong(byte[] arg0) throws IOException {
+            return helper.bytesToLong(arg0);
+        }
+
+        @Override
+        public Tuple bytesToTuple(byte[] arg0, ResourceFieldSchema fs) throws IOException {
+            return helper.bytesToTuple(arg0, fs);
+        }
+
+        @Override
+        public Boolean bytesToBoolean(byte[] arg0) throws IOException {
+            return helper.bytesToBoolean(arg0);
+        }
+
+        @Override
+        public DateTime bytesToDateTime(byte[] arg0) throws IOException {
+            return helper.bytesToDateTime(arg0);
+        }
+
+        @Override
+        public Map<String, Object> bytesToMap(byte[] arg0,
+                ResourceFieldSchema fs) throws IOException {
+            return helper.bytesToMap(arg0, fs);
+        }
+
+        @Override
+        public BigInteger bytesToBigInteger(byte[] arg0) throws IOException {
+            return helper.bytesToBigInteger(arg0);
+        }
+
+        @Override
+        public BigDecimal bytesToBigDecimal(byte[] arg0) throws IOException {
+            return helper.bytesToBigDecimal(arg0);
+        }
+    }
+}

Added: pig/trunk/test/perf/pigmix/src/pig/L1.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L1.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L1.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L1.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,14 @@
+-- This script tests reading from a map, flattening a bag of maps, and use of bincond.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, (int)action as action, (map[])page_info as page_info,
+    flatten((bag{tuple(map[])})page_links) as page_links;
+C = foreach B generate user,
+    (action == 1 ? page_info#'a' : page_links#'b') as header;
+D = group C by user parallel $PARALLEL;
+E = foreach D generate group, COUNT(C) as cnt;
+store E into '$PIGMIX_OUTPUT/L1out';
+
+

Added: pig/trunk/test/perf/pigmix/src/pig/L10.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L10.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L10.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L10.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,7 @@
+--This script covers order by of multiple values.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent:int, query_term, ip_addr, timestamp,
+        estimated_revenue:double, page_info, page_links);
+B = order A by query_term, estimated_revenue desc, timespent parallel $PARALLEL;
+store B into '$PIGMIX_OUTPUT/L10out';

Added: pig/trunk/test/perf/pigmix/src/pig/L11.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L11.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L11.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L11.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+-- This script covers distinct and union.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $PARALLEL;
+alpha = load '$HDFS_ROOT/widerow' using PigStorage('\u0001');
+beta = foreach alpha generate $0 as name;
+gamma = distinct beta parallel $PARALLEL;
+D = union C, gamma;
+E = distinct D parallel $PARALLEL;
+store E into '$PIGMIX_OUTPUT/L11out';

Added: pig/trunk/test/perf/pigmix/src/pig/L12.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L12.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L12.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L12.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,18 @@
+-- This script covers multi-store queries.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, (int)timespent as timespent, query_term,
+    (double)estimated_revenue as estimated_revenue;
+split B into C if user is not null, alpha if user is null;
+split C into D if query_term is not null, aleph if query_term is null;
+E = group D by user parallel $PARALLEL;
+F = foreach E generate group, MAX(D.estimated_revenue);
+store F into '$PIGMIX_OUTPUT/highest_value_page_per_user';
+beta = group alpha by query_term parallel $PARALLEL;
+gamma = foreach beta generate group, SUM(alpha.timespent);
+store gamma into '$PIGMIX_OUTPUT/total_timespent_per_term';
+beth = group aleph by action parallel $PARALLEL;
+gimel = foreach beth generate group, COUNT(aleph);
+store gimel into '$PIGMIX_OUTPUT/queries_per_action';

Added: pig/trunk/test/perf/pigmix/src/pig/L13.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L13.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L13.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L13.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+	as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+alpha = load '$HDFS_ROOT/power_users_samples' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+beta = foreach alpha generate name, phone;
+C = join B by user left outer, beta by name parallel $PARALLEL;
+store C into '$PIGMIX_OUTPUT/L13out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L14.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L14.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L14.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L14.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views_sorted' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+alpha = load '$HDFS_ROOT/users_sorted' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+beta = foreach alpha generate name;
+C = join B by user, beta by name using 'merge';
+store C into '$PIGMIX_OUTPUT/L14out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L15.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L15.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L15.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L15.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, estimated_revenue, timespent;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+    beth = distinct B.action;
+    rev = distinct B.estimated_revenue;
+    ts = distinct B.timespent;
+    generate group, COUNT(beth), SUM(rev), (int)AVG(ts);
+}
+store D into '$PIGMIX_OUTPUT/L15out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L16.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L16.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L16.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L16.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+    E = order B by estimated_revenue;
+    F = E.estimated_revenue;
+    generate group, SUM(F);
+}
+
+store D into '$PIGMIX_OUTPUT/L16out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L17.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L17.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L17.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L17.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/widegroupbydata' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links, user_1, action_1, timespent_1, query_term_1, ip_addr_1, timestamp_1,
+        estimated_revenue_1, page_info_1, page_links_1, user_2, action_2, timespent_2, query_term_2, ip_addr_2, timestamp_2,
+        estimated_revenue_2, page_info_2, page_links_2);
+B = group A by (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, user_1, action_1, timespent_1, query_term_1, ip_addr_1, timestamp_1,
+        estimated_revenue_1, user_2, action_2, timespent_2, query_term_2, ip_addr_2, timestamp_2,
+        estimated_revenue_2) parallel $PARALLEL;
+C = foreach B generate SUM(A.timespent), SUM(A.timespent_1), SUM(A.timespent_2), AVG(A.estimated_revenue), AVG(A.estimated_revenue_1), AVG(A.estimated_revenue_2);
+store C into '$PIGMIX_OUTPUT/L17out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L2.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L2.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L2.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,12 @@
+-- This script tests using a join small enough to do in fragment and replicate. 
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+alpha = load '$HDFS_ROOT/power_users' using PigStorage('\u0001') as (name, phone,
+        address, city, state, zip);
+beta = foreach alpha generate name;
+C = join B by user, beta by name using 'replicated' parallel $PARALLEL;
+store C into '$PIGMIX_OUTPUT/L2out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L3.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L3.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L3.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L3.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,16 @@
+--This script tests a join too large for fragment and replicate.  It also 
+--contains a join followed by a group by on the same key, something that we
+--could potentially optimize by not regrouping.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, (double)estimated_revenue;
+alpha = load '$HDFS_ROOT/users' using PigStorage('\u0001') as (name, phone, address,
+        city, state, zip);
+beta = foreach alpha generate name;
+C = join beta by name, B by user parallel $PARALLEL;
+D = group C by $0 parallel $PARALLEL;
+E = foreach D generate group, SUM(C.estimated_revenue);
+store E into '$PIGMIX_OUTPUT/L3out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L4.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L4.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L4.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L4.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+-- This script covers foreach/generate with a nested distinct.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, action;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+    aleph = B.action;
+    beth = distinct aleph;
+    generate group, COUNT(beth);
+}
+store D into '$PIGMIX_OUTPUT/L4out';

Added: pig/trunk/test/perf/pigmix/src/pig/L5.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L5.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L5.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L5.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,14 @@
+--This script does an anti-join.  This is useful because it is a use of
+--cogroup that is not a regular join.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+alpha = load '$HDFS_ROOT/users' using PigStorage('\u0001') as (name, phone, address,
+        city, state, zip);
+beta = foreach alpha generate name;
+C = cogroup beta by name, B by user parallel $PARALLEL;
+D = filter C by COUNT(beta) == 0;
+E = foreach D generate group;
+store E into '$PIGMIX_OUTPUT/L5out';

Added: pig/trunk/test/perf/pigmix/src/pig/L6.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L6.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L6.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L6.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,11 @@
+-- This script covers the case where the group by key is a significant
+-- percentage of the row.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, (int)timespent as timespent, query_term, ip_addr, timestamp;
+C = group B by (user, query_term, ip_addr, timestamp) parallel $PARALLEL;
+D = foreach C generate flatten(group), SUM(B.timespent);
+store D into '$PIGMIX_OUTPUT/L6out';
+

Added: pig/trunk/test/perf/pigmix/src/pig/L7.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L7.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L7.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L7.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,12 @@
+-- This script covers having a nested plan with splits.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader() as (user, action, timespent, query_term,
+            ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, timestamp;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+    morning = filter B by timestamp < 43200;
+    afternoon = filter B by timestamp >= 43200;
+    generate group, COUNT(morning), COUNT(afternoon);
+}
+store D into '$PIGMIX_OUTPUT/L7out';

Added: pig/trunk/test/perf/pigmix/src/pig/L8.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L8.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L8.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L8.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+-- This script covers group all.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = foreach A generate user, (int)timespent as timespent, (double)estimated_revenue as estimated_revenue;
+C = group B all;
+D = foreach C generate SUM(B.timespent), AVG(B.estimated_revenue);
+store D into '$PIGMIX_OUTPUT/L8out';

Added: pig/trunk/test/perf/pigmix/src/pig/L9.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L9.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L9.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L9.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,7 @@
+--This script covers order by of a single value.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+    as (user, action, timespent, query_term, ip_addr, timestamp,
+        estimated_revenue, page_info, page_links);
+B = order A by query_term parallel $PARALLEL;
+store B into '$PIGMIX_OUTPUT/L9out';