You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/02 22:36:50 UTC

svn commit: r701235 [2/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/impl/logicalLayer/...

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Oct  2 13:36:49 2008
@@ -36,10 +36,21 @@
 import org.apache.pig.builtin.PigStorage;
 import org.junit.Test;
 import static org.apache.pig.test.utils.TypeCheckingTestUtil.* ;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.test.utils.TypeCheckingTestUtil;
 
 public class TestTypeCheckingValidator extends TestCase {
 
+    LogicalPlanTester planTester = new LogicalPlanTester() ;
     
+	private static final String simpleEchoStreamingCommand;
+        static {
+            if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+                simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+            else
+                simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+        }
+
     @Test
     public void testExpressionTypeChecking1() throws Throwable {
         LogicalPlan plan = new LogicalPlan() ;
@@ -2774,33 +2785,2139 @@
 
     }
 
+    @Test
+    public void testLineage1() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
+        LogicalPlan plan = planTester.buildPlan("b = foreach a generate field1 + 1.0 ;") ;
 
-    ////////////////////////// Helper //////////////////////////////////
-    private void checkForEachCasting(LOForEach foreach, int idx, boolean isCast, byte toType) {
-        LogicalPlan plan = foreach.getForEachPlans().get(idx) ;
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
 
-        if (isCast) {
-            List<LogicalOperator> leaveList = plan.getLeaves() ;
-            assertEquals(leaveList.size(), 1);
-            assertTrue(leaveList.get(0) instanceof LOCast);
-            assertTrue(leaveList.get(0).getType() == toType) ;
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
         }
-        else {
-            List<LogicalOperator> leaveList = plan.getLeaves() ;
-            assertEquals(leaveList.size(), 1);
-            assertTrue(leaveList.get(0) instanceof LOProject);
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc() == null);
+
+    }
+
+    @Test
+    public void testLineage1NoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a';") ;
+        LogicalPlan plan = planTester.buildPlan("b = foreach a generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testLineage2() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        LogicalPlan plan = planTester.buildPlan("b = foreach a generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testGroupLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = group a by field1 ;") ;
+        planTester.buildPlan("c = foreach b generate flatten(a) ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
+    public void testGroupLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = group a by $0 ;") ;
+        planTester.buildPlan("c = foreach b generate flatten(a) ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
+    public void testGroupLineage2() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = group a by field1 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate group + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
         }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
+    public void testGroupLineage2NoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = group a by $0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate group + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
         
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
+    public void testCogroupLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupMapLookupLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1#'key' + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOMapLookup map = (LOMapLookup)foreachPlan.getSuccessors(exOp).get(0);
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(map).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupLineageFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
     }
 
-    private void printPlan(LogicalPlan lp, String title) {
+    @Test
+    public void testCogroupUDFLineageFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate flatten(DIFF(a, b)) as diff_a_b ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate diff_a_b + 1;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
         try {
-            System.err.println(title);
-            LOPrinter lv = new LOPrinter(System.err, lp);
-            lv.visit();
-            System.err.println();
-        } catch (Exception e) {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testCogroupLineage2NoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
         }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testUnionLineage() throws Throwable {
+        //here the type checker will insert a cast for the union, converting the column field2 into a float
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate field2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc() == null);
+
+    }
+
+    @Test
+    public void testUnionLineageFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testUnionLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+    }
+
+    @Test
+    public void testUnionLineageNoSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testUnionLineageDifferentSchema() throws Throwable {
+        //here the type checker will insert a cast for the union, converting the column field2 into a float
+        planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray, field7 );") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testUnionLineageDifferentSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray, field7 );") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testUnionLineageMixSchema() throws Throwable {
+        //here the type checker will insert a cast for the union, converting the column field2 into a float
+        planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testUnionLineageMixSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = union a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testFilterLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        LogicalPlan plan = planTester.buildPlan("b = filter a by field1 > 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOFilter filter = (LOFilter)plan.getLeaves().get(0);
+        LogicalPlan filterPlan = filter.getComparisonPlan();
+
+        LogicalOperator exOp = filterPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = filterPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)filterPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testFilterLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        LogicalPlan plan = planTester.buildPlan("b = filter a by $0 > 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOFilter filter = (LOFilter)plan.getLeaves().get(0);
+        LogicalPlan filterPlan = filter.getComparisonPlan();
+
+        LogicalOperator exOp = filterPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = filterPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)filterPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testFilterLineage1() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = filter a by field2 > 1.0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testFilterLineage1NoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        planTester.buildPlan("b = filter a by $0 > 1.0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupFilterLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = filter d by field4 > 5;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupFilterLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = filter d by $2 > 5;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testSplitLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        LogicalPlan plan = planTester.buildPlan("split a into b if field1 > 1.0, c if field1 <= 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOSplitOutput splitOutputB = (LOSplitOutput)plan.getLeaves().get(0);
+        LogicalPlan bPlan = splitOutputB.getConditionPlan();
+
+        LogicalOperator exOp = bPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = bPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)bPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+        LOSplitOutput splitOutputC = (LOSplitOutput)plan.getLeaves().get(0);
+        LogicalPlan cPlan = splitOutputC.getConditionPlan();
+
+        exOp = cPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = cPlan.getRoots().get(1);
+
+        cast = (LOCast)cPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testSplitLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        LogicalPlan plan = planTester.buildPlan("split a into b if $0 > 1.0, c if $1 <= 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOSplitOutput splitOutputB = (LOSplitOutput)plan.getLeaves().get(0);
+        LogicalPlan bPlan = splitOutputB.getConditionPlan();
+
+        LogicalOperator exOp = bPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = bPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)bPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+        LOSplitOutput splitOutputC = (LOSplitOutput)plan.getLeaves().get(0);
+        LogicalPlan cPlan = splitOutputC.getConditionPlan();
+
+        exOp = cPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = cPlan.getRoots().get(1);
+
+        cast = (LOCast)cPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testSplitLineage1() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("split a into b if field2 > 1.0, c if field2 <= 1.0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testSplitLineage1NoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        planTester.buildPlan("split a into b if $0 > 1.0, c if $1 <= 1.0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupSplitLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("split d into e if field4 > 'm', f if field6 > 'm'  ;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupSplitLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("split d into e if $1 > 'm', f if $1 > 'm'  ;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testDistinctLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = distinct a;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testDistinctLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        planTester.buildPlan("b = distinct a;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupDistinctLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = distinct d ;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupDistinctLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = distinct d ;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testSortLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = order a by field1;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testSortLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        planTester.buildPlan("b = order a by $1;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupSortLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by field4 desc;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupSortLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by $2 desc;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCrossLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cross a, b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(1);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testCrossLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("c = cross a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+    }
+
+    @Test
+    public void testCrossLineageNoSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cross a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testCrossLineageMixSchema() throws Throwable {
+        //here the type checker will insert a cast for the union, converting the column field2 into a float
+        planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cross a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCrossLineageMixSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cross a , b ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testJoinLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = join a by field1, b by field4 ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(1);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testJoinLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("c = join a by $0, b by $0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+    }
+
+    @Test
+    public void testJoinLineageNoSchemaFail() throws Throwable {
+        //this test case should change when we decide on what flattening a tuple or bag
+        //with null schema results in a foreach flatten and hence a join
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = join a by $0, b by $0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $1 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testJoinLineageMixSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = join a by field1, b by $0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testJoinLineageMixSchemaFail() throws Throwable {
+        //this test case should change when we decide on what flattening a tuple or bag
+        //with null schema results in a foreach flatten and hence a join
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = join a by field1, b by $0 ;") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $3 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            // good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testLimitLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = limit a 100;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate field1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testLimitLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' ;") ;
+        planTester.buildPlan("b = limit a 100;") ;
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupLimitLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = limit d 100;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupLimitLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = limit d 100;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupTopKLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by field1 desc;") ;
+        planTester.buildPlan("f = limit e 100;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupTopKLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by $2 desc;") ;
+        planTester.buildPlan("f = limit e 100;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testStreamingLineage1() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1: int, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "`;");
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate $1 + 1.0 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testStreamingLineage2() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1: int, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "` as (f1, f2: float);");
+        LogicalPlan plan = planTester.buildPlan("c = foreach b generate f1 + 1.0, f2 + 4 ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(1);
+
+        exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        assertTrue(foreachPlan.getSuccessors(exOp).get(0) instanceof LOAdd);
+    }
+
+    @Test
+    public void testCogroupStreamingLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = stream a through `" + simpleEchoStreamingCommand + "` as (field4, field5, field6: chararray);");
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {

[... 83 lines stripped ...]