You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/02 22:36:50 UTC
svn commit: r701235 [2/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/impl/logicalLayer/...
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Oct 2 13:36:49 2008
@@ -36,10 +36,21 @@
import org.apache.pig.builtin.PigStorage;
import org.junit.Test;
import static org.apache.pig.test.utils.TypeCheckingTestUtil.* ;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.test.utils.TypeCheckingTestUtil;
public class TestTypeCheckingValidator extends TestCase {
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ private static final String simpleEchoStreamingCommand;
+ static {
+ if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+ simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+ else
+ simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ }
+
@Test
public void testExpressionTypeChecking1() throws Throwable {
LogicalPlan plan = new LogicalPlan() ;
@@ -2774,33 +2785,2139 @@
}
+ @Test
+ public void testLineage1() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
+ LogicalPlan plan = planTester.buildPlan("b = foreach a generate field1 + 1.0 ;") ;
- ////////////////////////// Helper //////////////////////////////////
- private void checkForEachCasting(LOForEach foreach, int idx, boolean isCast, byte toType) {
- LogicalPlan plan = foreach.getForEachPlans().get(idx) ;
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
- if (isCast) {
- List<LogicalOperator> leaveList = plan.getLeaves() ;
- assertEquals(leaveList.size(), 1);
- assertTrue(leaveList.get(0) instanceof LOCast);
- assertTrue(leaveList.get(0).getType() == toType) ;
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
}
- else {
- List<LogicalOperator> leaveList = plan.getLeaves() ;
- assertEquals(leaveList.size(), 1);
- assertTrue(leaveList.get(0) instanceof LOProject);
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc() == null);
+
+ }
+
+ @Test
+ public void testLineage1NoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a';") ;
+ LogicalPlan plan = planTester.buildPlan("b = foreach a generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testLineage2() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ LogicalPlan plan = planTester.buildPlan("b = foreach a generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testGroupLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = group a by field1 ;") ;
+ planTester.buildPlan("c = foreach b generate flatten(a) ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
+ public void testGroupLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = group a by $0 ;") ;
+ planTester.buildPlan("c = foreach b generate flatten(a) ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
+ public void testGroupLineage2() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = group a by field1 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate group + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
}
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
+ public void testGroupLineage2NoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = group a by $0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate group + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
+ public void testCogroupLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupMapLookupLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1#'key' + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOMapLookup map = (LOMapLookup)foreachPlan.getSuccessors(exOp).get(0);
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(map).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupLineageFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
}
- private void printPlan(LogicalPlan lp, String title) {
+ @Test
+ public void testCogroupUDFLineageFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate flatten(DIFF(a, b)) as diff_a_b ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate diff_a_b + 1;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
try {
- System.err.println(title);
- LOPrinter lv = new LOPrinter(System.err, lp);
- lv.visit();
- System.err.println();
- } catch (Exception e) {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testCogroupLineage2NoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
}
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testUnionLineage() throws Throwable {
+ //here the type checker will insert a cast for the union, converting the column field2 into a float
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate field2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc() == null);
+
+ }
+
+ @Test
+ public void testUnionLineageFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testUnionLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+ }
+
+ @Test
+ public void testUnionLineageNoSchemaFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testUnionLineageDifferentSchema() throws Throwable {
+ //here the type checker will insert a cast for the union, converting the column field2 into a float
+ planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray, field7 );") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testUnionLineageDifferentSchemaFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray, field7 );") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testUnionLineageMixSchema() throws Throwable {
+ //here the type checker will insert a cast for the union, converting the column field2 into a float
+ planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testUnionLineageMixSchemaFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = union a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testFilterLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ LogicalPlan plan = planTester.buildPlan("b = filter a by field1 > 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOFilter filter = (LOFilter)plan.getLeaves().get(0);
+ LogicalPlan filterPlan = filter.getComparisonPlan();
+
+ LogicalOperator exOp = filterPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = filterPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)filterPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testFilterLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ LogicalPlan plan = planTester.buildPlan("b = filter a by $0 > 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOFilter filter = (LOFilter)plan.getLeaves().get(0);
+ LogicalPlan filterPlan = filter.getComparisonPlan();
+
+ LogicalOperator exOp = filterPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = filterPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)filterPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testFilterLineage1() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = filter a by field2 > 1.0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testFilterLineage1NoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ planTester.buildPlan("b = filter a by $0 > 1.0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupFilterLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = filter d by field4 > 5;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupFilterLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = filter d by $2 > 5;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testSplitLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ LogicalPlan plan = planTester.buildPlan("split a into b if field1 > 1.0, c if field1 <= 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOSplitOutput splitOutputB = (LOSplitOutput)plan.getLeaves().get(0);
+ LogicalPlan bPlan = splitOutputB.getConditionPlan();
+
+ LogicalOperator exOp = bPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = bPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)bPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ LOSplitOutput splitOutputC = (LOSplitOutput)plan.getLeaves().get(0);
+ LogicalPlan cPlan = splitOutputC.getConditionPlan();
+
+ exOp = cPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = cPlan.getRoots().get(1);
+
+ cast = (LOCast)cPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testSplitLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ LogicalPlan plan = planTester.buildPlan("split a into b if $0 > 1.0, c if $1 <= 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOSplitOutput splitOutputB = (LOSplitOutput)plan.getLeaves().get(0);
+ LogicalPlan bPlan = splitOutputB.getConditionPlan();
+
+ LogicalOperator exOp = bPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = bPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)bPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ LOSplitOutput splitOutputC = (LOSplitOutput)plan.getLeaves().get(0);
+ LogicalPlan cPlan = splitOutputC.getConditionPlan();
+
+ exOp = cPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = cPlan.getRoots().get(1);
+
+ cast = (LOCast)cPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testSplitLineage1() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("split a into b if field2 > 1.0, c if field2 <= 1.0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testSplitLineage1NoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ planTester.buildPlan("split a into b if $0 > 1.0, c if $1 <= 1.0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupSplitLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("split d into e if field4 > 'm', f if field6 > 'm' ;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupSplitLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("split d into e if $1 > 'm', f if $1 > 'm' ;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testDistinctLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = distinct a;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testDistinctLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ planTester.buildPlan("b = distinct a;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupDistinctLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = distinct d ;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupDistinctLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = distinct d ;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testSortLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = order a by field1;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testSortLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ planTester.buildPlan("b = order a by $1;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupSortLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by field4 desc;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupSortLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by $2 desc;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCrossLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cross a, b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(1);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testCrossLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("c = cross a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+ }
+
+ @Test
+ public void testCrossLineageNoSchemaFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cross a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testCrossLineageMixSchema() throws Throwable {
+ //here the type checker will insert a cast for the union, converting the column field2 into a float
+ planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cross a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCrossLineageMixSchemaFail() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cross a , b ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testJoinLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = join a by field1, b by field4 ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(1);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testJoinLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("c = join a by $0, b by $0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+ }
+
+ @Test
+ public void testJoinLineageNoSchemaFail() throws Throwable {
+ //this test case should change when we decide on what flattening a tuple or bag
+ //with null schema results in a foreach flatten and hence a join
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = join a by $0, b by $0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testJoinLineageMixSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = join a by field1, b by $0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testJoinLineageMixSchemaFail() throws Throwable {
+ //this test case should change when we decide on what flattening a tuple or bag
+ //with null schema results in a foreach flatten and hence a join
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = join a by field1, b by $0 ;") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ // good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testLimitLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = limit a 100;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testLimitLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' ;") ;
+ planTester.buildPlan("b = limit a 100;") ;
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupLimitLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = limit d 100;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupLimitLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = limit d 100;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupTopKLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by field1 desc;") ;
+ planTester.buildPlan("f = limit e 100;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupTopKLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by $2 desc;") ;
+ planTester.buildPlan("f = limit e 100;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testStreamingLineage1() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1: int, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "`;");
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testStreamingLineage2() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1: int, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "` as (f1, f2: float);");
+ LogicalPlan plan = planTester.buildPlan("c = foreach b generate f1 + 1.0, f2 + 4 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(1);
+
+ exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ assertTrue(foreachPlan.getSuccessors(exOp).get(0) instanceof LOAdd);
+ }
+
+ @Test
+ public void testCogroupStreamingLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "` as (field4, field5, field6: chararray);");
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
[... 83 lines stripped ...]