You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/04/16 01:13:10 UTC
svn commit: r1468268 [3/3] - in /pig/trunk: ./
test/e2e/pig/udfs/java/org/apache/pig/test/udf/storefunc/ test/perf/
test/perf/pigmix/ test/perf/pigmix/bin/ test/perf/pigmix/conf/
test/perf/pigmix/src/ test/perf/pigmix/src/java/ test/perf/pigmix/src/jav...
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L6.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,166 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L6 {
+
+ public static class ReadPageViews extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<Text, IntWritable> oc,
+ Reporter reporter) throws IOException {
+
+ // Split the line
+ List<Text> fields = Library.splitLine(val, '');
+ if (fields.size() != 9) return;
+
+ StringBuffer sb = new StringBuffer();
+ sb.append(fields.get(0).toString());
+ sb.append("");
+ sb.append(fields.get(3).toString());
+ sb.append("");
+ sb.append(fields.get(4).toString());
+ sb.append("");
+ sb.append(fields.get(5).toString());
+ Text key = new Text(sb.toString());
+
+ try {
+ oc.collect(fields.get(0),
+ new IntWritable(Integer.valueOf(fields.get(3).toString())));
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ }
+
+ public static class Group extends MapReduceBase
+ implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce(
+ Text key,
+ Iterator<IntWritable> iter,
+ OutputCollector<Text, IntWritable> oc,
+ Reporter reporter) throws IOException {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next().get();
+ }
+ oc.collect(key, new IntWritable(sum));
+ reporter.setStatus("OK");
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ if (args.length!=3) {
+ System.out.println("Parameters: inputDir outputDir parallel");
+ System.exit(1);
+ }
+ String inputDir = args[0];
+ String outputDir = args[1];
+ String parallel = args[2];
+ JobConf lp = new JobConf(L6.class);
+ lp.setJobName("L6 Load Page Views");
+ lp.setInputFormat(TextInputFormat.class);
+ lp.setOutputKeyClass(Text.class);
+ lp.setOutputValueClass(IntWritable.class);
+ lp.setMapperClass(ReadPageViews.class);
+ lp.setCombinerClass(Group.class);
+ lp.setReducerClass(Group.class);
+ Properties props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lp.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+ FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L6out"));
+ lp.setNumReduceTasks(Integer.parseInt(parallel));
+ Job group = new Job(lp);
+
+ JobControl jc = new JobControl("L6 join");
+ jc.addJob(group);
+
+ new Thread(jc).start();
+
+ int i = 0;
+ while(!jc.allFinished()){
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+
+ if (i % 10000 == 0) {
+ System.out.println("Running jobs");
+ ArrayList<Job> running = jc.getRunningJobs();
+ if (running != null && running.size() > 0) {
+ for (Job r : running) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Ready jobs");
+ ArrayList<Job> ready = jc.getReadyJobs();
+ if (ready != null && ready.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Waiting jobs");
+ ArrayList<Job> waiting = jc.getWaitingJobs();
+ if (waiting != null && waiting.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Successful jobs");
+ ArrayList<Job> success = jc.getSuccessfulJobs();
+ if (success != null && success.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ }
+ i++;
+ }
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ }
+ jc.stop();
+ }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L7.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,186 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L7 {
+
+    /** Field delimiter (^A) used by page_views and by the count pairs emitted below. */
+    private static final char FIELD_SEP = '\u0001';
+
+    /** Timestamps below this value count as morning, the rest as afternoon (43200 = 12 * 60 * 60). */
+    private static final int MORNING_CUTOFF = 43200;
+
+    /** Milliseconds to sleep between polls of the JobControl. */
+    private static final long POLL_INTERVAL_MS = 5000;
+
+    /** Job lists are dumped to stdout on every STATUS_EVERY-th poll (including the first). */
+    private static final int STATUS_EVERY = 10000;
+
+    /** Emits (user, timestamp) for every page_views record with exactly 9 fields. */
+    public static class ReadPageViews extends MapReduceBase
+            implements Mapper<LongWritable, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line on ^A; field 0 is user, field 5 is timestamp.
+            List<Text> fields = Library.splitLine(val, FIELD_SEP);
+            if (fields.size() != 9) return;
+
+            oc.collect(fields.get(0), fields.get(5));
+        }
+    }
+
+    /** Combiner: collapses a user's values into one "morning^Aafternoon" count pair. */
+    public static class Combiner extends MapReduceBase
+            implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            emitCounts(key, iter, oc, reporter);
+        }
+    }
+
+    /** Reducer: emits the final "morning^Aafternoon" counts for each user. */
+    public static class Group extends MapReduceBase
+            implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            emitCounts(key, iter, oc, reporter);
+        }
+    }
+
+    /**
+     * Sums every value of {@code key} into morning/afternoon counts and emits
+     * them as "morning^Aafternoon".
+     *
+     * Hadoop may run the combiner zero, one or several times. So both the
+     * combiner and the reducer can receive raw timestamps from the mapper as
+     * well as pairs produced by an earlier combiner pass, and accumulate()
+     * accepts both.
+     */
+    private static void emitCounts(Text key, Iterator<Text> iter,
+            OutputCollector<Text, Text> oc, Reporter reporter) throws IOException {
+        int[] counts = new int[2]; // [0] = morning, [1] = afternoon
+        while (iter.hasNext()) {
+            accumulate(iter.next().toString(), counts);
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(counts[0]).append(FIELD_SEP).append(counts[1]);
+        oc.collect(key, new Text(sb.toString()));
+        reporter.setStatus("OK");
+    }
+
+    /**
+     * Adds one value to {@code counts}. A value without ^A is a raw timestamp
+     * and adds 1 to morning or afternoon. Otherwise it is a
+     * "morning^Aafternoon" pair whose counts are added. Non-numeric values
+     * are ignored.
+     */
+    private static void accumulate(String value, int[] counts) {
+        int sep = value.indexOf(FIELD_SEP);
+        try {
+            if (sep < 0) {
+                if (Integer.parseInt(value) < MORNING_CUTOFF) {
+                    counts[0]++;
+                } else {
+                    counts[1]++;
+                }
+            } else {
+                int morning = Integer.parseInt(value.substring(0, sep));
+                int afternoon = Integer.parseInt(value.substring(sep + 1));
+                counts[0] += morning;
+                counts[1] += afternoon;
+            }
+        } catch (NumberFormatException ignored) {
+            // Malformed value (e.g. an empty timestamp): skip it.
+        }
+    }
+
+    /**
+     * Usage: L7 inputDir outputDir parallel.
+     * Reads inputDir/page_views, writes outputDir/L7out and uses
+     * {@code parallel} reduce tasks.
+     */
+    public static void main(String[] args) throws IOException {
+
+        if (args.length != 3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+        String parallel = args[2];
+
+        JobConf lp = new JobConf(L7.class);
+        lp.setJobName("L7 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setCombinerClass(Combiner.class);
+        lp.setReducerClass(Group.class);
+        copySystemProperties(lp);
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L7out"));
+        lp.setNumReduceTasks(Integer.parseInt(parallel));
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L7 join");
+        jc.addJob(group);
+        runAndWait(jc);
+    }
+
+    /** Copies every String-valued JVM system property into {@code conf}. */
+    private static void copySystemProperties(JobConf conf) {
+        Properties props = System.getProperties();
+        for (String name : props.stringPropertyNames()) {
+            conf.set(name, props.getProperty(name));
+        }
+    }
+
+    /**
+     * Starts {@code jc} on its own thread and polls it every POLL_INTERVAL_MS
+     * until all jobs have finished or one has failed. The job lists are printed
+     * on every STATUS_EVERY-th poll. Afterwards the messages of any failed jobs
+     * are printed to stderr (once) and the JobControl is stopped.
+     */
+    private static void runAndWait(JobControl jc) {
+        new Thread(jc).start();
+
+        int polls = 0;
+        while (!jc.allFinished() && isEmpty(jc.getFailedJobs())) {
+            try {
+                Thread.sleep(POLL_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible and stop waiting;
+                // the JobControl is still stopped below.
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            if (polls % STATUS_EVERY == 0) {
+                printJobNames("Running jobs", jc.getRunningJobs());
+                printJobNames("Ready jobs", jc.getReadyJobs());
+                printJobNames("Waiting jobs", jc.getWaitingJobs());
+                printJobNames("Successful jobs", jc.getSuccessfulJobs());
+            }
+            polls++;
+        }
+
+        List<Job> failures = jc.getFailedJobs();
+        if (failures != null) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+    /** True when {@code jobs} is null or empty. */
+    private static boolean isEmpty(List<Job> jobs) {
+        return jobs == null || jobs.isEmpty();
+    }
+
+    /** Prints {@code heading}, then the name of each job in {@code jobs}. */
+    private static void printJobNames(String heading, List<Job> jobs) {
+        System.out.println(heading);
+        if (jobs == null) return;
+        for (Job job : jobs) {
+            System.out.println(job.getJobName());
+        }
+    }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L8.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,197 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L8 {
+
+ public static class ReadPageViews extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text> {
+
+ public void map(
+ LongWritable k,
+ Text val,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+
+ // Split the line
+ List<Text> fields = Library.splitLine(val, '');
+ if (fields.size() != 9) return;
+
+ StringBuffer sb = new StringBuffer();
+ sb.append(fields.get(2).toString());
+ sb.append("");
+ sb.append(fields.get(6).toString());
+ oc.collect(new Text("all"), new Text(sb.toString()));
+ }
+ }
+
+ public static class Combiner extends MapReduceBase
+ implements Reducer<Text, Text, Text, Text> {
+
+ public void reduce(
+ Text key,
+ Iterator<Text> iter,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ int tsSum = 0, erCnt = 0;
+ double erSum = 0.0;
+ while (iter.hasNext()) {
+ List<Text> vals = Library.splitLine(iter.next(), '');
+ try {
+ tsSum += Integer.valueOf(vals.get(0).toString());
+ erSum += Double.valueOf(vals.get(1).toString());
+ erCnt++;
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ StringBuffer sb = new StringBuffer();
+ sb.append((new Integer(tsSum)).toString());
+ sb.append("");
+ sb.append((new Double(erSum)).toString());
+ sb.append("");
+ sb.append((new Integer(erCnt)).toString());
+ oc.collect(key, new Text(sb.toString()));
+ reporter.setStatus("OK");
+ }
+ }
+ public static class Group extends MapReduceBase
+ implements Reducer<Text, Text, Text, Text> {
+
+ public void reduce(
+ Text key,
+ Iterator<Text> iter,
+ OutputCollector<Text, Text> oc,
+ Reporter reporter) throws IOException {
+ int tsSum = 0, erCnt = 0;
+ double erSum = 0.0;
+ while (iter.hasNext()) {
+ List<Text> vals = Library.splitLine(iter.next(), '');
+ try {
+ tsSum += Integer.valueOf(vals.get(0).toString());
+ erSum += Double.valueOf(vals.get(1).toString());
+ erCnt++;
+ } catch (NumberFormatException nfe) {
+ }
+ }
+ double erAvg = erSum / erCnt;
+ StringBuffer sb = new StringBuffer();
+ sb.append((new Integer(tsSum)).toString());
+ sb.append("");
+ sb.append((new Double(erAvg)).toString());
+ oc.collect(key, new Text(sb.toString()));
+ reporter.setStatus("OK");
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ if (args.length!=3) {
+ System.out.println("Parameters: inputDir outputDir parallel");
+ System.exit(1);
+ }
+ String inputDir = args[0];
+ String outputDir = args[1];
+ String parallel = args[2];
+ JobConf lp = new JobConf(L8.class);
+ lp.setJobName("L8 Load Page Views");
+ lp.setInputFormat(TextInputFormat.class);
+ lp.setOutputKeyClass(Text.class);
+ lp.setOutputValueClass(Text.class);
+ lp.setMapperClass(ReadPageViews.class);
+ lp.setCombinerClass(Combiner.class);
+ lp.setReducerClass(Group.class);
+ Properties props = System.getProperties();
+ for (Map.Entry<Object,Object> entry : props.entrySet()) {
+ lp.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+ FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L8out"));
+ lp.setNumReduceTasks(1);
+ Job group = new Job(lp);
+
+ JobControl jc = new JobControl("L8 join");
+ jc.addJob(group);
+
+ new Thread(jc).start();
+
+ int i = 0;
+ while(!jc.allFinished()){
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ break;
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {}
+
+ if (i % 10000 == 0) {
+ System.out.println("Running jobs");
+ ArrayList<Job> running = jc.getRunningJobs();
+ if (running != null && running.size() > 0) {
+ for (Job r : running) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Ready jobs");
+ ArrayList<Job> ready = jc.getReadyJobs();
+ if (ready != null && ready.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Waiting jobs");
+ ArrayList<Job> waiting = jc.getWaitingJobs();
+ if (waiting != null && waiting.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ System.out.println("Successful jobs");
+ ArrayList<Job> success = jc.getSuccessfulJobs();
+ if (success != null && success.size() > 0) {
+ for (Job r : ready) {
+ System.out.println(r.getJobName());
+ }
+ }
+ }
+ i++;
+ }
+ ArrayList<Job> failures = jc.getFailedJobs();
+ if (failures != null && failures.size() > 0) {
+ for (Job failure : failures) {
+ System.err.println(failure.getMessage());
+ }
+ }
+ jc.stop();
+ }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/L9.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,230 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.apache.pig.test.pigmix.mapreduce.Library;
+
+public class L9 {
+
+    /** Field delimiter (^A) used by the PigMix page_views data. */
+    private static final char FIELD_SEP = '\u0001';
+
+    /** Milliseconds to sleep between polls of the JobControl. */
+    private static final long POLL_INTERVAL_MS = 5000;
+
+    /** Job lists are dumped to stdout on every STATUS_EVERY-th poll (including the first). */
+    private static final int STATUS_EVERY = 10000;
+
+    /** Emits (user, whole line) for every page_views record with exactly 9 fields. */
+    public static class ReadPageViews extends MapReduceBase
+            implements Mapper<LongWritable, Text, Text, Text> {
+
+        public void map(
+                LongWritable k,
+                Text val,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+
+            // Split the line on ^A.
+            List<Text> fields = Library.splitLine(val, FIELD_SEP);
+            if (fields.size() != 9) return;
+
+            oc.collect(fields.get(0), val);
+        }
+    }
+
+    /**
+     * Places keys into 40 buckets. The second character of the key selects an
+     * offset 0-19 from {@link #map}, and 20 is added when the first character
+     * is greater than ']'. Keys shorter than two characters, or whose second
+     * character is not in the table, go to bucket 39. The bucket is reduced
+     * modulo numPartitions, so it is unchanged for the intended 40 reducers
+     * and still valid for fewer.
+     */
+    public static class MyPartitioner implements Partitioner<Text, Text> {
+
+        /** Second-character offsets (0-19); filled in by configure(). */
+        public Map<Character, Integer> map;
+
+        public int getPartition(Text key, Text value, int numPartitions) {
+            String s = key.toString();
+            Integer offset = s.length() < 2 ? null : map.get(s.charAt(1));
+            int rc;
+            if (offset == null) {
+                rc = 39;
+            } else {
+                rc = offset + (s.charAt(0) > ']' ? 20 : 0);
+            }
+            return rc % numPartitions;
+        }
+
+        public void configure(JobConf conf) {
+            // No configuration is read. This hook builds the lookup table
+            // because it is called each time MyPartitioner is set up.
+            map = new HashMap<Character, Integer>(57);
+            // 'A'..'T' -> 0..19, except 'H' -> 8 and 'I' -> 7 (kept from the original table).
+            for (char c = 'A'; c <= 'T'; c++) {
+                map.put(c, c - 'A');
+            }
+            map.put('H', 8);
+            map.put('I', 7);
+            // 'U'..'h' (ASCII 85-104, including [ \ ] ^ _ `) -> 0..19.
+            for (char c = 'U'; c <= 'h'; c++) {
+                map.put(c, c - 'U');
+            }
+            // 'i'..'z' -> 0..17.
+            for (char c = 'i'; c <= 'z'; c++) {
+                map.put(c, c - 'i');
+            }
+        }
+    }
+
+    /** Emits every record of the group with a null key. */
+    public static class Group extends MapReduceBase
+            implements Reducer<Text, Text, Text, Text> {
+
+        public void reduce(
+                Text key,
+                Iterator<Text> iter,
+                OutputCollector<Text, Text> oc,
+                Reporter reporter) throws IOException {
+            while (iter.hasNext()) {
+                oc.collect(null, iter.next());
+            }
+        }
+    }
+
+    /**
+     * Usage: L9 inputDir outputDir parallel.
+     * Reads inputDir/page_views and writes outputDir/L9out. The parallel
+     * argument is accepted for a uniform command line but ignored, because
+     * MyPartitioner is laid out for 40 reducers.
+     */
+    public static void main(String[] args) throws IOException {
+
+        if (args.length != 3) {
+            System.out.println("Parameters: inputDir outputDir parallel");
+            System.exit(1);
+        }
+        String inputDir = args[0];
+        String outputDir = args[1];
+
+        JobConf lp = new JobConf(L9.class);
+        lp.setJobName("L9 Load Page Views");
+        lp.setInputFormat(TextInputFormat.class);
+        lp.setOutputKeyClass(Text.class);
+        lp.setOutputValueClass(Text.class);
+        lp.setMapperClass(ReadPageViews.class);
+        lp.setReducerClass(Group.class);
+        lp.setPartitionerClass(MyPartitioner.class);
+        copySystemProperties(lp);
+        FileInputFormat.addInputPath(lp, new Path(inputDir + "/page_views"));
+        FileOutputFormat.setOutputPath(lp, new Path(outputDir + "/L9out"));
+        // Hardcode the parallel to 40 since MyPartitioner assumes it
+        lp.setNumReduceTasks(40);
+        Job group = new Job(lp);
+
+        JobControl jc = new JobControl("L9 join");
+        jc.addJob(group);
+        runAndWait(jc);
+    }
+
+    /** Copies every String-valued JVM system property into {@code conf}. */
+    private static void copySystemProperties(JobConf conf) {
+        Properties props = System.getProperties();
+        for (String name : props.stringPropertyNames()) {
+            conf.set(name, props.getProperty(name));
+        }
+    }
+
+    /**
+     * Starts {@code jc} on its own thread and polls it every POLL_INTERVAL_MS
+     * until all jobs have finished or one has failed. The job lists are printed
+     * on every STATUS_EVERY-th poll. Afterwards the messages of any failed jobs
+     * are printed to stderr (once) and the JobControl is stopped.
+     */
+    private static void runAndWait(JobControl jc) {
+        new Thread(jc).start();
+
+        int polls = 0;
+        while (!jc.allFinished() && isEmpty(jc.getFailedJobs())) {
+            try {
+                Thread.sleep(POLL_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible and stop waiting;
+                // the JobControl is still stopped below.
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            if (polls % STATUS_EVERY == 0) {
+                printJobNames("Running jobs", jc.getRunningJobs());
+                printJobNames("Ready jobs", jc.getReadyJobs());
+                printJobNames("Waiting jobs", jc.getWaitingJobs());
+                printJobNames("Successful jobs", jc.getSuccessfulJobs());
+            }
+            polls++;
+        }
+
+        List<Job> failures = jc.getFailedJobs();
+        if (failures != null) {
+            for (Job failure : failures) {
+                System.err.println(failure.getMessage());
+            }
+        }
+        jc.stop();
+    }
+
+    /** True when {@code jobs} is null or empty. */
+    private static boolean isEmpty(List<Job> jobs) {
+        return jobs == null || jobs.isEmpty();
+    }
+
+    /** Prints {@code heading}, then the name of each job in {@code jobs}. */
+    private static void printJobNames(String heading, List<Job> jobs) {
+        System.out.println(heading);
+        if (jobs == null) return;
+        for (Job job : jobs) {
+            System.out.println(job.getJobName());
+        }
+    }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/mapreduce/Library.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,45 @@
+package org.apache.pig.test.pigmix.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * A collection of static functions for use by the pigmix map reduce tasks.
+ */
+public class Library {
+
+    /**
+     * Splits {@code line} on every occurrence of {@code delimiter}.
+     *
+     * Every field is returned, including empty fields and the last field, so
+     * a line with n delimiters always yields n + 1 fields. For example, with
+     * ^A as the delimiter, "a^A^Ab" gives [a, "", b], "a^Ab" gives [a, b]
+     * and "a^A" gives [a, ""].
+     *
+     * @param line      the text to split
+     * @param delimiter the field separator
+     * @return the fields in order, each as a new Text
+     */
+    public static List<Text> splitLine(Text line, char delimiter) {
+        String s = line.toString();
+        List<Text> cols = new ArrayList<Text>();
+        int start = 0;
+        for (int i = 0; i < s.length(); i++) {
+            if (s.charAt(i) == delimiter) {
+                cols.add(new Text(s.substring(start, i)));
+                start = i + 1;
+            }
+        }
+        // Always keep the last field, even when it is one character long or empty.
+        cols.add(new Text(s.substring(start)));
+        return cols;
+    }
+
+    /**
+     * Looks up {@code key} in a map column whose entries are separated by ^C
+     * (U+0003). Within each entry, the key and value are separated by ^D
+     * (U+0004).
+     *
+     * @param mapCol the encoded map column
+     * @param key    the key to find
+     * @return the value of the first entry whose key equals {@code key}, or
+     *         null if there is none. Entries that do not split into exactly
+     *         one key and one value are skipped.
+     */
+    public static Text mapLookup(Text mapCol, Text key) {
+        for (Text potential : splitLine(mapCol, '\u0003')) {
+            // Split potential on ^D
+            List<Text> kv = splitLine(potential, '\u0004');
+            if (kv.size() != 2) continue;
+            if (kv.get(0).equals(key)) return kv.get(1);
+        }
+        return null;
+    }
+
+}
Added: pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java (added)
+++ pig/trunk/test/perf/pigmix/src/java/org/apache/pig/test/pigmix/udf/PigPerformanceLoader.java Mon Apr 15 23:13:09 2013
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.pigmix.udf;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.LoadCaster;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+/**
+ * A load function for the performance tests.
+ */
+public class PigPerformanceLoader extends PigStorage {
+
+ BagFactory bagFactory;
+ TupleFactory tupleFactory;
+
+ public PigPerformanceLoader() {
+ // Assume ^A as a delimiter
+ super("");
+ bagFactory = BagFactory.getInstance();
+ tupleFactory = TupleFactory.getInstance();
+ }
+
+ @Override
+ public LoadCaster getLoadCaster() throws IOException {
+ return new Caster();
+ }
+
+ class Caster implements LoadCaster {
+
+ Utf8StorageConverter helper = new Utf8StorageConverter();
+ /**
+ *
+ */
+ public Caster() {
+ // TODO Auto-generated constructor stub
+ }
+
+ public DataBag bytesToBag(byte[] b, ResourceFieldSchema fs) throws IOException {
+ if (b == null) return null;
+
+ DataBag bag = bagFactory.newDefaultBag();
+
+ int pos = 0;
+ while (pos < b.length) {
+ Tuple t = tupleFactory.newTuple(1);
+
+ // Figure out how long until the next element in the list.
+ int start = pos;
+ while (pos < b.length && b[pos] != 2) pos++; // 2 is ^B
+
+ byte[] copy = new byte[pos - start];
+ int i, j;
+ for (i = start + 1, j = 0; i < pos; i++, j++) copy[j] = b[i];
+
+ // The first byte will tell us what type the field is.
+ try {
+ switch (b[start]) {
+ case 105: t.set(0, bytesToInteger(copy)); break;
+ case 108: t.set(0, bytesToLong(copy)); break;
+ case 102: t.set(0, bytesToFloat(copy)); break;
+ case 100: t.set(0, bytesToDouble(copy)); break;
+ case 115: t.set(0, bytesToCharArray(copy)); break;
+ case 109: t.set(0, bytesToMap(copy)); break;
+ case 98: t.set(0, bytesToBag(copy, null)); break;
+ default: throw new RuntimeException("unknown type " + b[start]);
+ }
+ } catch (ExecException ee) {
+ throw new IOException(ee);
+ }
+ pos++; // move past the separator
+ bag.add(t);
+ }
+
+ return bag;
+ }
+
+ public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+ if (b == null) return null;
+
+ Map<String, Object> m = new HashMap<String, Object>(26);
+
+ int pos = 0;
+ while (pos < b.length) {
+
+ // The key is always one character at the moment.
+ byte[] k = new byte[1];
+ k[0] = b[pos];
+ String key = new String(k);
+ pos += 2;
+ int start = pos;
+ while (pos < b.length && b[pos] != 3) pos++; // 3 is ^C
+
+ byte[] copy = new byte[pos - start];
+ int i, j;
+ for (i = start + 1, j = 0; i < pos; i++, j++) copy[j] = b[i];
+ String val = bytesToCharArray(copy);
+ m.put(key, val);
+ pos++; // move past ^C
+ }
+ return m;
+ }
+
+ @Override
+ public String bytesToCharArray(byte[] arg0) throws IOException {
+ return helper.bytesToCharArray(arg0);
+ }
+
+ @Override
+ public Double bytesToDouble(byte[] arg0) throws IOException {
+ return helper.bytesToDouble(arg0);
+ }
+
+ @Override
+ public Float bytesToFloat(byte[] arg0) throws IOException {
+ return helper.bytesToFloat(arg0);
+ }
+
+ @Override
+ public Integer bytesToInteger(byte[] arg0) throws IOException {
+ return helper.bytesToInteger(arg0);
+ }
+
+ @Override
+ public Long bytesToLong(byte[] arg0) throws IOException {
+ return helper.bytesToLong(arg0);
+ }
+
+ @Override
+ public Tuple bytesToTuple(byte[] arg0, ResourceFieldSchema fs) throws IOException {
+ return helper.bytesToTuple(arg0, fs);
+ }
+
+ @Override
+ public Boolean bytesToBoolean(byte[] arg0) throws IOException {
+ return helper.bytesToBoolean(arg0);
+ }
+
+ @Override
+ public DateTime bytesToDateTime(byte[] arg0) throws IOException {
+ return helper.bytesToDateTime(arg0);
+ }
+
+ @Override
+ public Map<String, Object> bytesToMap(byte[] arg0,
+ ResourceFieldSchema fs) throws IOException {
+ return helper.bytesToMap(arg0, fs);
+ }
+
+ @Override
+ public BigInteger bytesToBigInteger(byte[] arg0) throws IOException {
+ return helper.bytesToBigInteger(arg0);
+ }
+
+ @Override
+ public BigDecimal bytesToBigDecimal(byte[] arg0) throws IOException {
+ return helper.bytesToBigDecimal(arg0);
+ }
+ }
+}
Added: pig/trunk/test/perf/pigmix/src/pig/L1.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L1.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L1.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L1.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,14 @@
+-- This script tests reading from a map, flattening a bag of maps, and use of the bincond (?:) operator.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links); -- no types declared here, the casts happen in B
+B = foreach A generate user, (int)action as action, (map[])page_info as page_info,
+ flatten((bag{tuple(map[])})page_links) as page_links; -- one output row per map in the page_links bag
+C = foreach B generate user,
+ (action == 1 ? page_info#'a' : page_links#'b') as header; -- bincond chooses the page_info or the page_links lookup
+D = group C by user parallel $PARALLEL;
+E = foreach D generate group, COUNT(C) as cnt; -- number of rows per user
+store E into '$PIGMIX_OUTPUT/L1out';
+
+
Added: pig/trunk/test/perf/pigmix/src/pig/L10.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L10.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L10.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L10.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,7 @@
+-- This script covers order by on multiple keys with mixed sort directions.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent:int, query_term, ip_addr, timestamp,
+ estimated_revenue:double, page_info, page_links); -- numeric types are declared for two of the sort keys
+B = order A by query_term, estimated_revenue desc, timespent parallel $PARALLEL; -- ascending, descending, ascending
+store B into '$PIGMIX_OUTPUT/L10out';
Added: pig/trunk/test/perf/pigmix/src/pig/L11.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L11.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L11.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L11.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+-- This script covers distinct and union.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+C = distinct B parallel $PARALLEL; -- distinct users from page_views
+alpha = load '$HDFS_ROOT/widerow' using PigStorage('\u0001'); -- no schema, the first column is taken by position below
+beta = foreach alpha generate $0 as name;
+gamma = distinct beta parallel $PARALLEL; -- distinct first-column values from widerow
+D = union C, gamma; -- union does not remove duplicates
+E = distinct D parallel $PARALLEL; -- so deduplicate again across both inputs
+store E into '$PIGMIX_OUTPUT/L11out';
Added: pig/trunk/test/perf/pigmix/src/pig/L12.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L12.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L12.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L12.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,18 @@
+-- This script covers multi-store queries: one load feeds three separately stored branches.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, (int)timespent as timespent, query_term,
+ (double)estimated_revenue as estimated_revenue;
+split B into C if user is not null, alpha if user is null; -- alpha holds the rows with a null user
+split C into D if query_term is not null, aleph if query_term is null; -- aleph holds rows with a user but a null query_term
+E = group D by user parallel $PARALLEL;
+F = foreach E generate group, MAX(D.estimated_revenue); -- branch 1: both user and query_term are set
+store F into '$PIGMIX_OUTPUT/highest_value_page_per_user';
+beta = group alpha by query_term parallel $PARALLEL;
+gamma = foreach beta generate group, SUM(alpha.timespent); -- branch 2: user is null
+store gamma into '$PIGMIX_OUTPUT/total_timespent_per_term';
+beth = group aleph by action parallel $PARALLEL;
+gimel = foreach beth generate group, COUNT(aleph); -- branch 3: user is set, query_term is null
+store gimel into '$PIGMIX_OUTPUT/queries_per_action';
Added: pig/trunk/test/perf/pigmix/src/pig/L13.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L13.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L13.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L13.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue; -- keep only the join key and the revenue
+alpha = load '$HDFS_ROOT/power_users_samples' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+beta = foreach alpha generate name, phone;
+C = join B by user left outer, beta by name parallel $PARALLEL; -- This script covers a left outer join: unmatched B rows are kept with null name and phone
+store C into '$PIGMIX_OUTPUT/L13out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L14.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L14.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L14.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L14.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views_sorted' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+alpha = load '$HDFS_ROOT/users_sorted' using PigStorage('\u0001') as (name, phone, address, city, state, zip);
+beta = foreach alpha generate name;
+C = join B by user, beta by name using 'merge'; -- This script covers a merge join, which needs both inputs sorted on the join key
+store C into '$PIGMIX_OUTPUT/L14out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L15.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L15.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L15.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L15.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, estimated_revenue, timespent;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+ beth = distinct B.action;
+ rev = distinct B.estimated_revenue;
+ ts = distinct B.timespent;
+ generate group, COUNT(beth), SUM(rev), (int)AVG(ts); -- This script covers several nested distincts. Each aggregate sees only distinct values
+}
+store D into '$PIGMIX_OUTPUT/L15out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L16.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L16.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L16.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L16.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+ E = order B by estimated_revenue; -- This script covers a nested order by inside each user group
+ F = E.estimated_revenue; -- project the sorted revenue column
+ generate group, SUM(F);
+}
+
+store D into '$PIGMIX_OUTPUT/L16out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L17.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L17.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L17.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L17.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/widegroupbydata' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links, user_1, action_1, timespent_1, query_term_1, ip_addr_1, timestamp_1,
+ estimated_revenue_1, page_info_1, page_links_1, user_2, action_2, timespent_2, query_term_2, ip_addr_2, timestamp_2,
+ estimated_revenue_2, page_info_2, page_links_2); -- the same nine column names, repeated with _1 and _2 suffixes
+B = group A by (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, user_1, action_1, timespent_1, query_term_1, ip_addr_1, timestamp_1,
+ estimated_revenue_1, user_2, action_2, timespent_2, query_term_2, ip_addr_2, timestamp_2,
+ estimated_revenue_2) parallel $PARALLEL; -- This script covers a wide group key: every column except the page_info and page_links ones
+C = foreach B generate SUM(A.timespent), SUM(A.timespent_1), SUM(A.timespent_2), AVG(A.estimated_revenue), AVG(A.estimated_revenue_1), AVG(A.estimated_revenue_2); -- only aggregates are emitted, not the group key
+store C into '$PIGMIX_OUTPUT/L17out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L2.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L2.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L2.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,12 @@
+-- This script tests a join whose small side fits in memory, so it can run as a fragment-replicate (replicated) join.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, estimated_revenue;
+alpha = load '$HDFS_ROOT/power_users' using PigStorage('\u0001') as (name, phone,
+ address, city, state, zip);
+beta = foreach alpha generate name;
+C = join B by user, beta by name using 'replicated' parallel $PARALLEL; -- beta, the last input, is the one held in memory
+store C into '$PIGMIX_OUTPUT/L2out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L3.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L3.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L3.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L3.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,16 @@
+-- This script tests a join too large for fragment-replicate. It also
+-- contains a join followed by a group by on the same key, something that we
+-- could potentially optimize by not regrouping.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, (double)estimated_revenue; -- cast so that the SUM below adds doubles
+alpha = load '$HDFS_ROOT/users' using PigStorage('\u0001') as (name, phone, address,
+ city, state, zip);
+beta = foreach alpha generate name;
+C = join beta by name, B by user parallel $PARALLEL;
+D = group C by $0 parallel $PARALLEL; -- column 0 of C is beta::name, i.e. the join key
+E = foreach D generate group, SUM(C.estimated_revenue);
+store E into '$PIGMIX_OUTPUT/L3out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L4.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L4.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L4.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L4.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,13 @@
+-- This script covers foreach/generate with a nested distinct.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, action;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+ aleph = B.action;
+ beth = distinct aleph; -- distinct actions within one user group
+ generate group, COUNT(beth);
+}
+store D into '$PIGMIX_OUTPUT/L4out';
Added: pig/trunk/test/perf/pigmix/src/pig/L5.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L5.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L5.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L5.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,14 @@
+-- This script does an anti-join. It is useful because it uses cogroup
+-- for something other than a regular join.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user;
+alpha = load '$HDFS_ROOT/users' using PigStorage('\u0001') as (name, phone, address,
+ city, state, zip);
+beta = foreach alpha generate name;
+C = cogroup beta by name, B by user parallel $PARALLEL;
+D = filter C by COUNT(beta) == 0; -- keep only keys with no matching row in users
+E = foreach D generate group; -- page_views users that are absent from users
+store E into '$PIGMIX_OUTPUT/L5out';
Added: pig/trunk/test/perf/pigmix/src/pig/L6.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L6.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L6.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L6.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,11 @@
+-- This script covers the case where the group by key is a significant
+-- percentage of the row.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, action, (int)timespent as timespent, query_term, ip_addr, timestamp;
+C = group B by (user, query_term, ip_addr, timestamp) parallel $PARALLEL; -- four-column composite key
+D = foreach C generate flatten(group), SUM(B.timespent); -- flatten expands the key tuple back into four columns
+store D into '$PIGMIX_OUTPUT/L6out';
+
Added: pig/trunk/test/perf/pigmix/src/pig/L7.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L7.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L7.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L7.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,12 @@
+-- This script covers having a nested plan with splits (two nested filters over the same bag).
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader() as (user, action, timespent, query_term,
+ ip_addr, timestamp, estimated_revenue, page_info, page_links);
+B = foreach A generate user, timestamp;
+C = group B by user parallel $PARALLEL;
+D = foreach C {
+ morning = filter B by timestamp < 43200; -- presumably seconds since midnight, 43200 s = 12 h (TODO confirm)
+ afternoon = filter B by timestamp >= 43200; -- complement of the morning filter for non-null timestamps
+ generate group, COUNT(morning), COUNT(afternoon);
+}
+store D into '$PIGMIX_OUTPUT/L7out';
Added: pig/trunk/test/perf/pigmix/src/pig/L8.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L8.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L8.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L8.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,9 @@
+-- This script covers group all.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = foreach A generate user, (int)timespent as timespent, (double)estimated_revenue as estimated_revenue;
+C = group B all; -- a single group holding every row
+D = foreach C generate SUM(B.timespent), AVG(B.estimated_revenue);
+store D into '$PIGMIX_OUTPUT/L8out';
Added: pig/trunk/test/perf/pigmix/src/pig/L9.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/src/pig/L9.pig?rev=1468268&view=auto
==============================================================================
--- pig/trunk/test/perf/pigmix/src/pig/L9.pig (added)
+++ pig/trunk/test/perf/pigmix/src/pig/L9.pig Mon Apr 15 23:13:09 2013
@@ -0,0 +1,7 @@
+-- This script covers order by on a single key.
+register $PIGMIX_JAR
+A = load '$HDFS_ROOT/page_views' using org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
+ as (user, action, timespent, query_term, ip_addr, timestamp,
+ estimated_revenue, page_info, page_links);
+B = order A by query_term parallel $PARALLEL; -- query_term has no declared type here, so it is a bytearray
+store B into '$PIGMIX_OUTPUT/L9out';