You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [22/23] - in /hadoop/pig/branches/load-store-redesign:
./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebr...
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Nov 24 19:54:19 2009
@@ -297,7 +297,7 @@
@Test
public void testQuery22Fail() {
- buildPlan("A = load 'a';");
+ buildPlan("A = load 'a' as (a:int, b: double);");
try {
buildPlan("B = group A by (*, $0);");
} catch (AssertionFailedError e) {
@@ -329,15 +329,50 @@
@Test
public void testQuery23Fail() {
+ buildPlan("A = load 'a' as (a: int, b:double);");
+ buildPlan("B = load 'b';");
+ boolean exceptionThrown = false;
+ try {
+ buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+ "do not match"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void testQuery23Fail2() {
buildPlan("A = load 'a';");
buildPlan("B = load 'b';");
+ boolean exceptionThrown = false;
try {
- buildPlan("C = group A by (*, $0), B by ($0, $1);");
+ buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
} catch (AssertionFailedError e) {
- assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
}
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void testQuery23Fail3() {
+ buildPlan("A = load 'a' as (a: int, b:double);");
+ buildPlan("B = load 'b' as (a:int);");
+ boolean exceptionThrown = false;
+ try {
+ buildPlan("C = cogroup A by *, B by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+ "do not match"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
}
+
@Test
public void testQuery24() {
buildPlan("a = load 'a';");
@@ -1591,7 +1626,7 @@
}
@Test
- public void testQuery110() throws FrontendException, ParseException {
+ public void testQuery110Fail() throws FrontendException, ParseException {
LogicalPlan lp;
LOLoad load;
LOCogroup cogroup;
@@ -1600,13 +1635,16 @@
lp = buildPlan("b = load 'two';");
load = (LOLoad) lp.getLeaves().get(0);
-
+ boolean exceptionThrown = false;
+ try{
lp = buildPlan("c = cogroup a by $0, b by *;");
- cogroup = (LOCogroup) lp.getLeaves().get(0);
-
- MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
- LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0];
- assertTrue(checkPlanForProjectStar(cogroupPlan) == true);
+ } catch(AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+
}
@@ -2051,6 +2089,37 @@
fail();
}
+ @Test
+ public void testCogroupByStarFailure1() {
+ boolean exceptionThrown = false;
+ try {
+ buildPlan(" a = load '1.txt' as (a0:int, a1:int);");
+ buildPlan(" b = load '2.txt'; ");
+ buildPlan("c = cogroup a by *, b by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertEquals("An exception was expected but did " +
+ "not occur", true, exceptionThrown);
+ }
+
+ @Test
+ public void testCogroupByStarFailure2() {
+ boolean exceptionThrown = false;
+ try {
+ buildPlan(" a = load '1.txt' ;");
+ buildPlan(" b = load '2.txt' as (b0:int, b1:int); ");
+ buildPlan("c = cogroup a by *, b by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertEquals("An exception was expected but did " +
+ "not occur", true, exceptionThrown);
+ }
private void printPlan(LogicalPlan lp) {
LOPrinter graphPrinter = new LOPrinter(System.err, lp);
System.err.println("Printing the logical plan");
@@ -2131,5 +2200,5 @@
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
Map<String, String> fileNameMap = new HashMap<String, String>();
- PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
@@ -33,19 +34,33 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.util.ExecTools;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -53,14 +68,12 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.jobcontrol.Job;
public class TestMultiQuery extends TestCase {
@@ -79,6 +92,285 @@
public void tearDown() throws Exception {
myPig = null;
}
+
+ @Test
+ public void testMultiQueryJiraPig1060() {
+
+ // test case: four filter/group/store branches over one load (f3 filters f2, f4 filters f3); only the plan shapes are checked
+
+ String INPUT_FILE = "pig-1060.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\t2");
+ w.println("apple\t12");
+ w.println("orange\t3");
+ w.println("orange\t23");
+ w.println("strawberry\t10");
+ w.println("strawberry\t34");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE +
+ "' as (name:chararray, gid:int);");
+ myPig.registerQuery("f1 = filter data by gid < 5;");
+ myPig.registerQuery("g1 = group f1 by name;");
+ myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+ myPig.registerQuery("store p1 into '/tmp/output1';");
+
+ myPig.registerQuery("f2 = filter data by gid > 5;");
+ myPig.registerQuery("g2 = group f2 by name;");
+ myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+ myPig.registerQuery("store p2 into '/tmp/output2';");
+
+ myPig.registerQuery("f3 = filter f2 by gid > 10;");
+ myPig.registerQuery("g3 = group f3 by name;");
+ myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+ myPig.registerQuery("store p3 into '/tmp/output3';");
+
+ myPig.registerQuery("f4 = filter f3 by gid < 20;");
+ myPig.registerQuery("g4 = group f4 by name;");
+ myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+ myPig.registerQuery("store p4 into '/tmp/output4';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 4, 27);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 4, 35);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig1060_2() {
+
+ // test case: execution of four filter/group/store branches over one load (f3 filters f2, f4 filters f3)
+
+ String INPUT_FILE = "pig-1060.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\t2");
+ w.println("apple\t12");
+ w.println("orange\t3");
+ w.println("orange\t23");
+ w.println("strawberry\t10");
+ w.println("strawberry\t34");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE +
+ "' as (name:chararray, gid:int);");
+ myPig.registerQuery("f1 = filter data by gid < 5;");
+ myPig.registerQuery("g1 = group f1 by name;");
+ myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+ myPig.registerQuery("store p1 into '/tmp/output1';");
+
+ myPig.registerQuery("f2 = filter data by gid > 5;");
+ myPig.registerQuery("g2 = group f2 by name;");
+ myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+ myPig.registerQuery("store p2 into '/tmp/output2';");
+
+ myPig.registerQuery("f3 = filter f2 by gid > 10;");
+ myPig.registerQuery("g3 = group f3 by name;");
+ myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+ myPig.registerQuery("store p3 into '/tmp/output3';");
+
+ myPig.registerQuery("f4 = filter f3 by gid < 20;");
+ myPig.registerQuery("g4 = group f4 by name;");
+ myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+ myPig.registerQuery("store p4 into '/tmp/output4';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(4, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920() {
+
+ // test case: a simple diamond query
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = cogroup c by $0, b by $0;");
+ myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store e into '/tmp/output1';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 1, 10);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 13);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_1() {
+
+ // test case: a query with two diamonds
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = filter a by uid >= 5;");
+ myPig.registerQuery("e = filter a by gid < 5;");
+ myPig.registerQuery("f = cogroup c by $0, b by $0;");
+ myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store f1 into '/tmp/output1';");
+ myPig.registerQuery("g = cogroup d by $0, e by $0;");
+ myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+ myPig.registerQuery("store g1 into '/tmp/output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 23);
+
+ checkMRPlan(pp, 2, 2, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_2() {
+
+ // test case: execution of a query with two diamonds
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = filter a by uid >= 5;");
+ myPig.registerQuery("e = filter a by gid < 5;");
+ myPig.registerQuery("f = cogroup c by $0, b by $0;");
+ myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store f1 into '/tmp/output1';");
+ myPig.registerQuery("g = cogroup d by $0, e by $0;");
+ myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+ myPig.registerQuery("store g1 into '/tmp/output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_3() {
+
+ // test case: execution of a simple diamond query
+
+ String INPUT_FILE = "pig-920.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\tapple\t100\t10");
+ w.println("apple\tapple\t200\t20");
+ w.println("orange\torange\t100\t10");
+ w.println("orange\torange\t300\t20");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load '" + INPUT_FILE +
+ "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 300;");
+ myPig.registerQuery("c = filter a by gid > 10;");
+ myPig.registerQuery("d = cogroup c by $0, b by $0;");
+ myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+
+ Iterator<Tuple> iter = myPig.openIterator("e");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('apple',1L,2L)",
+ "('orange',1L,1L)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+ }
@Test
public void testMultiQueryWithDemoCase() {
@@ -327,6 +619,10 @@
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -449,7 +745,11 @@
myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -481,7 +781,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
- checkMRPlan(pp, 1, 1, 3);
+ checkMRPlan(pp, 1, 1, 2);
} catch (Exception e) {
e.printStackTrace();
@@ -509,8 +809,13 @@
myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
myPig.registerQuery("store f1 into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -577,8 +882,12 @@
myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
-
+ List<ExecJob> jobs = myPig.executeBatch();
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -645,7 +954,12 @@
myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -713,7 +1027,12 @@
myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
myPig.registerQuery("store d2 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -826,7 +1145,12 @@
myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
myPig.registerQuery("store H into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -892,7 +1216,12 @@
myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
myPig.registerQuery("store g1 into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -948,7 +1277,12 @@
myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);");
myPig.registerQuery("store e into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1006,7 +1340,12 @@
myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
myPig.registerQuery("store e into '/tmp/output3';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(3, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1115,7 +1454,12 @@
myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); ");
myPig.registerQuery("store b into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
@@ -1433,7 +1777,7 @@
}
}
-
+
@Test
public void testMultiQueryWithTwoStores() {
@@ -1904,7 +2248,156 @@
Assert.fail();
}
}
+
+ /**
+ * Test that Pig calls the checkOutputSpecs() method of the OutputFormat (if the
+ * StoreFunc defines an OutputFormat as the return value of
+ * {@link StoreFunc#getStorePreparationClass()}). NOTE(review): the dummy StoreFunc used here overrides getOutputFormat() - confirm which hook is meant.
+ * @throws IOException propagated from the input-file, batch and file-system calls made in the body (none are caught here)
+ */
+ @Test
+ public void testMultiStoreWithOutputFormat() throws IOException {
+ Util.createInputFile(cluster, "input.txt", new String[] {"hello", "bye"});
+ String query = "a = load 'input.txt';" +
+ "b = filter a by $0 < 10;" +
+ "store b into 'output1' using "+DummyStoreWithOutputFormat.class.getName()+"();" +
+ "c = group a by $0;" +
+ "d = foreach c generate group, COUNT(a.$0);" +
+ "store d into 'output2' using "+DummyStoreWithOutputFormat.class.getName()+"();" ;
+ myPig.setBatchOn();
+ Util.registerMultiLineQuery(myPig, query);
+ myPig.executeBatch();
+
+ // check that files were created as a result of the
+ // checkOutputSpecs() method of the OutputFormat being called
+ FileSystem fs = cluster.getFileSystem();
+ assertEquals(true, fs.exists(new Path("output1_checkOutputSpec_test")));
+ assertEquals(true, fs.exists(new Path("output2_checkOutputSpec_test")));
+ Util.deleteFile(cluster, "input.txt");
+ Util.deleteFile(cluster, "output1_checkOutputSpec_test");
+ Util.deleteFile(cluster, "output2_checkOutputSpec_test");
+ }
+
+ public static class DummyStoreWithOutputFormat implements StoreFunc { // stub StoreFunc named in the store statements of testMultiStoreWithOutputFormat
+
+ /**
+ * No-argument constructor; performs no initialization.
+ */
+ public DummyStoreWithOutputFormat() {
+ // nothing to initialize
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public void putNext(Tuple f) throws IOException {
+ // no-op: the tuple passed in is ignored
+
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#checkSchema(org.apache.pig.ResourceSchema)
+ */
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ // no-op: the schema is not inspected and nothing is thrown
+
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#getOutputFormat()
+ */
+ @Override
+ public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
+ throws IOException {
+ return null; // NOTE(review): null here, yet the test expects DummyOutputFormat.checkOutputSpecs() to run - confirm how the format is wired in
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter)
+ */
+ @Override
+ public void prepareToWrite(
+ org.apache.hadoop.mapreduce.RecordWriter writer)
+ throws IOException {
+
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#relToAbsPathForStoreLocation(java.lang.String, org.apache.hadoop.fs.Path)
+ */
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir)
+ throws IOException {
+ // stub: always returns null, whatever location and curDir are
+ return null;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.StoreFunc#setStoreLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public void setStoreLocation(String location, Job job)
+ throws IOException {
+ // no-op: the location and job are ignored
+
+ }
+
+ }
+
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public static class DummyOutputFormat
+ extends OutputFormat<WritableComparable, Tuple> {
+
+ public DummyOutputFormat() {
+ // nothing to initialize
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ StoreConfig sConfig = MapRedUtil.getStoreConfig(context.
+ getConfiguration());
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ // create a file to test that this method got called
+ fs.create(new Path(sConfig.getLocation() + "_checkOutputSpec_test")).close();
+ // the stream returned by create() is closed at once so the empty marker file does not leak a handle
+
+ }
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // stub: no committer is supplied
+ return null;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.OutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public org.apache.hadoop.mapreduce.RecordWriter<WritableComparable, Tuple> getRecordWriter(
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ // stub: no record writer is supplied
+ return null;
+ }
+
+ }
+
+
// --------------------------------------------------------------------------
// Helper methods
@@ -2041,6 +2534,9 @@
showPlanOperators(mrp);
+ System.out.println("===== Display map-reduce Plan =====");
+ System.out.println(mrp.toString());
+
Assert.assertEquals(expectedRoots, mrp.getRoots().size());
Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
Assert.assertEquals(expectedSize, mrp.size());
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java Tue Nov 24 19:54:19 2009
@@ -32,11 +32,6 @@
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.util.ExecTools;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,7 +81,7 @@
LogicalPlan lp = checkLogicalPlan(1, 2, 9);
// XXX Physical plan has one less node in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
Assert.assertTrue(executePlan(pp));
@@ -186,7 +181,7 @@
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
Assert.assertTrue(executePlan(pp));
@@ -248,7 +243,7 @@
LogicalPlan lp = checkLogicalPlan(2, 3, 16);
// XXX the total number of ops is one less in the local case
- PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
Assert.assertTrue(executePlan(pp));
@@ -459,7 +454,7 @@
myPig.registerQuery("store c into '/tmp/output5';");
LogicalPlan lp = checkLogicalPlan(1, 3, 12);
- PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15);
myPig.executeBatch();
myPig.discardBatch();
@@ -536,7 +531,7 @@
private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
int expectedLeaves, int expectedSize) throws IOException {
- System.out.println("===== check physical plan =====");
+ System.out.println("===== check physical plan =====");
PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile(
lp, null);
@@ -565,16 +560,38 @@
}
private void deleteOutputFiles() {
- try {
- FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
+ String outputFiles[] = { "/tmp/output1",
+ "/tmp/output2",
+ "/tmp/output3",
+ "/tmp/output4",
+ "/tmp/output5"
+ };
+ try {
+ for( String outputFile : outputFiles ) {
+ if( isDirectory(outputFile) ) {
+ deleteDir( new File( outputFile ) );
+ } else {
+ FileLocalizer.delete(outputFile, myPig.getPigContext());
+ }
+ }
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}
+
+ private void deleteDir( File file ) {
+ // Recursively deletes file and everything beneath it; listFiles() is null for non-directories and on I/O errors.
+ File[] children = file.listFiles();
+ for( int i = 0; children != null && i < children.length; i++ ) {
+ deleteDir( children[i] );
+ }
+ file.delete();
+ }
+
+ private boolean isDirectory( String filepath ) {
+ // true only when filepath names an existing directory
+ return new File( filepath ).isDirectory();
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Tue Nov 24 19:54:19 2009
@@ -46,8 +46,8 @@
public class TestPigContext extends TestCase {
private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop";
- private static final String FS_NAME = "machine:9000";
- private static final String JOB_TRACKER = "machine:9001";
+ private static final String FS_NAME = "file:///";
+ private static final String JOB_TRACKER = "local";
private File input;
private PigContext pigContext;
@@ -68,7 +68,7 @@
PigServer pigServer = new PigServer(pigContext);
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
/**
@@ -79,7 +79,7 @@
PigServer pigServer = new PigServer(ExecType.LOCAL, getProperties());
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
/**
@@ -91,7 +91,7 @@
PigServer pigServer = new PigServer(pigContext);
registerAndStore(pigServer);
- check_asserts();
+ check_asserts(pigServer);
}
@Test
@@ -218,7 +218,7 @@
}
private void registerAndStore(PigServer pigServer) throws IOException {
- pigServer.debugOn();
+ // pigServer.debugOn();
List<String> commands = getCommands();
for (final String command : commands) {
pigServer.registerQuery(command);
@@ -226,9 +226,9 @@
pigServer.store("counts", input.getAbsolutePath() + ".out");
}
- private void check_asserts() {
- assertEquals(JOB_TRACKER, pigContext.getProperties().getProperty("mapred.job.tracker"));
- assertEquals(FS_NAME, pigContext.getProperties().getProperty("fs.default.name"));
- assertEquals(TMP_DIR_PROP, pigContext.getProperties().getProperty("hadoop.tmp.dir"));
+ private void check_asserts(PigServer pigServer) {
+ assertEquals(JOB_TRACKER, pigServer.getPigContext().getProperties().getProperty("mapred.job.tracker"));
+ assertEquals(FS_NAME, pigServer.getPigContext().getProperties().getProperty("fs.default.name"));
+ assertEquals(TMP_DIR_PROP, pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir"));
}
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java Tue Nov 24 19:54:19 2009
@@ -103,6 +103,7 @@
throws IOException {
assertFalse((new File(name)).canRead());
+ System.err. println("Location: " + location);
assertTrue((new File(location)).mkdirs());
assertTrue((new File(location + FILE_SEPARATOR + name)).
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java Tue Nov 24 19:54:19 2009
@@ -25,6 +25,7 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.PigStats;
public class TestPigStats extends TestCase {
@@ -34,19 +35,38 @@
File outputFile = null;
try {
outputFile = File.createTempFile("JIAR_1027", ".out");
+ String filePath = outputFile.getAbsolutePath();
+ outputFile.delete();
PigServer pig = new PigServer(ExecType.LOCAL);
pig
.registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
- PigStats stats = pig.store("A", outputFile.getAbsolutePath())
+ PigStats stats = pig.store("A", filePath)
.getStatistics();
- assertEquals(outputFile.length(), stats.getBytesWritten());
+ File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" );
+ assertEquals(dataFile.length(), stats.getBytesWritten());
} catch (IOException e) {
+ e.printStackTrace();
+ System.err.println( e.getMessage() );
fail("IOException happened");
} finally {
if (outputFile != null) {
- outputFile.delete();
+ // Hadoop Local mode creates a directory
+ // Hence we need to delete a directory recursively
+ deleteDirectory(outputFile);
}
}
}
+
+ private void deleteDirectory( File dir ) {
+ File[] files = dir.listFiles();
+ for( File file : files ) {
+ if( file.isDirectory() ) {
+ deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ }
+ dir.delete();
+ }
}
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigStorage {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static PigServer pigServer = null;
+
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+ } catch (ExecException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ pigServer.shutdown();
+ }
+
+ @Test
+ public void testBlockBoundary() {
+
+ // This tests PigStorage loader with records exactly
+ // on the boundary of the file blocks.
+ String[] inputs = {
+ "abcdefgh1", "abcdefgh2", "abcdefgh3",
+ "abcdefgh4", "abcdefgh5", "abcdefgh6",
+ "abcdefgh7", "abcdefgh8", "abcdefgh9"
+ };
+
+ String[] expected = {
+ "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)",
+ "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
+ "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
+ };
+
+ System.setProperty("pig.overrideBlockSize", "20");
+
+ String INPUT_FILE = "tmp.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ for (String s : inputs) {
+ w.println(s);
+ }
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ pigServer.registerQuery("a = load 'file:" + INPUT_FILE + "';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("a");
+ int counter = 0;
+ while (iter.hasNext()){
+ assertEquals(expected[counter++].toString(), iter.next().toString());
+ }
+
+ assertEquals(expected.length, counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+}
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+
+public class TestSecondarySort extends TestCase {
+ MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+
+ static PigContext pc;
+ static{
+ pc = new PigContext(ExecType.MAPREDUCE,MiniCluster.buildCluster().getProperties());
+ try {
+ pc.connect();
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception{
+ FileLocalizer.setR(new Random());
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ public void testDistinctOptimization1() throws Exception{
+ // Limit in the foreach plan
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = LOAD 'input2' AS (b0, b1, b2);");
+ planTester.buildPlan("C = cogroup A by a0, B by b0;");
+ planTester.buildPlan("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};");
+
+ LogicalPlan lp = planTester.buildPlan("store D into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization2() throws Exception{
+ // Distinct on one entire input
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = distinct A; generate group, D;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization3() throws Exception{
+ // Distinct on the prefix of main sort key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = A.a0; E = distinct D; generate group, E;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==0);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization4() throws Exception{
+ // Distinct on secondary key again, should remove
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = A.a1; E = distinct D; F = distinct E; generate group, F;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==2);
+ }
+
+ public void testDistinctOptimization5() throws Exception{
+ // Filter in foreach plan
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = A.a1; E = distinct D; F = filter E by $0=='1'; generate group, F;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization6() throws Exception{
+ // group by * with no schema, and distinct key is not part of main key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1';");
+ planTester.buildPlan("B = group A by *;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$1; F = DISTINCT E; generate group, COUNT(F);};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization7() throws Exception{
+ // group by * with no schema, distinct key is more specific than the main key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1';");
+ planTester.buildPlan("B = group A by *;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization8() throws Exception{
+ // local arrange plan is an expression
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0+$1;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testDistinctOptimization9() throws Exception{
+ // local arrange plan is nested project
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' as (a:tuple(a0:int, a1:chararray));");
+ planTester.buildPlan("B = group A by a.a1;");
+ planTester.buildPlan("C = foreach B { D = A.a; E = DISTINCT D; generate group, COUNT(E);};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==0);
+ assertTrue(so.getDistinctChanged()==1);
+ }
+
+ public void testSortOptimization1() throws Exception{
+ // Sort on something other than the main key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization2() throws Exception{
+ // Sort on the prefix of the main key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $0; generate group, E;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==0);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization3() throws Exception{
+ // Sort on the main key prefix / non main key prefix mixed
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; F = order E by $0; generate group, F;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==2);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization4() throws Exception{
+ // Sort on the main key again
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $0, $1, $2; generate group, E;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization5() throws Exception{
+ // Sort on the two keys, we can only take off 1
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; F = order E by $2; generate group, F;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization6() throws Exception{
+ // Sort desc
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by $0;");
+ planTester.buildPlan("C = foreach B { D = order A by $0 desc; generate group, D;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testSortOptimization7() throws Exception{
+ // Sort asc on 1st key, desc on 2nd key
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+ planTester.buildPlan("B = group A by ($0, $1);");
+ planTester.buildPlan("C = foreach B { D = order A by $0, $1 desc; generate group, D;};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==1);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
+ public void testNestedDistinctEndToEnd1() throws Exception{
+ File tmpFile1 = File.createTempFile("test", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("1\t3\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("2\t3\t4");
+ ps1.close();
+
+ File tmpFile2 = File.createTempFile("test", "txt");
+ PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+ ps2.println("1\t4\t4");
+ ps2.println("2\t3\t1");
+ ps2.close();
+ Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(),
+ tmpFile1.getCanonicalPath());
+ Util.copyFromLocalToCluster(cluster, tmpFile2.getCanonicalPath(),
+ tmpFile2.getCanonicalPath());
+
+ pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = LOAD '" + tmpFile2.getCanonicalPath() + "' AS (b0, b1, b2);");
+ pigServer.registerQuery("C = cogroup A by a0, B by b0 parallel 2;");
+ pigServer.registerQuery("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(1,2L)"));
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(2,1L)"));
+ assertFalse(iter.hasNext());
+ Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+ Util.deleteFile(cluster, tmpFile2.getCanonicalPath());
+ }
+
+ public void testNestedDistinctEndToEnd2() throws Exception{
+ File tmpFile1 = File.createTempFile("test", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("1\t3\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("2\t3\t4");
+ ps1.close();
+ Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(),
+ tmpFile1.getCanonicalPath());
+ pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = group A by $0 parallel 2;");
+ pigServer.registerQuery("C = foreach B { D = distinct A; generate group, D;};");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,3,4)})"));
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+ assertFalse(iter.hasNext());
+ Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+ }
+
+ public void testNestedSortEndToEnd1() throws Exception{
+ File tmpFile1 = File.createTempFile("test", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("1\t3\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("2\t3\t4");
+ ps1.close();
+ Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(),
+ tmpFile1.getCanonicalPath());
+ pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = group A by $0 parallel 2;");
+ pigServer.registerQuery("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})"));
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+ assertFalse(iter.hasNext());
+ Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+ }
+
+ public void testNestedSortEndToEnd2() throws Exception{
+ File tmpFile1 = File.createTempFile("test", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+ ps1.println("1\t2\t3");
+ ps1.println("1\t3\t4");
+ ps1.println("1\t4\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t8\t4");
+ ps1.println("2\t3\t4");
+ ps1.close();
+ Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(),
+ tmpFile1.getCanonicalPath());
+ pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+ pigServer.registerQuery("B = group A by $0 parallel 2;");
+ pigServer.registerQuery("C = foreach B { D = order A by a1 desc; generate group, D;};");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})"));
+ assertTrue(iter.hasNext());
+ assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+ assertFalse(iter.hasNext());
+ Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+ }
+
+ public void testNestedSortMultiQueryEndToEnd1() throws Exception{
+ pigServer.setBatchOn();
+ Util.copyFromLocalToCluster(cluster,
+ "test/org/apache/pig/test/data/passwd",
+ "testNestedSortMultiQueryEndToEnd1-input.txt");
+ pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'" +
+ " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ pigServer.registerQuery("b = group a by uname parallel 2;");
+ pigServer.registerQuery("c = group a by gid parallel 2;");
+ pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
+ pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
+ pigServer.registerQuery("store d into '/tmp/output1';");
+ pigServer.registerQuery("store e into '/tmp/output2';");
+
+ List<ExecJob> jobs = pigServer.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+ FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+ FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+ Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
+ }
+}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java Tue Nov 24 19:54:19 2009
@@ -138,7 +138,6 @@
}
-
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
@@ -290,17 +289,18 @@
lineCount[key][i] ++;
}
}
+
+ int fc = 0;
for(int i=0; i<3; i++) {
- int fc = 0;
for(int j=0; j<7; j++) {
- if (lineCount[i][j] > 0) {
+ if (lineCount[i][j] > 0) {
fc ++;
}
}
- // all three keys are skewed keys,
- // check each key should appear in more than 1 part- file
- assertTrue(fc > 1);
}
+ // at least one key should be a skewed key
+ // check that at least one key appears in more than 1 part- file
+ assertTrue(fc > 3);
}
public void testSkewedJoinNullKeys() throws IOException {
@@ -324,4 +324,72 @@
return;
}
+ public void testSkewedJoinOuter() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id left, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by id right, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by id full, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ } catch(Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ fail("Should support outer join in skewed join");
+ }
+ return;
+ }
+
+ // pig 1048
+ public void testSkewedJoinOneValue() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+ // Filter key with a single value
+
+ pigServer.registerQuery("C = FILTER A by id == 400;");
+ pigServer.registerQuery("D = FILTER B by id == 400;");
+
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("E = join C by id, D by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("E = join C by id, D by id;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbrj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbfrj.size(), dbrj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Tue Nov 24 19:54:19 2009
@@ -17,16 +17,12 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.*;
import org.apache.pig.ExecType;
-import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -35,13 +31,11 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.FuncSpec;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.PigServer;
@@ -52,11 +46,8 @@
import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
@@ -65,10 +56,7 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Nov 24 19:54:19 2009
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.ArrayList;
+import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.pig.EvalFunc;
@@ -49,7 +50,18 @@
public class TestTypeCheckingValidator extends TestCase {
- LogicalPlanTester planTester = new LogicalPlanTester() ;
+ LogicalPlanTester planTester;
+
+    /**
+     * Replaces {@code planTester} with a brand-new {@link LogicalPlanTester}
+     * before each test, so that plans built by one test do not interact with
+     * the plans of another.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        this.planTester = new LogicalPlanTester();
+    }
private static final String simpleEchoStreamingCommand;
static {
@@ -3287,77 +3299,19 @@
}
@Test
- public void testCogroupStarLineageNoSchema() throws Throwable {
- planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
- planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
- planTester.buildPlan("c = cogroup a by *, b by * ;") ;
- planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
- LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
-
- // validate
- CompilationMessageCollector collector = new CompilationMessageCollector() ;
- TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
- try {
- typeValidator.validate(plan, collector) ;
- }
- catch (PlanValidationException pve) {
- //not good
- }
-
- printMessageCollector(collector) ;
- printTypeGraph(plan) ;
- planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
- if (collector.hasError()) {
- throw new AssertionError("Expect no error") ;
- }
-
-
- LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
- LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
-
- LogicalOperator exOp = foreachPlan.getRoots().get(0);
-
- if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-
- LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
- assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("BinStorage"));
-
- foreachPlan = foreach.getForEachPlans().get(2);
- exOp = foreachPlan.getRoots().get(0);
- if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
- cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
- assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
-
- }
-
- @Test
public void testCogroupStarLineageNoSchemaFail() throws Throwable {
planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
- planTester.buildPlan("c = cogroup a by *, b by * ;") ;
- planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
- LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
-
- // validate
- CompilationMessageCollector collector = new CompilationMessageCollector() ;
- TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ boolean exceptionThrown = false;
try {
- typeValidator.validate(plan, collector) ;
- fail("Exception expected") ;
- }
- catch (PlanValidationException pve) {
- //not good
- }
-
- printMessageCollector(collector) ;
- printTypeGraph(plan) ;
- planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
- if (!collector.hasError()) {
- throw new AssertionError("Expect error") ;
+ LogicalPlan lp = planTester.buildPlan("c = cogroup a by *, b by *;");
+ } catch(AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
}
-
+ assertTrue(exceptionThrown);
+
}
@Test
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+
+/**
+ * Runs a small Pig script on the mini cluster that loads two files through
+ * {@code UDFContextTestLoader} (constructed with 'joe' and 'jane'), unions them
+ * and applies {@code UDFContextTestEvalFunc} / {@code UDFContextTestEvalFunc2},
+ * then checks the values that come back in every tuple.
+ * NOTE(review): presumably this verifies UDFContext propagation to the
+ * back end -- confirm against the helper classes in org.apache.pig.test.utils.
+ */
+public class TestUDFContext extends TestCase {
+
+    static MiniCluster cluster = null;
+
+    /** Assigns the result of {@code MiniCluster.buildCluster()} to {@link #cluster}. */
+    @Override
+    protected void setUp() throws Exception {
+        cluster = MiniCluster.buildCluster();
+    }
+
+
+    /**
+     * Writes the script to a temporary file, registers it, and asserts for each
+     * tuple of relation D that field 1 is "joe" for the "dumb" row and "jane"
+     * for the "dumber" row, that field 2 parses to 5 and that field 3 is "five".
+     *
+     * @throws Exception if cluster setup, script registration or iteration fails
+     */
+    @Test
+    public void testUDFContext() throws Exception {
+        Util.createInputFile(cluster, "a.txt", new String[] { "dumb" });
+        Util.createInputFile(cluster, "b.txt", new String[] { "dumber" });
+        FileLocalizer.deleteTempFiles();
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String[] statement = { "A = LOAD 'a.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');",
+            "B = LOAD 'b.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('jane');",
+            "C = union A, B;",
+            "D = FOREACH C GENERATE $0, $1, org.apache.pig.test.utils.UDFContextTestEvalFunc($0), org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" };
+
+        File tmpFile = File.createTempFile("temp_jira_851", ".pig");
+        // Do not leave the generated script behind in the temp directory.
+        tmpFile.deleteOnExit();
+        FileWriter writer = new FileWriter(tmpFile);
+        try {
+            for (String line : statement) {
+                writer.write(line + "\n");
+            }
+        } finally {
+            // Close even when a write fails so the file handle is not leaked.
+            writer.close();
+        }
+
+        pig.registerScript(tmpFile.getAbsolutePath());
+        Iterator<Tuple> iterator = pig.openIterator("D");
+        boolean sawTuple = false;
+        while (iterator.hasNext()) {
+            sawTuple = true;
+            Tuple tuple = iterator.next();
+            String key = tuple.get(0).toString();
+            // JUnit's assertEquals takes (expected, actual); keep that order so
+            // failure messages report the right side as "expected".
+            if ("dumb".equals(key)) {
+                assertEquals("joe", tuple.get(1).toString());
+            } else if ("dumber".equals(key)) {
+                assertEquals("jane", tuple.get(1).toString());
+            }
+            assertEquals(Integer.valueOf(5), Integer.valueOf(tuple.get(2).toString()));
+            assertEquals("five", tuple.get(3).toString());
+        }
+        // Without this the test would pass even if the script returned nothing.
+        assertTrue("D produced no tuples, so nothing was verified", sawTuple);
+    }
+}