You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/03 18:58:21 UTC
svn commit: r1099123 [6/16] - in /pig/branches/branch-0.9: ./
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Tue May 3 16:58:19 2011
@@ -18,7 +18,6 @@
package org.apache.pig.test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -27,14 +26,15 @@ import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.FilterFunc;
+import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.utils.Identity;
-import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -42,13 +42,9 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.optimizer.PlanOptimizer;
-import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.OptimizerUtils;
import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
-import org.apache.pig.newplan.logical.rules.TypeCastInserter;
import org.junit.Assert;
import org.junit.Test;
@@ -59,11 +55,9 @@ import org.junit.Before;
*/
public class TestNewPlanPushDownForeachFlatten {
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
- LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
@Before
public void tearDown() {
- planTester.reset();
}
/**
@@ -84,9 +78,7 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testErrorEmptyInput() throws Exception {
- org.apache.pig.impl.logicalLayer.LogicalPlan lp =
- new org.apache.pig.impl.logicalLayer.LogicalPlan();
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
Assert.assertTrue( newLogicalPlan.getOperators().hasNext() == false );
}
@@ -96,22 +88,23 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testErrorNonForeachInput() throws Exception {
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "store A into 'output';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
List<Operator> nexts = newLogicalPlan.getSuccessors( load );
- Assert.assertTrue( nexts == null || nexts.size() == 0 );
+ Assert.assertTrue( nexts != null && nexts.size() == 1 );
}
@Test
public void testForeachNoFlatten() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, $2;");
- planTester.buildPlan("C = order B by $0, $1;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, $2;" +
+ "C = order B by $0, $1;" +
+ "D = store C into 'dummy';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -125,9 +118,10 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachNoSuccessors() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("B = foreach A generate flatten($1);");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate flatten($1);" +
+ "Store B into 'output';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -137,10 +131,11 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachStreaming() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate flatten($1);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = stream B through `" + "pc -l" + "`;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate flatten($1);" +
+ "C = stream B through `" + "pc -l" + "`;" +
+ "Store C into 'output';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -150,11 +145,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachDistinct() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate flatten($1);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = distinct B;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate flatten($1);" +
+ "C = distinct B;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -164,11 +160,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachForeach() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = foreach B generate $0;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten(1);" +
+ "C = foreach B generate $0;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -185,11 +182,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachFilter() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = filter B by $1 < 18;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = filter B by $1 < 18;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -202,11 +200,13 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachSplitOutput() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("split B into C if $1 < 18, D if $1 >= 18;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "split B into C if $1 < 18, D if $1 >= 18;" +
+ "store C into 'output1';" +
+ "store D into 'output2';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -219,11 +219,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachLimit() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("B = limit B 10;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = limit B 10;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -236,12 +237,13 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachUnion() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("D = union B, C;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference);" +
+ "D = union B, C;" +
+ "store D into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -262,12 +264,13 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachCogroup() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("D = cogroup B by $0, C by $0;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference);" +
+ "D = cogroup B by $0, C by $0;" +
+ "store D into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -288,11 +291,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachGroupBy() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = group B by $0;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = group B by $0;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -305,11 +309,11 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachSort() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = order B by $0, $1;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = order B by $0, $1;" +
+ "D = store C into 'dummy';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -326,11 +330,11 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachSortNegative1() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0 + 5, $1, flatten($2);");
- planTester.buildPlan("C = order B by $0, $1;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0 + 5, $1, flatten($2);" +
+ "C = order B by $0, $1;" +
+ "D = store C into 'dummy';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -348,11 +352,11 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachSortNegative2() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:tuple(x,y));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = order B by $0, $3;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:tuple(x,y));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = order B by $0, $3;" +
+ "D = store C into 'dummy';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -366,10 +370,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachFlattenAddedColumnSort() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, flatten(1);" +
+ "C = order B by $0, $1;" +
+ "store C into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -383,11 +389,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachUDFSort() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;" +
+ "C = order B by $0, $1;" +
+ "store C into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -401,11 +408,12 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachCastSort() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
- planTester.buildPlan("B = foreach A generate (chararray)$0, $1, flatten($2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate (chararray)$0, $1, flatten($2);" +
+ "C = order B by $0, $1;" +
+ "store C into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -419,13 +427,14 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachCross() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
- planTester.buildPlan("D = cross B, C;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference);" +
+ "D = cross B, C;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -453,13 +462,14 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachCross1() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
- planTester.buildPlan("D = cross A, C;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "C = foreach B generate $0, $1, flatten($2);" +
+ "D = cross A, C;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -490,17 +500,21 @@ public class TestNewPlanPushDownForeachF
// A new rule should optimize this case
@Test
public void testForeachCross2() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
- planTester.buildPlan("E = cross B, D;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = foreach C generate $0, $1, flatten($2);" +
+ "E = cross B, D;" +
+ "F = limit E 10;" +
+ "store F into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
// No optimization about foreach flatten.
- Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOCross );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator limit = newLogicalPlan.getPredecessors(store).get(0);
+ Operator cross = newLogicalPlan.getPredecessors(limit).get(0);
+ Assert.assertTrue( cross instanceof LOCross );
}
/**
@@ -509,13 +523,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachFlattenAddedColumnCross() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = cross B, C;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten(1);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = cross B, C;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -546,13 +561,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachUDFCross() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = cross B, C;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = cross B, C;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -583,13 +599,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachCastCross() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten( $2 );");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = cross B, C;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, (int)$1, flatten( $2 );" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = cross B, C;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -617,13 +634,15 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachFRJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
- planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference);" +
+ "D = join B by $0, C by $0 using 'replicated';" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -651,13 +670,14 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachFRJoin1() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
- planTester.buildPlan("D = join A by $0, C by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "C = foreach B generate $0, $1, flatten($2);" +
+ "D = join A by $0, C by $0 using 'replicated';" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -688,17 +708,21 @@ public class TestNewPlanPushDownForeachF
// A new rule should optimize this case
@Test
public void testForeachFRJoin2() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
- planTester.buildPlan("E = join B by $0, D by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = foreach C generate $0, $1, flatten($2);" +
+ "E = join B by $0, D by $0 using 'replicated';" +
+ "F = limit E 10;" +
+ "store F into 'output';";
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
// No optimization about foreach flatten.
- Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+ Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+ Assert.assertTrue( join instanceof LOJoin );
}
/**
@@ -707,13 +731,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachFlattenAddedColumnFRJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten(1);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0 using 'replicated';" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -745,13 +770,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachUDFFRJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0 using 'replicated';" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -783,13 +809,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachCastFRJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, (int)$1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0 using 'replicated';" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -817,13 +844,14 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachInnerJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -851,13 +879,14 @@ public class TestNewPlanPushDownForeachF
@Test
public void testForeachInnerJoin1() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
- planTester.buildPlan("D = join A by $0, C by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "C = foreach B generate $0, $1, flatten($2);" +
+ "D = join A by $0, C by $0;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -888,17 +917,21 @@ public class TestNewPlanPushDownForeachF
// A new rule should optimize this case
@Test
public void testForeachInnerJoin2() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
- planTester.buildPlan("E = join B by $0, D by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = foreach C generate $0, $1, flatten($2);" +
+ "E = join B by $0, D by $0;" +
+ "F = limit E 10;" +
+ "store F into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
// No optimization about foreach flatten.
- Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+ Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+ Assert.assertTrue( join instanceof LOJoin );
}
/**
@@ -907,17 +940,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachFlattenAddedColumnInnerJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- planTester.setPlan(lp);
- planTester.setProjectionMap(lp);
- planTester.rebuildSchema(lp);
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, $1, flatten(1);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -948,13 +978,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachUDFInnerJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -985,13 +1016,14 @@ public class TestNewPlanPushDownForeachF
*/
@Test
public void testForeachCastInnerJoin() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
- planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
- planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
- planTester.buildPlan("D = join B by $0, C by $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+ "B = foreach A generate $0, (int)$1, flatten($2);" +
+ "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+ "D = join B by $0, C by $0;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
@@ -1020,26 +1052,31 @@ public class TestNewPlanPushDownForeachF
// See PIG-1172
@Test
public void testForeachJoinRequiredField() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
- planTester.buildPlan("B = FOREACH A generate flatten($0);");
- planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
- planTester.buildPlan("D = JOIN B by a1, C by c1;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});" +
+ "B = FOREACH A generate flatten($0);" +
+ "C = load '3.txt' AS (c0, c1);" +
+ "D = JOIN B by a1, C by c1;" +
+ "E = limit D 10;" +
+ "store E into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
// No optimization about foreach flatten.
- Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+ Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+ Assert.assertTrue( join instanceof LOJoin );
}
// See PIG-1374
@Test
public void testForeachRequiredField() throws Exception {
- planTester.buildPlan("A = load 'myfile' as (b{t(a0:chararray,a1:int)});");
- planTester.buildPlan("B = foreach A generate flatten($0);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $1 desc;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "A = load 'myfile' as (b:bag{t:tuple(a0:chararray,a1:int)});" +
+ "B = foreach A generate flatten($0);" +
+ "C = order B by $1 desc;" +
+ "store C into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
@@ -1054,14 +1091,16 @@ public class TestNewPlanPushDownForeachF
// See PIG-1706
@Test
public void testForeachWithUserDefinedSchema() throws Exception {
- planTester.buildPlan("a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});");
- planTester.buildPlan("b = load '2.txt' as (b0:int, b1);");
- planTester.buildPlan("c = foreach a generate a0, flatten(a2) as (q1, q2);");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("d = join c by a0, b by b0;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});" +
+ "b = load '2.txt' as (b0:int, b1);" +
+ "c = foreach a generate a0, flatten(a2) as (q1, q2);" +
+ "d = join c by a0, b by b0;" +
+ "store d into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
- LOForEach foreach = (LOForEach)newLogicalPlan.getSinks().get( 0 );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
}
@@ -1069,14 +1108,16 @@ public class TestNewPlanPushDownForeachF
// See PIG-1751
@Test
public void testForeachWithUserDefinedSchema2() throws Exception {
- planTester.buildPlan("a = load '1.txt' as (a0:chararray);");
- planTester.buildPlan("b = load '2.txt' as (b0:chararray);");
- planTester.buildPlan("c = foreach b generate flatten(STRSPLIT(b0)) as c0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("d = join c by (chararray)c0, a by a0;");
-
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+ String query = "a = load '1.txt' as (a0:chararray);" +
+ "b = load '2.txt' as (b0:chararray);" +
+ "c = foreach b generate flatten(STRSPLIT(b0)) as c0;" +
+ "d = join c by (chararray)c0, a by a0;" +
+ "store d into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
- Operator op = newLogicalPlan.getSinks().get( 0 );
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator op = newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue(op instanceof LOJoin);
}
@@ -1103,18 +1144,13 @@ public class TestNewPlanPushDownForeachF
}
}
- private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
- LogicalPlan newLogicalPlan = migratePlan( plan );
+ private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
+ PigServer pigServer = new PigServer( pc );
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
return newLogicalPlan;
}
- private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
- LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
- visitor.visit();
- org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
- return newPlan;
- }
}
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java Tue May 3 16:58:19 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.*;
import org.apache.pig.ExecType;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.PigServer;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
@@ -31,17 +31,13 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.test.TestLogicalOptimizer.LogicalOptimizerDerivative;
-import org.apache.pig.test.utils.LogicalPlanTester;
import junit.framework.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class TestOptimizeLimit {
@@ -49,15 +45,15 @@ public class TestOptimizeLimit {
static final int MAX_SIZE = 100000;
PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
- LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
+ PigServer pigServer;
- @BeforeClass
- public static void setup() {
-
+ @Before
+ public void setup() throws ExecException {
+ pigServer = new PigServer( pc );
}
- @AfterClass
- public static void tearDown() {
+ @After
+ public void tearDown() {
}
@@ -87,149 +83,123 @@ public class TestOptimizeLimit {
@Test
// Merget limit into sort
public void testOPLimit1Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = order A by $0;");
- planTester.buildPlan("C = limit B 100;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile';" +
+ "B = order A by $0;" +
+ "C = limit B 100;" +
+ "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan1.dot");
}
@Test
// Merge limit into limit
public void testOPLimit2Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = limit A 10;");
- planTester.buildPlan("C = limit B 100;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile';" +
+ "B = limit A 10;" +
+ "C = limit B 100;" + "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
}
@Test
// Duplicate limit with two inputs
public void testOPLimit3Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile1';");
- planTester.buildPlan("B = load 'myfile2';");
- planTester.buildPlan("C = cross A, B;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile1';" +
+ "B = load 'myfile2';" +
+ "C = cross A, B;" +
+ "D = limit C 100;" + "store D into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan3.dot");
}
@Test
// Duplicte limit with one input
public void testOPLimit4Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile1';");
- planTester.buildPlan("B = group A by $0;");
- planTester.buildPlan("C = foreach B generate flatten(A);");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile1';" +
+ "B = group A by $0;" + "C = foreach B generate flatten(A);" + "D = limit C 100;" +
+ "store D into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan4.dot");
}
@Test
// Move limit up
public void testOPLimit5Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile1';");
- planTester.buildPlan("B = foreach A generate $0;");
- planTester.buildPlan("C = limit B 100;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile1';" +
+ "B = foreach A generate $0;" +
+ "C = limit B 100;" + "store C into 'empty';" ;
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
}
@Test
// Multiple LOLimit
public void testOPLimit6Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = limit A 50;");
- planTester.buildPlan("C = limit B 20;");
- planTester.buildPlan("D = limit C 100;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store D into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile';" +
+ "B = limit A 50;" +
+ "C = limit B 20;" +
+ "D = limit C 100;" + "store D into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
}
@Test
// Limit stay the same for ForEach with a flatten
public void testOPLimit7Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile1';");
- planTester.buildPlan("B = foreach A generate flatten($0);");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile1';" +
+ "B = foreach A generate flatten($0);" +
+ "C = limit B 100;" + "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan7.dot");
}
@Test
//Limit in the local mode, need to make sure limit stays after a sort
public void testOPLimit8Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = order A by $0;");
- planTester.buildPlan("C = limit B 10;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
- compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
+ String query = "A = load 'myfile';" +
+ "B = order A by $0;" +
+ "C = limit B 10;" + "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
+ compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
}
@Test
public void testOPLimit9Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = order A by $0;");
- planTester.buildPlan("C = limit B 10;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
- compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
+ String query = "A = load 'myfile';" +
+ "B = order A by $0;" +
+ "C = limit B 10;" + "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
+ compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
}
@Test
//See bug PIG-913
public void testOPLimit10Optimizer() throws Exception {
- planTester.buildPlan("A = load 'myfile' AS (s:chararray);");
- planTester.buildPlan("B = limit A 100;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = GROUP B by $0;");
- LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+ String query = "A = load 'myfile' AS (s:chararray);" +
+ "B = limit A 100;" + "C = GROUP B by $0;" + "store C into 'empty';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
}
- /**
- * Test that {@link OpLimitOptimizer} returns false on the check if
- * pre-conditions for pushing limit up are not met
- * @throws Exception
- */
- @Test
- public void testOpLimitOptimizerCheck() throws Exception {
- planTester.buildPlan("A = load 'myfile';");
- planTester.buildPlan("B = foreach A generate $0;");
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
- LogicalOptimizerDerivative optimizer = new LogicalOptimizerDerivative(plan);
- int numIterations = optimizer.optimize();
- Assert.assertFalse("Checking number of iterations of the optimizer [actual = "
- + numIterations + ", expected < " + optimizer.getMaxIterations() +
- "]", optimizer.getMaxIterations() == numIterations);
-
- }
-
- @Test
- //Test to ensure that the right exception is thrown
- public void testErrOpLimitOptimizer() throws Exception {
- org.apache.pig.impl.logicalLayer.LogicalPlan lp = new org.apache.pig.impl.logicalLayer.LogicalPlan();
- OpLimitOptimizer olo = new OpLimitOptimizer(lp);
- try {
- olo.transform(lp.getRoots());
- } catch(Exception e) {
- Assert.assertTrue(((OptimizerException)e).getErrorCode() == 2052);
- }
- }
-
@Test
//See bug PIG-995
//We shall throw no exception here
public void testOPLimit11Optimizer() throws Exception {
- org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;");
- migrateAndOptimizePlan(plan);
+ String query = "B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;";
+ LogicalPlan plan = Util.buildLp(pigServer, query);
+ optimizePlan(plan);
}
@@ -258,17 +228,10 @@ public class TestOptimizeLimit {
}
}
- private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
- LogicalPlan newLogicalPlan = migratePlan( plan );
- PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+ private LogicalPlan optimizePlan(LogicalPlan plan) throws IOException {
+ PlanOptimizer optimizer = new MyPlanOptimizer( plan, 3 );
optimizer.optimize();
- return newLogicalPlan;
+ return plan;
}
- private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
- LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
- visitor.visit();
- org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
- return newPlan;
- }
}
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java Tue May 3 16:58:19 2011
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.ExecType;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.PigServer;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.test.utils.LogicalPlanTester;
-import org.junit.Test;
-
-/**
- * unit tests to test extracting partition filter conditions out of the filter
- * condition in the filter following a load which talks to metadata system (.i.e.
- * implements {@link LoadMetadata})
- */
-public class TestPartitionFilterOptimization extends TestCase {
-
- PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
- LogicalPlanTester lpTester;
-
- @Override
- protected void setUp() throws Exception {
- lpTester = new LogicalPlanTester(pc);
- lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
- }
-
- /**
- * test case where there is a single expression on partition columns in
- * the filter expression along with an expression on non partition column
- * @throws FrontendException
- */
- @Test
- public void testSimpleMixed() throws FrontendException {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
- test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
- }
-
- /**
- * test case where filter does not contain any condition on partition cols
- * @throws Exception
- */
- @Test
- public void testNoPartFilter() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
- test(lp, Arrays.asList("srcid"), null,
- "((age == 20) and (name == 'foo'))");
- }
-
- /**
- * test case where filter only contains condition on partition cols
- * @throws Exception
- */
- @Test
- public void testOnlyPartFilter1() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
- test(lp, Arrays.asList("srcid", "mrkt"),
- "((srcid > 20) and (mrkt == 'us'))", null);
-
- }
-
- /**
- * test case where filter only contains condition on partition cols
- * @throws Exception
- */
- @Test
- public void testOnlyPartFilter2() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by mrkt == 'us';");
- test(lp, Arrays.asList("srcid", "mrkt"),
- "(mrkt == 'us')", null);
-
- }
-
- /**
- * test case where filter only contains condition on partition cols
- * @throws Exception
- */
- @Test
- public void testOnlyPartFilter3() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
- test(lp, Arrays.asList("srcid", "mrkt"),
- "((srcid == 20) or (mrkt == 'us'))", null);
-
- }
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns
- */
- @Test
- public void testMixed1() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "(age < 20 and mrkt == 'us') and (srcid == 10 and " +
- "name == 'foo');");
- test(lp, Arrays.asList("srcid", "mrkt"),
- "((mrkt == 'us') and (srcid == 10))",
- "((age < 20) and (name == 'foo'))");
- }
-
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns
- */
- @Test
- public void testMixed2() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
- "dstid == 15);");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
- "(age >= 20)");
- }
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns
- */
- @Test
- public void testMixed3() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "age >= 20 and mrkt == 'us' and srcid == 10;");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
- }
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns - this testcase also has a condition
- * based on comparison of two partition columns
- */
- @Test
- public void testMixed4() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "age >= 20 and mrkt == 'us' and name == 'foo' and " +
- "srcid == dstid;");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and (srcid == dstid))",
- "((age >= 20) and (name == 'foo'))");
- }
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns -
- * This testcase has two partition col conditions with OR + non parition
- * col conditions
- */
- @Test
- public void testMixed5() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
- "dstid == 30;");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
- "(name == 'foo')");
- }
-
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns -
- * This testcase has two partition col conditions with OR + non parition
- * col conditions
- */
- @Test
- public void testMixed6() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
- "(name == 'foo')");
- }
- /**
- * test case where filter has both conditions on partition cols and non
- * partition cols and the filter condition will be split to extract the
- * conditions on partition columns. This testcase also tests arithmetic
- * in partition column conditions
- */
- @Test
- public void testMixedArith() throws Exception {
- LogicalPlan lp =
- lpTester.buildPlan("b = filter a by " +
- "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
- test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
- "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
- "(age != 15)");
- }
-
- @Test
- public void testNegPColConditionWithNonPCol() throws Exception {
- // use of partition column condition and non partition column in
- // same condition should fail
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "srcid > age;");
- negativeTest(lp, Arrays.asList("srcid"), 1111);
- lp = lpTester.buildPlan("b = filter a by " +
- "srcid + age == 20;");
- negativeTest(lp, Arrays.asList("srcid"), 1111);
-
- // OR of partition column condition and non partiton col condition
- // should fail
- lp = lpTester.buildPlan("b = filter a by " +
- "srcid > 10 or name == 'foo';");
- negativeTest(lp, Arrays.asList("srcid"), 1111);
- }
-
- @Test
- public void testNegPColInWrongPlaces() throws Exception {
-
- int expectedErrCode = 1112;
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "(srcid > 10 and name == 'foo') or dstid == 10;");
- negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode);
-
- expectedErrCode = 1110;
- lp = lpTester.buildPlan("b = filter a by " +
- "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
- lp = lpTester.buildPlan("b = filter a by " +
- "mrkt matches '.*us.*' and age < 15;");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
- lp = lpTester.buildPlan("b = filter a by " +
- "(int)mrkt == 10 and name matches '.*foo.*';");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
-
- lp = lpTester.buildPlan("b = filter a by " +
- "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
- lp = lpTester.buildPlan("b = filter a by " +
- "(mrkt is null) and name matches '.*foo.*';");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-
- lp = lpTester.buildPlan("b = filter a by " +
- "(mrkt is not null) and name matches '.*foo.*';");
- negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
- }
-
-
- /**
- * Test that pig sends correct partition column names in setPartitionFilter
- * when the user has a schema in the load statement which renames partition
- * columns
- * @throws Exception
- */
- @Test
- public void testColNameMapping1() throws Exception {
- TestLoader.partFilter = null;
- lpTester.buildPlan("a = load 'foo' using "
- + TestLoader.class.getName() +
- "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
- "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
- lpTester.typeCheckPlan(lp);
- lpTester.optimizePlan(lp);
- assertEquals("checking partition filter:",
- "((mrkt == 'us') and (srcid == 10))",
- TestLoader.partFilter.toString());
- LOFilter filter = (LOFilter) lp.getLeaves().get(0);
- String actual = PColFilterExtractor.getExpression(
- (ExpressionOperator) filter.getComparisonPlan().
- getLeaves().get(0)).
- toString().toLowerCase();
- assertEquals("checking trimmed filter expression:",
- "((f5 >= 20) and (f3 == 15))", actual);
- }
-
-
- /**
- * Test that pig sends correct partition column names in setPartitionFilter
- * when the user has a schema in the load statement which renames partition
- * columns - in this test case there is no condition on partition columns
- * - so setPartitionFilter() should not be called and the filter condition
- * should remain as is.
- * @throws Exception
- */
- @Test
- public void testColNameMapping2() throws Exception {
- TestLoader.partFilter = null;
- lpTester.buildPlan("a = load 'foo' using "
- + TestLoader.class.getName() +
- "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
- "'srcid') as (f1, f2, f3, f4, f5);");
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "f5 >= 20 and f2 == 'us' and f3 == 15;");
- lpTester.typeCheckPlan(lp);
- lpTester.optimizePlan(lp);
- assertEquals("checking partition filter:",
- null,
- TestLoader.partFilter);
- LOFilter filter = (LOFilter) lp.getLeaves().get(0);
- String actual = PColFilterExtractor.getExpression(
- (ExpressionOperator) filter.getComparisonPlan().
- getLeaves().get(0)).
- toString().toLowerCase();
- assertEquals("checking trimmed filter expression:",
- "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
- }
-
- /**
- * Test that pig sends correct partition column names in setPartitionFilter
- * when the user has a schema in the load statement which renames partition
- * columns - in this test case the filter only has conditions on partition
- * columns
- * @throws Exception
- */
- @Test
- public void testColNameMapping3() throws Exception {
- TestLoader.partFilter = null;
- lpTester.buildPlan("a = load 'foo' using "
- + TestLoader.class.getName() +
- "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
- "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
- lpTester.typeCheckPlan(lp);
- lpTester.optimizePlan(lp);
- assertEquals("checking partition filter:",
- "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
- "(dstid == 15)))",
- TestLoader.partFilter.toString());
- Iterator<LogicalOperator> it = lp.iterator();
- assertTrue("Checking that filter has been removed since it contained" +
- " only conditions on partition cols:",
- (it.next() instanceof LOLoad));
- assertFalse("Checking that filter has been removed since it contained" +
- " only conditions on partition cols:",
- it.hasNext());
-
- }
-
- /**
- * Test that pig sends correct partition column names in setPartitionFilter
- * when the user has a schema in the load statement which renames partition
- * columns - in this test case the schema in load statement is a prefix
- * (with columns renamed) of the schema returned by
- * {@link LoadMetadata#getSchema(String, Configuration)}
- * @throws Exception
- */
- @Test
- public void testColNameMapping4() throws Exception {
- TestLoader.partFilter = null;
- lpTester.buildPlan("a = load 'foo' using "
- + TestLoader.class.getName() +
- "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
- "'srcid,mrkt') as (f1, f2, f3);");
- LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
- "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
- lpTester.typeCheckPlan(lp);
- lpTester.optimizePlan(lp);
- assertEquals("checking partition filter:",
- "((mrkt == 'us') and (srcid == 10))",
- TestLoader.partFilter.toString());
- LOFilter filter = (LOFilter) lp.getLeaves().get(0);
- String actual = PColFilterExtractor.getExpression(
- (ExpressionOperator) filter.getComparisonPlan().
- getLeaves().get(0)).
- toString().toLowerCase();
- assertEquals("checking trimmed filter expression:",
- "((age >= 20) and (f3 == 15))", actual);
- }
-
- /**
- * Test PIG-1267
- * @throws Exception
- */
- @Test
- public void testColNameMapping5() throws Exception {
- TestLoader.partFilter = null;
- lpTester.buildPlan("a = load 'foo' using "
- + TestLoader.class.getName() +
- "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
- "'srcid');");
- lpTester.buildPlan("b = load 'bar' using "
- + TestLoader.class.getName() +
- "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
- "'srcid');");
- lpTester.buildPlan("a1 = filter a by srcid == 10;");
- lpTester.buildPlan("b1 = filter b by srcid == 20;");
- lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
- LogicalPlan lp = lpTester
- .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
- "$5 as dstid:int, $0 as mrkt:chararray;");
-
- new PlanSetter(lp).visit();
-
- lpTester.typeCheckPlan(lp);
- lpTester.optimizePlan(lp);
-
- assertEquals("checking partition filter:",
- "(srcid == 20)",
- TestLoader.partFilter.toString());
-
- int counter = 0;
- Iterator<LogicalOperator> iter = lp.getKeys().values().iterator();
- while (iter.hasNext()) {
- assertTrue(!(iter.next() instanceof LOFilter));
- counter++;
- }
- assertEquals(counter, 6);
- }
-
- //// helper methods ///////
-
- private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols,
- String expPartFilterString, String expFilterString)
- throws FrontendException {
- LOFilter filter = (LOFilter)lp.getLeaves().get(0);
- PColFilterExtractor pColExtractor = new PColFilterExtractor(
- filter.getComparisonPlan(), partitionCols);
- pColExtractor.visit();
-
- if(expPartFilterString == null) {
- assertEquals("Checking partition column filter:", null,
- pColExtractor.getPColCondition());
- } else {
- assertEquals("Checking partition column filter:",
- expPartFilterString.toLowerCase(),
- pColExtractor.getPColCondition().toString().toLowerCase());
- }
-
- if(expFilterString == null) {
- assertTrue("Check that filter can be removed:",
- pColExtractor.isFilterRemovable());
- } else {
- String actual = PColFilterExtractor.getExpression(
- (ExpressionOperator) filter.getComparisonPlan().
- getLeaves().get(0)).
- toString().toLowerCase();
- assertEquals("checking trimmed filter expression:", expFilterString,
- actual);
- }
- return pColExtractor;
- }
-
- private void negativeTest(LogicalPlan lp, List<String> partitionCols,
- int expectedErrorCode) {
- LOFilter filter = (LOFilter)lp.getLeaves().get(0);
- PColFilterExtractor pColExtractor = new PColFilterExtractor(
- filter.getComparisonPlan(), partitionCols);
- try {
- pColExtractor.visit();
- } catch(Exception e) {
- assertEquals("Checking if exception has right error code",
- expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
- return;
- }
- fail("Exception expected!");
- }
-
- /**
- * this loader is only used to test that parition column filters are given
- * in the manner expected in terms of column names - hence it does not
- * implement many of the methods and only implements required ones.
- */
- public static class TestLoader extends LoadFunc implements LoadMetadata {
-
- Schema schema;
- String[] partCols;
- static Expression partFilter = null;
-
- public TestLoader(String schemaString, String commaSepPartitionCols)
- throws ParseException {
- schema = Util.getSchemaFromString(schemaString);
- partCols = commaSepPartitionCols.split(",");
- }
-
- @Override
- public InputFormat getInputFormat() throws IOException {
- return null;
- }
-
- @Override
- public Tuple getNext() throws IOException {
- return null;
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- throws IOException {
- }
-
- @Override
- public void setLocation(String location, Job job) throws IOException {
- }
-
- @Override
- public String[] getPartitionKeys(String location, Job job)
- throws IOException {
- return partCols;
- }
-
- @Override
- public ResourceSchema getSchema(String location, Job job)
- throws IOException {
- return new ResourceSchema(schema);
- }
-
- @Override
- public ResourceStatistics getStatistics(String location,
- Job job) throws IOException {
- return null;
- }
-
- @Override
- public void setPartitionFilter(Expression partitionFilter)
- throws IOException {
- partFilter = partitionFilter;
- }
-
- }
-}