You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/03/04 19:15:14 UTC

svn commit: r1078085 [11/12] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ s...

Added: pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java?rev=1078085&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java Fri Mar  4 18:15:11 2011
@@ -0,0 +1,5239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genDummyLOLoadNewLP;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genFlatSchema;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.genFlatSchemaInTuple;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printCurrentMethodName;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinCondExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NegativeExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
+import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
+import org.apache.pig.newplan.logical.visitor.TypeCheckingExpVisitor;
+import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
+import org.apache.pig.parser.ParserTestingUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTypeCheckingValidatorNewLP {
+
+    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+    LogicalPlanTester planTester;
+    private static final String CAST_LOAD_NOT_FOUND = 
+        "Cannot resolve load function to use for casting from bytearray";
+    
+    
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Before
+    public void setUp() throws Exception {
+        // create a new instance of the plan tester
+        // for each test so that different tests do not
+        // interact with each other's plans
+        planTester = new LogicalPlanTester(pc) ;
+    }
+
+    private static final String simpleEchoStreamingCommand;
+    static {
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        else
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+        File fileA = new File("a");
+        File fileB = new File("b");
+        try {
+            fileA.delete();
+            fileB.delete();
+            if(!fileA.createNewFile() || !fileB.createNewFile())
+                fail("Unable to create input files");
+        } catch (IOException e) {
+            fail("Unable to create input files:" + e.getMessage());
+        }
+        fileA.deleteOnExit();
+        fileB.deleteOnExit();
+    }
+
+    @Test
+    public void testExpressionTypeChecking1() throws Throwable {
+        LogicalExpressionPlan expPlan = new LogicalExpressionPlan();
+
+        ConstantExpression constant1 = new ConstantExpression(expPlan, 10);
+
+        ConstantExpression constant2 =  new ConstantExpression(expPlan, 20D) ;
+        ConstantExpression constant3 =  new ConstantExpression(expPlan, 123f) ;
+
+        AddExpression add1 = new AddExpression(expPlan, constant1, constant2) ;
+        CastExpression cast1 = new CastExpression(expPlan,constant3, createFS(DataType.DOUBLE)) ;
+        MultiplyExpression mul1 = new MultiplyExpression(expPlan, add1, cast1) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(expPlan, collector, null);
+        expTypeChecker.visit();
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }       
+
+        // Induction check
+        assertEquals(DataType.DOUBLE, add1.getType()) ;
+        assertEquals(DataType.DOUBLE, mul1.getType()) ;
+
+        // Cast insertion check
+        assertEquals(DataType.DOUBLE, add1.getLhs().getType()) ;
+        assertEquals(DataType.DOUBLE, mul1.getRhs().getType()) ;
+
+    }
+
+    private LogicalFieldSchema createFS(byte datatype) {
+        return new LogicalFieldSchema(null, null, datatype);
+    }
+
+    @Test
+    public void testExpressionTypeCheckingFail1() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20D) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan, "123") ;
+
+        AddExpression add1 = new AddExpression(plan, constant1, constant2) ;
+        CastExpression cast1 = new CastExpression(plan, constant3,  createFS(DataType.BYTEARRAY)) ;
+        MultiplyExpression mul1 = new MultiplyExpression(plan, add1, cast1) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        try {
+            expTypeChecker.visit();
+            fail("Exception expected") ;
+        }
+        catch (TypeCheckerException pve) {
+            // good
+        }
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (!collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }       
+    }
+
+    @Test
+    public void testExpressionTypeChecking2() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan,  new DataByteArray()) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan,  123L) ;
+        ConstantExpression constant4 =  new ConstantExpression(plan,  true) ;
+
+        SubtractExpression sub1 = new SubtractExpression(plan, constant1, constant2) ;
+        GreaterThanExpression gt1 = new GreaterThanExpression(plan, sub1, constant3) ;
+        AndExpression and1 = new AndExpression(plan, gt1, constant4) ;
+        NotExpression not1 = new NotExpression(plan, and1) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();
+
+
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error not expected during type checking") ;
+        }       
+
+
+        // Induction check   
+        assertEquals(DataType.INTEGER, sub1.getType()) ;    
+        assertEquals(DataType.BOOLEAN, gt1.getType()) ;     
+        assertEquals(DataType.BOOLEAN, and1.getType()) ;    
+        assertEquals(DataType.BOOLEAN, not1.getType()) ;    
+
+        // Cast insertion check     
+        assertEquals(DataType.INTEGER, sub1.getRhs().getType()) ;    
+        assertEquals(DataType.LONG, gt1.getLhs().getType()) ;
+
+    }
+
+
+    @Test
+    public void testExpressionTypeChecking3() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20L) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan, 123) ;
+
+        ModExpression mod1 = new ModExpression(plan, constant1, constant2) ;
+        EqualExpression equal1 = new EqualExpression(plan, mod1, constant3) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();
+
+        plan.explain(System.out, "text", true);
+
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }      
+
+        // Induction check
+        assertEquals(DataType.LONG, mod1.getType()) ;
+        assertEquals(DataType.BOOLEAN, equal1.getType()) ;
+
+        // Cast insertion check
+        assertEquals(DataType.LONG, mod1.getLhs().getType()) ;
+        assertEquals(DataType.LONG, equal1.getRhs().getType()) ;
+
+    }
+
+    @Test
+    public void testExpressionTypeChecking4() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20D) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan,  123f) ;
+
+        DivideExpression div1 = new DivideExpression(plan, constant1, constant2) ;
+        CastExpression cast1 = new CastExpression(plan, constant3, createFS(DataType.DOUBLE));
+        NotEqualExpression notequal1 = new NotEqualExpression(plan, div1, cast1) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();        
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }  
+
+        // Induction check
+        assertEquals(DataType.DOUBLE, div1.getType()) ;
+        assertEquals(DataType.BOOLEAN, notequal1.getType()) ;
+
+        // Cast insertion check
+        assertEquals(DataType.DOUBLE, div1.getLhs().getType()) ;
+        assertEquals(DataType.DOUBLE, notequal1.getRhs().getType()) ;
+
+    }
+
+    @Test
+    public void testExpressionTypeCheckingFail4() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20D) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan, "123") ;
+
+        DivideExpression div1 = new DivideExpression(plan, constant1, constant2) ;
+        CastExpression cast1 = new CastExpression(plan,  constant3, createFS(DataType.BYTEARRAY)) ;
+        NotEqualExpression notequal1 = new NotEqualExpression(plan, div1, cast1) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+
+        try{
+            expTypeChecker.visit();        
+            fail("Exception expected") ;
+        }
+        catch (TypeCheckerException pve) {
+            // good
+        }
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (!collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }  
+    }
+
+    @Test
+    public void testExpressionTypeChecking5() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10F) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20L) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan, 123F) ;
+        ConstantExpression constant4 =  new ConstantExpression(plan, 123D) ;
+
+        LessThanEqualExpression lesser1 = new LessThanEqualExpression(plan, constant1, constant2) ;
+        BinCondExpression bincond1 = new BinCondExpression(plan, lesser1, constant3, constant4) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();             
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }  
+
+        // Induction check
+        assertEquals(DataType.BOOLEAN, lesser1.getType()) ;
+        assertEquals(DataType.DOUBLE, bincond1.getType()) ;
+
+        // Cast insertion check
+        assertEquals(DataType.FLOAT, lesser1.getLhs().getType()) ;
+        assertEquals(DataType.FLOAT, lesser1.getRhs().getType()) ;
+        assertEquals(DataType.DOUBLE, bincond1.getLhs().getType()) ;
+        assertEquals(DataType.DOUBLE, bincond1.getRhs().getType()) ;
+
+    }
+
+    @Test
+    public void testExpressionTypeChecking6() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, "10") ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20L) ;
+
+        AddExpression add1 = new AddExpression(plan, constant1, constant2) ; 
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+        try {
+            expTypeChecker.visit();        
+            fail("Exception expected") ;
+        }
+        catch (TypeCheckerException pve) {
+            // good
+        }
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Error expected") ;
+        }         
+
+    }
+
+    /**
+     * @return a dummy logical relational operator 
+     */
+    private LogicalRelationalOperator createDummyRelOpWithAlias() {
+        class DummyRelOp extends LogicalRelationalOperator{
+            DummyRelOp(){
+                super("dummy", new LogicalPlan());
+                this.alias = "dummy";
+            }
+
+            @Override
+            public LogicalSchema getSchema() throws FrontendException {
+                          return null;
+            }
+
+            @Override
+            public void accept(PlanVisitor v) throws FrontendException {
+
+            }
+
+            @Override
+            public boolean isEqual(Operator operator) throws FrontendException {
+                return false;
+            }
+
+        }
+        return new DummyRelOp();
+    }
+
+
+
+
+    @Test
+    public void testExpressionTypeChecking7() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20D) ;
+        ConstantExpression constant3 =  new ConstantExpression(plan, 123L) ;
+
+        GreaterThanExpression gt1 = new GreaterThanExpression(plan, constant1, constant2) ;
+        EqualExpression equal1 = new EqualExpression(plan, gt1, constant3) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+        try {
+            expTypeChecker.visit();        
+            fail("Exception expected") ;
+        }
+        catch (TypeCheckerException pve) {
+            // good
+        }      
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Error expected") ;
+        }   
+    }
+
+    @Test
+    public void testExpressionTypeChecking8() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+
+        TupleFactory tupleFactory = TupleFactory.getInstance();
+
+        ArrayList<Object> innerObjList = new ArrayList<Object>(); 
+        ArrayList<Object> objList = new ArrayList<Object>(); 
+
+        innerObjList.add(10);
+        innerObjList.add(3);
+        innerObjList.add(7);
+        innerObjList.add(17);
+
+        Tuple innerTuple = tupleFactory.newTuple(innerObjList);
+
+        objList.add("World");
+        objList.add(42);
+        objList.add(innerTuple);
+
+        Tuple tuple = tupleFactory.newTuple(objList);
+
+        ArrayList<Schema.FieldSchema> innerFss = new ArrayList<Schema.FieldSchema>();
+        ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+        ArrayList<Schema.FieldSchema> castFss = new ArrayList<Schema.FieldSchema>();
+
+        Schema.FieldSchema stringFs = new Schema.FieldSchema(null, DataType.CHARARRAY);
+        Schema.FieldSchema intFs = new Schema.FieldSchema(null, DataType.INTEGER);
+
+        for(int i = 0; i < innerObjList.size(); ++i) {
+            innerFss.add(intFs);
+        }
+
+        Schema innerTupleSchema = new Schema(innerFss);
+
+        fss.add(stringFs);
+        fss.add(intFs);
+        fss.add(new Schema.FieldSchema(null, innerTupleSchema, DataType.TUPLE));
+
+        Schema tupleSchema = new Schema(fss);
+
+        Schema.FieldSchema byteArrayFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+        for(int i = 0; i < 4; ++i) {
+            castFss.add(byteArrayFs);
+        }
+
+        Schema castSchema = new Schema(castFss);
+
+
+        ConstantExpression constant1 = new ConstantExpression(plan, innerTuple) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, tuple) ;
+        CastExpression cast1 = new CastExpression(plan, constant1,
+                Util.translateFieldSchema(new FieldSchema(null, castSchema, DataType.TUPLE))) ;
+
+        EqualExpression equal1 = new EqualExpression(plan, cast1, constant2) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+
+        LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+        expTypeChecker.visit();    
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }   
+
+        assertEquals(DataType.BOOLEAN, equal1.getType()) ;
+        assertEquals(DataType.TUPLE, equal1.getRhs().getType()) ;
+        assertEquals(DataType.TUPLE, equal1.getLhs().getType()) ;
+    }
+
+    /*
+     * chararray can been cast to int when jira-893 been resolved
+     */
+    @Test
+    public void testExpressionTypeChecking9() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+
+        TupleFactory tupleFactory = TupleFactory.getInstance();
+
+        ArrayList<Object> innerObjList = new ArrayList<Object>(); 
+        ArrayList<Object> objList = new ArrayList<Object>(); 
+
+        innerObjList.add("10");
+        innerObjList.add("3");
+        innerObjList.add(7);
+        innerObjList.add("17");
+
+        Tuple innerTuple = tupleFactory.newTuple(innerObjList);
+
+        objList.add("World");
+        objList.add(42);
+        objList.add(innerTuple);
+
+        Tuple tuple = tupleFactory.newTuple(objList);
+
+        ArrayList<Schema.FieldSchema> innerFss = new ArrayList<Schema.FieldSchema>();
+        ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+        ArrayList<Schema.FieldSchema> castFss = new ArrayList<Schema.FieldSchema>();
+
+        Schema.FieldSchema stringFs = new Schema.FieldSchema(null, DataType.CHARARRAY);
+        Schema.FieldSchema intFs = new Schema.FieldSchema(null, DataType.INTEGER);
+        Schema.FieldSchema doubleFs = new Schema.FieldSchema(null, DataType.DOUBLE);
+
+        innerFss.add(stringFs);
+        innerFss.add(stringFs);
+        innerFss.add(intFs);
+        innerFss.add(stringFs);
+
+        Schema innerTupleSchema = new Schema(innerFss);
+
+        fss.add(stringFs);
+        fss.add(intFs);
+        fss.add(new Schema.FieldSchema(null, innerTupleSchema, DataType.TUPLE));
+
+        Schema tupleSchema = new Schema(fss);
+
+        castFss.add(stringFs);
+        castFss.add(stringFs);
+        castFss.add(doubleFs);
+        castFss.add(intFs);
+
+        Schema castSchema = new Schema(castFss);
+
+
+        ConstantExpression constant1 = new ConstantExpression(plan, innerTuple) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, tuple) ;
+        CastExpression cast1 = new CastExpression(plan, constant1,
+                Util.translateFieldSchema(new FieldSchema(null, castSchema, DataType.TUPLE))) ;
+
+        EqualExpression equal1 = new EqualExpression(plan, cast1, constant2) ;
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        LogicalRelationalOperator dummyRelOp = createDummyRelOpWithAlias();
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, dummyRelOp);
+        try {
+            expTypeChecker.visit();        
+        }
+        catch (TypeCheckerException pve) {
+            fail("Exception not expected") ;
+        } 
+
+        printMessageCollector(collector) ;
+        //printTypeGraph(plan) ;
+
+        if (collector.hasError()) {
+            throw new Exception("Error expected") ;
+        }   
+    }
+
+    @Test
+    public void testArithmeticOpCastInsert1() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20D) ;
+
+        MultiplyExpression mul1 = new MultiplyExpression(plan,constant1, constant2) ;
+
+        // Before type checking its set correctly - PIG-421
+//        System.out.println(DataType.findTypeName(mul1.getType())) ;
+//        assertEquals(DataType.DOUBLE, mul1.getType()) ;    
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();       
+        printMessageCollector(collector) ;
+
+        //printTypeGraph(plan) ;
+
+        // After type checking
+        System.out.println(DataType.findTypeName(mul1.getType())) ;
+        assertEquals(DataType.DOUBLE, mul1.getType()) ;
+    }
+
+    @Test
+    public void testArithmeticOpCastInsert2() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20L) ;
+
+        NegativeExpression neg1 = new NegativeExpression(plan, constant1) ;
+        SubtractExpression subtract1 = new SubtractExpression(plan, neg1, constant2) ;
+
+        // Before type checking its set correctly = PIG-421
+//        assertEquals(DataType.LONG, subtract1.getType()) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();   
+        printMessageCollector(collector) ;
+
+        //printTypeGraph(plan) ;
+
+        // After type checking
+        System.out.println(DataType.findTypeName(subtract1.getType())) ;
+        assertEquals(DataType.LONG, subtract1.getType()) ;
+
+        assertTrue(subtract1.getLhs() instanceof CastExpression);
+        assertEquals(((CastExpression)subtract1.getLhs()).getType(), DataType.LONG);
+        assertTrue(((CastExpression)subtract1.getLhs()).getExpression() == neg1);
+    }
+
+    @Test
+    public void testModCastInsert1() throws Throwable {
+        LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+        ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+        ConstantExpression constant2 =  new ConstantExpression(plan, 20L) ;
+
+        ModExpression mod1 = new ModExpression(plan, constant1, constant2) ;
+
+        // Before type checking its set correctly = PIG-421
+//        assertEquals(DataType.LONG, mod1.getType()) ;
+
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+        expTypeChecker.visit();   
+        printMessageCollector(collector) ;
+
+        //printTypeGraph(plan) ;
+
+        // After type checking
+        System.out.println(DataType.findTypeName(mod1.getType())) ;
+        assertEquals(DataType.LONG, mod1.getType()) ;
+
+        assertTrue(mod1.getLhs() instanceof CastExpression);
+        assertEquals(((CastExpression)mod1.getLhs()).getType(), DataType.LONG);
+        assertTrue(((CastExpression)mod1.getLhs()).getExpression() == constant1);
+    }
+    
+    
+        // Positive case
+        @Test
+        public void testRegexTypeChecking1() throws Throwable {
+            LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+            ConstantExpression constant1 = new ConstantExpression(plan, "10") ;
+            ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+    
+            RegexExpression regex = new RegexExpression(plan, constant1, constant2) ;
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+            expTypeChecker.visit();  
+            printMessageCollector(collector) ;
+    
+            //printTypeGraph(plan) ;
+    
+            // After type checking
+            System.out.println(DataType.findTypeName(regex.getType())) ;
+            assertEquals(DataType.BOOLEAN, regex.getType()) ;
+        }
+    
+        // Positive case with cast insertion
+        @Test
+        public void testRegexTypeChecking2() throws Throwable {
+            LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+            ConstantExpression constant1 = new ConstantExpression(plan, 
+                    new DataByteArray()
+            ) ;
+            ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+    
+            RegexExpression regex = new RegexExpression(plan, constant1, constant2);
+
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+            expTypeChecker.visit();  
+            printMessageCollector(collector) ;
+    
+            //printTypeGraph(plan) ;
+    
+            // After type checking
+    
+            if (collector.hasError()) {
+                throw new Exception("Error not expected during type checking") ;
+            }       
+            
+            // check type
+            System.out.println(DataType.findTypeName(regex.getType())) ;
+            assertEquals(DataType.BOOLEAN, regex.getType()) ;
+                                             
+            // check wiring      
+            CastExpression cast = (CastExpression) regex.getLhs() ;      
+            assertEquals(cast.getType(), DataType.CHARARRAY);    
+            assertEquals(cast.getExpression(), constant1) ;
+        }
+    
+            // Negative case
+        @Test
+        public void testRegexTypeChecking3() throws Throwable {
+            LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+            ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+            ConstantExpression constant2 = new ConstantExpression(plan, "Regex");
+            
+            RegexExpression regex = new RegexExpression(plan, constant1, constant2);
+    
+            try {
+                CompilationMessageCollector collector = new CompilationMessageCollector() ;
+                TypeCheckingExpVisitor expTypeChecker = new TypeCheckingExpVisitor(plan, collector, null);
+                expTypeChecker.visit();  
+                printMessageCollector(collector) ;
+                fail("Exception expected") ;
+            }
+            catch (TypeCheckerException pve) {
+                // good
+            }
+    
+        }
+    
+        // Expression plan has to support DAG before this can be used.
+        // Currently it supports only tree.
+    
+        /*
+        @Test
+        public void testDiamondShapedExpressionPlan1() throws Throwable {
+            LogicalExpressionPlan plan = new LogicalExpressionPlan() ;
+            ConstantExpression constant1 = new ConstantExpression(plan, 10) ;
+            constant1.setType(DataType.LONG) ;
+    
+            LONegative neg1 = new LONegative(plan, constant1) ;
+            LONegative neg2 = new LONegative(plan, constant1) ;
+    
+            DivideExpression div1 = new DivideExpression(plan, neg1, neg2) ;
+    
+            LONegative neg3 = new LONegative(plan, div1) ;
+            LONegative neg4 = new LONegative(plan, div1) ;
+    
+            AddExpression add1 = new AddExpression(plan, neg3, neg4) ;
+    
+            plan.add(constant1) ;
+            plan.add(neg1) ;
+            plan.add(neg2) ;
+            plan.add(div1) ;
+            plan.add(neg3) ;
+            plan.add(neg4) ;
+            plan.add(add1) ;
+    
+            plan.connect(constant1, neg1) ;
+            plan.connect(constant1, neg2) ;
+    
+            plan.connect(neg1, div1) ;
+            plan.connect(neg2, div1) ;
+    
+            plan.connect(div1, neg3) ;
+            plan.connect(div1, neg3) ;
+    
+            plan.connect(neg3, add1) ;
+            plan.connect(neg4, add1) ;
+    
+            // Before type checking
+            assertEquals(DataType.UNKNOWN, add1.getType()) ;
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+            typeValidator.validate(plan, collector) ;
+            printMessageCollector(collector) ;
+    
+            //printTypeGraph(plan) ;
+    
+            // After type checking
+            assertEquals(DataType.LONG, div1.getType()) ;
+            assertEquals(DataType.LONG, add1.getType()) ;
+    
+        }
+        */
+        
+        // This tests when both inputs need casting
+        //@Test
+        public void testUnionCastingInsert1() throws Throwable {
+    
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+            
+            LOLoad load1 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+                
+            LOLoad load2 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+//                
+//                new LOLoad(plan,
+//                                      
+//                                      new FileSpec("pi", new FuncSpec(pigStorage)),
+//                                      null) ;
+//            LOLoad load2 = new LOLoad(plan,
+//                                      
+//                                      new FileSpec("pi", new FuncSpec(pigStorage)),
+//                                      null) ;
+//    
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+                fsList1.add(new FieldSchema(null, DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema(null, DataType.CHARARRAY)) ;
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // schema for input#2
+            Schema inputSchema2 = null ;
+            {
+                List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+                fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+                fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+                fsList2.add(new FieldSchema("field3b", DataType.FLOAT)) ;
+                fsList2.add(new FieldSchema("field4b", DataType.CHARARRAY)) ;
+                inputSchema2 = new Schema(fsList2) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema(inputSchema1));
+            load2.setScriptSchema(Util.translateSchema(inputSchema2));
+    
+            // create union operator
+            ArrayList<LogicalRelationalOperator> inputList = new ArrayList<LogicalRelationalOperator>() ;
+            inputList.add(load1) ;
+            inputList.add(load2) ;
+            LOUnion union = new LOUnion(plan, false) ;
+    
+            // wiring
+            plan.add(load1) ;
+            plan.add(load2) ;
+            plan.add(union) ;
+    
+            plan.connect(load1, union);
+            plan.connect(load2, union);
+    
+            // validate
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+    
+            // check end result schema
+            Schema outputSchema = Util.translateSchema(union.getSchema()) ;
+    
+            Schema expectedSchema = null ;
+            {
+                List<FieldSchema> fsListExpected = new ArrayList<FieldSchema>() ;
+                fsListExpected.add(new FieldSchema("field1a", DataType.DOUBLE)) ;
+                fsListExpected.add(new FieldSchema("field2a", DataType.LONG)) ;
+                fsListExpected.add(new FieldSchema("field3b", DataType.FLOAT)) ;
+                fsListExpected.add(new FieldSchema("field4b", DataType.CHARARRAY)) ;
+                expectedSchema = new Schema(fsListExpected) ;
+            }
+    
+            assertTrue(Schema.equals(outputSchema, expectedSchema, true, false)) ;
+    
+            //printTypeGraph(plan) ;
+    
+            // check the inserted casting of input1
+            {
+                // Check wiring
+                List<Operator> sucList1 = plan.getSuccessors(load1);
+                assertEquals(sucList1.size(), 1);
+                LOForEach foreach = (LOForEach) sucList1.get(0) ;
+                assertTrue(foreach instanceof LOForEach) ;
+    
+                List<Operator> sucList2 = plan.getSuccessors(foreach);
+                assertEquals(sucList2.size(), 1);
+                assertTrue(sucList2.get(0) instanceof LOUnion) ;
+    
+                // Check inserted casting
+                checkForEachCasting(foreach, 0, true, DataType.DOUBLE) ;
+                checkForEachCasting(foreach, 1, false, DataType.UNKNOWN) ;
+                checkForEachCasting(foreach, 2, true, DataType.FLOAT) ;
+                checkForEachCasting(foreach, 3, false, DataType.UNKNOWN) ;
+    
+            }
+    
+            // check the inserted casting of input2
+            {
+                // Check wiring
+                List<Operator> sucList1 = plan.getSuccessors(load2) ;
+                assertEquals(sucList1.size(), 1);
+                LOForEach foreach = (LOForEach) sucList1.get(0) ;
+                assertTrue(foreach instanceof LOForEach) ;
+    
+                List<Operator> sucList2 = plan.getSuccessors(foreach) ;
+                assertEquals(sucList2.size(), 1);
+                assertTrue(sucList2.get(0) instanceof LOUnion) ;
+    
+                // Check inserted casting
+                checkForEachCasting(foreach, 0, false, DataType.UNKNOWN) ;
+                checkForEachCasting(foreach, 1, true, DataType.LONG) ;
+                checkForEachCasting(foreach, 2, false, DataType.UNKNOWN) ;
+                checkForEachCasting(foreach, 3, false, DataType.UNKNOWN) ;
+    
+            }
+    
+        }
+    //
+    //
+    //    // This tests when both only on input needs casting
+    //    //@Test
+    //    public void testUnionCastingInsert2() throws Throwable {
+    //
+    //        printCurrentMethodName();
+    //        LogicalPlan plan = new LogicalPlan() ;
+    //
+    //        String pigStorage = PigStorage.class.getName() ;
+    //
+    //        LOLoad load1 = new LOLoad(plan,
+    //                                  
+    //                                  new FileSpec("pi", new FuncSpec(pigStorage)),
+    //                                  null) ;
+    //        LOLoad load2 = new LOLoad(plan,
+    //                                  
+    //                                  new FileSpec("pi", new FuncSpec(pigStorage)),
+    //                                  null) ;
+    //
+    //        // schema for input#1
+    //        Schema inputSchema1 = null ;
+    //        {
+    //            List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+    //            fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+    //            fsList1.add(new FieldSchema("field2a", DataType.BYTEARRAY)) ;
+    //            inputSchema1 = new Schema(fsList1) ;
+    //        }
+    //
+    //        // schema for input#2
+    //        Schema inputSchema2 = null ;
+    //        {
+    //            List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+    //            fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+    //            fsList2.add(new FieldSchema("field2b", DataType.DOUBLE)) ;
+    //            inputSchema2 = new Schema(fsList2) ;
+    //        }
+    //
+    //        // set schemas
+    //        load1.setEnforcedSchema(inputSchema1) ;
+    //        load2.setEnforcedSchema(inputSchema2) ;
+    //
+    //        // create union operator
+    //        ArrayList<LogicalOperator> inputList = new ArrayList<LogicalOperator>() ;
+    //        inputList.add(load1) ;
+    //        inputList.add(load2) ;
+    //        LOUnion union = new LOUnion(plan) ;
+    //
+    //        // wiring
+    //        plan.add(load1) ;
+    //        plan.add(load2) ;
+    //        plan.add(union) ;
+    //
+    //        plan.connect(load1, union);
+    //        plan.connect(load2, union);
+    //
+    //        // validate
+    //        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+    //        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+    //        typeValidator.validate(plan, collector) ;
+    //        printMessageCollector(collector) ;
+    //
+    //        // check end result schema
+    //        Schema outputSchema = union.getSchema() ;
+    //
+    //        Schema expectedSchema = null ;
+    //        {
+    //            List<FieldSchema> fsListExpected = new ArrayList<FieldSchema>() ;
+    //            fsListExpected.add(new FieldSchema("field1a", DataType.DOUBLE)) ;
+    //            fsListExpected.add(new FieldSchema("field2a", DataType.DOUBLE)) ;
+    //            expectedSchema = new Schema(fsListExpected) ;
+    //        }
+    //
+    //        assertTrue(Schema.equals(outputSchema, expectedSchema, true, false)) ;
+    //
+    //        //printTypeGraph(plan) ;
+    //
+    //        // check the inserted casting of input1
+    //        {
+    //            // Check wiring
+    //            List<LogicalOperator> sucList1 = plan.getSuccessors(load1) ;
+    //            assertEquals(sucList1.size(), 1);
+    //            LOForEach foreach = (LOForEach) sucList1.get(0) ;
+    //            assertTrue(foreach instanceof LOForEach) ;
+    //
+    //            List<LogicalOperator> sucList2 = plan.getSuccessors(foreach) ;
+    //            assertEquals(sucList2.size(), 1);
+    //            assertTrue(sucList2.get(0) instanceof LOUnion) ;
+    //
+    //            // Check inserted casting
+    //            checkForEachCasting(foreach, 0, true, DataType.DOUBLE) ;
+    //            checkForEachCasting(foreach, 1, true, DataType.DOUBLE) ;
+    //
+    //        }
+    //
+    //        // check the inserted casting of input2
+    //        {
+    //            // Check wiring
+    //            List<LogicalOperator> sucList1 = plan.getSuccessors(load2) ;
+    //            assertEquals(sucList1.size(), 1);
+    //            assertTrue(sucList1.get(0) instanceof LOUnion) ;
+    //        }
+    //
+    //    }
+    //    
+    //    // This has to fail under strict typing mode
+    //    /*
+    //    // This is a negative test
+    //    // Two inputs cannot be merged due to incompatible schemas
+    //    @Test
+    //    public void testUnionCastingInsert3() throws Throwable {
+    //        LogicalPlan plan = new LogicalPlan() ;
+    //
+    //        String pigStorage = PigStorage.class.getName() ;
+    //
+    //        LOLoad load1 = new LOLoad(plan,
+    //                                  
+    //                                  new FileSpec("pi", new FuncSpec(pigStorage)),
+    //                                  null) ;
+    //        LOLoad load2 = new LOLoad(plan,
+    //                                  
+    //                                  new FileSpec("pi", new FuncSpec(pigStorage)),
+    //                                  null) ;
+    //
+    //        // schema for input#1
+    //        Schema inputSchema1 = null ;
+    //        {
+    //            List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+    //            fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+    //            fsList1.add(new FieldSchema("field2a", DataType.BYTEARRAY)) ;
+    //            inputSchema1 = new Schema(fsList1) ;
+    //        }
+    //
+    //        // schema for input#2
+    //        Schema inputSchema2 = null ;
+    //        {
+    //            List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+    //            fsList2.add(new FieldSchema("field1b", DataType.CHARARRAY)) ;
+    //            fsList2.add(new FieldSchema("field2b", DataType.DOUBLE)) ;
+    //            inputSchema2 = new Schema(fsList2) ;
+    //        }
+    //
+    //        // set schemas
+    //        load1.setEnforcedSchema(inputSchema1) ;
+    //        load2.setEnforcedSchema(inputSchema2) ;
+    //
+    //        // create union operator
+    //        ArrayList<LogicalOperator> inputList = new ArrayList<LogicalOperator>() ;
+    //        inputList.add(load1) ;
+    //        inputList.add(load2) ;
+    //        LOUnion union = new LOUnion(plan, inputList) ;
+    //
+    //        // wiring
+    //        plan.add(load1) ;
+    //        plan.add(load2) ;
+    //        plan.add(union) ;
+    //
+    //        plan.connect(load1, union);
+    //        plan.connect(load2, union);
+    //
+    //        // validate
+    //        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+    //        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+    //        try {
+    //            typeValidator.validate(plan, collector) ;
+    //            fail("Exception expected") ;
+    //        }
+    //        catch (TypeCheckerException pve) {
+    //            // good
+    //        }
+    //        printMessageCollector(collector) ;
+    //
+    //    }
+    //    */
+    
+        @Test
+        public void testDistinct1() throws Throwable {
+    
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+    
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> innerList = new ArrayList<FieldSchema>() ;
+                innerList.add(new FieldSchema("innerfield1", DataType.BAG)) ;
+                innerList.add(new FieldSchema("innerfield2", DataType.FLOAT)) ;
+                Schema innerSchema = new Schema(innerList) ;
+    
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2", DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema("field3", innerSchema)) ;
+                fsList1.add(new FieldSchema("field4", DataType.BAG)) ;
+    
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // set schemas
+            load1.setSchema(Util.translateSchema(inputSchema1)) ;
+    
+            // create union operator
+            LODistinct distinct1 = new LODistinct(plan) ;
+    
+            // wiring
+            plan.add(load1) ;
+            plan.add(distinct1) ;
+    
+            plan.connect(load1, distinct1);
+    
+            // validate
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+    
+            // check end result schema
+            LogicalSchema outputSchema = distinct1.getSchema() ;
+            assertTrue(load1.getSchema().isEqual(outputSchema));
+
+
+        }
+
+        // Positive test
+        @Test
+        public void testFilterWithInnerPlan1() throws Throwable {
+            testFilterWithInnerPlan(DataType.INTEGER, DataType.LONG) ;
+        }
+
+        // Positive test
+        @Test
+        public void testFilterWithInnerPlan2() throws Throwable {
+            testFilterWithInnerPlan(DataType.INTEGER, DataType.BYTEARRAY) ;    
+        }
+
+        // Filter test helper
+        public void testFilterWithInnerPlan(byte field1Type, byte field2Type) throws Throwable {
+
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+
+            String pigStorage = PigStorage.class.getName() ;
+
+            LOLoad load1 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", field1Type)) ;
+                fsList1.add(new FieldSchema("field2", field2Type)) ;
+
+                inputSchema1 = new Schema(fsList1) ;
+            }
+
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema(inputSchema1)) ;
+
+            // Create inner plan
+            LogicalExpressionPlan innerPlan = new LogicalExpressionPlan() ;
+            // filter
+            LOFilter filter1 = new LOFilter(plan) ;
+            filter1.setFilterPlan(innerPlan);
+            
+            ProjectExpression project1 = new ProjectExpression(innerPlan, 0, 0, filter1);
+                
+//                new ProjectExpression(innerPlan, load1, 0) ;
+  //          project1.setSentinel(true);
+            ProjectExpression project2 = new ProjectExpression(innerPlan, 0, 1, filter1); 
+                //new ProjectExpression(innerPlan, load1, 1) ;
+            //project2.setSentinel(true);
+
+            GreaterThanExpression gt1 = new GreaterThanExpression(innerPlan, project1, project2);
+            
+            plan.add(load1);
+            plan.add(filter1);
+            plan.connect(load1, filter1) ;
+
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+
+            LogicalSchema endResultSchema = filter1.getSchema() ;
+            assertEquals(endResultSchema.getField(0).type, field1Type) ;
+            assertEquals(endResultSchema.getField(1).type, field2Type) ;
+
+        }
+
+        // Negative test
+        @Test
+        public void testFilterWithInnerPlan3() throws Throwable {
+
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+
+            String pigStorage = PigStorage.class.getName() ;
+
+            LOLoad load1 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2", DataType.LONG)) ;
+
+                inputSchema1 = new Schema(fsList1) ;
+            }
+
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+
+            // Create inner plan
+            LogicalExpressionPlan innerPlan = new LogicalExpressionPlan() ;
+            
+            // filter
+            LOFilter filter1 = new LOFilter(plan) ;
+            filter1.setFilterPlan(innerPlan);
+            
+            ProjectExpression project1 = new ProjectExpression(innerPlan, 0, 0, filter1) ;
+            ProjectExpression project2 = new ProjectExpression(innerPlan, 0, 1, filter1) ;
+
+            AddExpression add1 = new AddExpression(innerPlan, project1, project2) ;
+
+            plan.add(load1);
+            plan.add(filter1);
+            plan.connect(load1, filter1) ;
+
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            try {
+                typeChecker.visit();  
+            }
+            catch (Exception t) {
+                // good
+            }
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+
+            if (!collector.hasError()) {
+                throw new AssertionError("Expect error") ;
+            }
+
+        }
+    
+    
+        // Simple project sort columns
+        @Test
+        public void testSortWithInnerPlan1() throws Throwable {
+    
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 = new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+
+    
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.LONG)) ;
+                fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+    
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+    
+            // Create project inner plan #1
+            LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+            // Sort
+            LOSort sort1 = new LOSort(plan);
+            ProjectExpression project1 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+            
+            innerPlan1.add(project1) ;
+    
+            // Create project inner plan #2
+            LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+            ProjectExpression project2 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+            
+            innerPlan2.add(project2) ;
+    
+            // List of innerplans
+            List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+            innerPlans.add(innerPlan1) ;
+            innerPlans.add(innerPlan2) ;
+    
+            // List of ASC flags
+            List<Boolean> ascList = new ArrayList<Boolean>() ;
+            ascList.add(true);
+            ascList.add(true);
+            
+            sort1.setAscendingCols(ascList);
+            sort1.setSortColPlans(innerPlans);
+    
+            plan.add(load1);
+            plan.add(sort1);
+            plan.connect(load1, sort1) ;
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+    
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+    
+            LogicalSchema endResultSchema = sort1.getSchema() ;
+    
+            // outer
+            assertEquals(endResultSchema.getField(0).type, DataType.LONG) ;
+            assertEquals(endResultSchema.getField(1).type, DataType.INTEGER) ;
+    
+            // inner
+            assertEquals(getSingleOutput(innerPlan1).getType(), DataType.INTEGER);
+            assertEquals(getSingleOutput(innerPlan2).getType(), DataType.LONG);
+    
+        }
+        
+        
+    
+        private LogicalExpression getSingleOutput(LogicalExpressionPlan innerPlan1) {
+            List<Operator> outputs = innerPlan1.getSources();
+            assertEquals("number of outputs in exp plan", outputs.size(),1);
+            return (LogicalExpression)outputs.get(0);
+        }
+
+        // Positive expression sort columns
+        @Test
+        public void testSortWithInnerPlan2() throws Throwable {
+    
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+    
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+    
+            LOSort sort1 = new LOSort(plan) ;
+
+            
+            // Create expression inner plan #1
+            LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+            ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, sort1) ;
+            ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+     
+            MultiplyExpression mul1 = new MultiplyExpression(innerPlan1, project11, project12) ;
+    
+            // Create expression inner plan #2
+            LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+            ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+
+            ConstantExpression const21 = new ConstantExpression(innerPlan2, 26L) ;
+            ModExpression mod21 = new ModExpression(innerPlan2, project21, const21) ;
+    
+            // List of innerplans
+            List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+            innerPlans.add(innerPlan1) ;
+            innerPlans.add(innerPlan2) ;
+    
+            // List of ASC flags
+            List<Boolean> ascList = new ArrayList<Boolean>() ;
+            ascList.add(true);
+            ascList.add(true);
+    
+            // Sort
+            sort1.setAscendingCols(ascList);
+            sort1.setSortColPlans(innerPlans);
+    
+            plan.add(load1);
+            plan.add(sort1);
+            plan.connect(load1, sort1) ;
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+    
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+    
+            LogicalSchema endResultSchema = sort1.getSchema() ;
+    
+            // outer
+            assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+            assertEquals(endResultSchema.getField(1).type, DataType.INTEGER) ;
+    
+            // inner
+            assertEquals(getSingleOutput(innerPlan1).getType(), DataType.INTEGER);
+            assertEquals(getSingleOutput(innerPlan2).getType(), DataType.LONG);
+    
+        }
+        
+        // Negative test on expression sort columns
+        @Test
+        public void testSortWithInnerPlan3() throws Throwable {
+    
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+
+    
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+    
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+    
+            
+            // Sort
+            LOSort sort1 = new LOSort(plan) ;
+            
+            // Create expression inner plan #1
+            LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+            ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, sort1) ;
+            ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, sort1) ;
+            MultiplyExpression mul1 = new MultiplyExpression(innerPlan1, project11, project12) ;
+
+    
+            // Create expression inner plan #2
+            LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+            ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, sort1) ;
+            ConstantExpression const21 = new ConstantExpression(innerPlan2, "26") ;
+            ModExpression mod21 = new ModExpression(innerPlan2, project21, const21) ;
+    
+            // List of innerplans
+            List<LogicalExpressionPlan> innerPlans = new ArrayList<LogicalExpressionPlan>() ;
+            innerPlans.add(innerPlan1) ;
+            innerPlans.add(innerPlan2) ;
+    
+            // List of ASC flags
+            List<Boolean> ascList = new ArrayList<Boolean>() ;
+            ascList.add(true);
+            ascList.add(true);
+
+            // Sort
+            sort1.setAscendingCols(ascList);
+            sort1.setSortColPlans(innerPlans);
+    
+            plan.add(load1);
+            plan.add(sort1);
+            plan.connect(load1, sort1) ;
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            try {
+                typeChecker.visit();  
+                fail("Error expected") ;
+            }
+            catch (Exception t) {
+                // good
+            }
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+    
+            if (!collector.hasError()) {
+                throw new AssertionError("Error expected") ;
+            }
+    
+        }
+
+
+        // Positive expression cond columns
+        @Test
+        public void testSplitWithInnerPlan1() throws Throwable {
+
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+
+            String pigStorage = PigStorage.class.getName() ;
+
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+                inputSchema1 = new Schema(fsList1) ;
+            }
+
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+            
+            // split
+            LOSplit split1 = new LOSplit(plan);
+
+            // output1
+            LOSplitOutput splitOutput1 = new LOSplitOutput(plan) ;
+
+            // output2
+            LOSplitOutput splitOutput2 = new LOSplitOutput(plan) ;
+            
+            // Create expression inner plan #1
+            LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+            ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, splitOutput1) ;
+            
+            ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, splitOutput1) ;
+            
+            NotEqualExpression notequal1 = new NotEqualExpression(innerPlan1, project11, project12) ;
+
+//            innerPlan1.add(project11) ;
+//            innerPlan1.add(project12) ;
+//            innerPlan1.add(notequal1) ;
+//
+//            innerPlan1.connect(project11, notequal1);
+//            innerPlan1.connect(project12, notequal1);
+
+            // Create expression inner plan #2
+            LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+            ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, splitOutput2) ;
+            
+            ConstantExpression const21 = new ConstantExpression(innerPlan2, 26L) ;
+            LessThanEqualExpression lesser21 = new LessThanEqualExpression(innerPlan2, project21, const21) ;
+//
+//            innerPlan2.add(project21) ;
+//            innerPlan2.add(const21) ;
+//            innerPlan2.add(lesser21) ;
+//
+//            innerPlan2.connect(project21, lesser21);
+//            innerPlan2.connect(const21, lesser21) ;
+
+
+            splitOutput1.setFilterPlan(innerPlan1);
+            splitOutput2.setFilterPlan(innerPlan2);
+
+            plan.add(load1);
+            plan.add(split1);
+            plan.add(splitOutput1);
+            plan.add(splitOutput2);
+
+            plan.connect(load1, split1) ;
+            plan.connect(split1, splitOutput1) ;
+            plan.connect(split1, splitOutput2) ;
+
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+
+            // check split itself
+            {
+                LogicalSchema endResultSchema1 = split1.getSchema() ;
+                // outer
+                assertEquals(endResultSchema1.getField(0).type, DataType.BYTEARRAY) ;
+                assertEquals(endResultSchema1.getField(1).type, DataType.INTEGER) ;
+            }
+
+            // check split output #1
+            {
+                LogicalSchema endResultSchema1 = splitOutput1.getSchema() ;
+                // outer
+                assertEquals(endResultSchema1.getField(0).type, DataType.BYTEARRAY) ;
+                assertEquals(endResultSchema1.getField(1).type, DataType.INTEGER) ;
+            }
+
+            // check split output #2
+            {
+                LogicalSchema endResultSchema2 = splitOutput2.getSchema() ;
+                // outer
+                assertEquals(endResultSchema2.getField(0).type, DataType.BYTEARRAY) ;
+                assertEquals(endResultSchema2.getField(1).type, DataType.INTEGER) ;
+            }
+
+            // inner conditions: all have to be boolean
+            assertEquals(getSingleOutput(innerPlan1).getType(), DataType.BOOLEAN);
+            assertEquals(getSingleOutput(innerPlan2).getType(), DataType.BOOLEAN);
+
+        }
+
+        // Negative test: expression cond columns not evaluate to boolean
+        @Test
+        public void testSplitWithInnerPlan2() throws Throwable {
+
+            // Create outer plan
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+
+            String pigStorage = PigStorage.class.getName() ;
+
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1", DataType.BYTEARRAY)) ;
+                fsList1.add(new FieldSchema("field2", DataType.INTEGER)) ;
+
+                inputSchema1 = new Schema(fsList1) ;
+            }
+
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+
+            // split
+            LOSplit split1 = new LOSplit(plan);
+
+            // output1
+            LOSplitOutput splitOutput1 = new LOSplitOutput(plan) ;
+
+            // output2
+            LOSplitOutput splitOutput2 = new LOSplitOutput(plan) ;          
+            
+            // Create expression inner plan #1
+            LogicalExpressionPlan innerPlan1 = new LogicalExpressionPlan() ;
+            ProjectExpression project11 = new ProjectExpression(innerPlan1, 0, 0, splitOutput1) ;
+            
+            ProjectExpression project12 = new ProjectExpression(innerPlan1, 0, 1, splitOutput1) ;
+            
+            NotEqualExpression notequal1 = new NotEqualExpression(innerPlan1, project11, project12) ;
+
+//            innerPlan1.add(project11) ;
+//            innerPlan1.add(project12) ;
+//            innerPlan1.add(notequal1) ;
+//
+//            innerPlan1.connect(project11, notequal1);
+//            innerPlan1.connect(project12, notequal1);
+
+            // Create expression inner plan #2
+            LogicalExpressionPlan innerPlan2 = new LogicalExpressionPlan() ;
+            ProjectExpression project21 = new ProjectExpression(innerPlan2, 0, 0, splitOutput1) ;
+            
+            ConstantExpression const21 = 
+                new ConstantExpression(innerPlan2, 26L) ;
+            
+            SubtractExpression subtract21 = 
+                new SubtractExpression(innerPlan2, project21, const21) ;
+
+//            innerPlan2.add(project21) ;
+//            innerPlan2.add(const21) ;
+//            innerPlan2.add(subtract21) ;
+//
+//            innerPlan2.connect(project21, subtract21);
+//            innerPlan2.connect(const21, subtract21) ;
+
+            splitOutput1.setFilterPlan(innerPlan1);
+            splitOutput2.setFilterPlan(innerPlan2);
+
+            plan.add(load1);
+            plan.add(split1);
+            plan.add(splitOutput1);
+            plan.add(splitOutput2);
+
+            plan.connect(load1, split1) ;
+            plan.connect(split1, splitOutput1) ;
+            plan.connect(split1, splitOutput2) ;
+
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            try {
+                typeChecker.visit();  
+            }
+            catch (Exception t) {
+                // good
+            }
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+
+            if (!collector.hasError()) {
+                throw new AssertionError("Error expected") ;
+            }
+
+        }
+    
+        // Positive test
+        @Test
+        public void testCOGroupWithInnerPlan1GroupByTuple1() throws Throwable {
+    
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            
+            LOLoad load2 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+              
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // schema for input#2
+            Schema inputSchema2 = null ;
+            {
+                List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+                fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+                fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+                inputSchema2 = new Schema(fsList2) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+            load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+            load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+    
+            LOCogroup cogroup1 = new LOCogroup(plan);
+            
+            // Create expression inner plan #1 of input #1
+            LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+            ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+            ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+            SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+    
+//            innerPlan11.add(project111) ;
+//            innerPlan11.add(const111) ;
+//            innerPlan11.add(subtract111) ;
+//    
+//            innerPlan11.connect(project111, subtract111);
+//            innerPlan11.connect(const111, subtract111) ;
+    
+            // Create expression inner plan #2 of input #1
+            LogicalExpressionPlan innerPlan21 = new LogicalExpressionPlan() ;
+            ProjectExpression project211 = new ProjectExpression(innerPlan21, 0, 0, cogroup1) ;
+            ProjectExpression project212 = new ProjectExpression(innerPlan21, 0, 1, cogroup1) ;
+    
+            AddExpression add211 = new AddExpression(innerPlan21, project211, project212) ;
+    
+//            innerPlan21.add(project211) ;
+//            innerPlan21.add(project212) ;
+//            innerPlan21.add(add211) ;
+//    
+//            innerPlan21.connect(project211, add211);
+//            innerPlan21.connect(project212, add211) ;
+   
+            // Create expression inner plan #1 of input #2
+            LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+            ProjectExpression project121 = new ProjectExpression(innerPlan12, 1, 0, cogroup1) ;
+            ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+            SubtractExpression subtract121 = new SubtractExpression(innerPlan12, project121, const121) ;
+    
+//            innerPlan12.add(project121) ;
+//            innerPlan12.add(const121) ;
+//            innerPlan12.add(subtract121) ;
+//    
+//            innerPlan12.connect(project121, subtract121);
+//            innerPlan12.connect(const121, subtract121) ;
+    
+            // Create expression inner plan #2 of input #2
+            LogicalExpressionPlan innerPlan22 = new LogicalExpressionPlan() ;
+            ConstantExpression const122 = new ConstantExpression(innerPlan22, 26) ;
+//            innerPlan22.add(const122) ;
+    
+            // Create Cogroup
+            ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+            inputs.add(load1) ;
+            inputs.add(load2) ;
+    
+            MultiMap<Integer, LogicalExpressionPlan> maps
+                                = new MultiMap<Integer, LogicalExpressionPlan>() ;
+            maps.put(0, innerPlan11);
+            maps.put(0, innerPlan21);
+            maps.put(1, innerPlan12);
+            maps.put(1, innerPlan22);
+    
+            boolean[] isInner = new boolean[inputs.size()] ;
+            for (int i=0; i < isInner.length ; i++) {
+                isInner[i] = false ;
+            }
+    
+            cogroup1.setInnerFlags(isInner);
+            cogroup1.setExpressionPlans(maps);
+    
+            // construct the main plan
+            plan.add(load1) ;
+            plan.add(load2) ;
+            plan.add(cogroup1) ;
+    
+            plan.connect(load1, cogroup1);
+            plan.connect(load2, cogroup1);
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+    
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+            
+            // check outer schema
+            LogicalSchema endResultSchema = cogroup1.getSchema() ;
+    
+            // Tuple group column
+            assertEquals(endResultSchema.getField(0).type, DataType.TUPLE) ;
+            assertEquals(endResultSchema.getField(0).schema.getField(0).type, DataType.DOUBLE) ;
+            assertEquals(endResultSchema.getField(0).schema.getField(1).type, DataType.LONG);
+    
+            assertEquals(endResultSchema.getField(1).type, DataType.BAG) ;
+            assertEquals(endResultSchema.getField(2).type, DataType.BAG) ;
+    
+            // check inner schema1
+            LogicalSchema innerSchema1 = endResultSchema.getField(1).schema.getField(0).schema ;
+            assertEquals(innerSchema1.getField(0).type, DataType.INTEGER);
+            assertEquals(innerSchema1.getField(1).type, DataType.LONG);
+    
+            // check inner schema2
+            LogicalSchema innerSchema2 = endResultSchema.getField(2).schema.getField(0).schema ;
+            assertEquals(innerSchema2.getField(0).type, DataType.DOUBLE);
+            assertEquals(innerSchema2.getField(1).type, DataType.INTEGER);
+    
+            // check group by col end result
+            assertEquals(getSingleOutput(innerPlan11).getType(), DataType.DOUBLE) ;
+            assertEquals(getSingleOutput(innerPlan21).getType(), DataType.LONG) ;
+            assertEquals(getSingleOutput(innerPlan12).getType(), DataType.DOUBLE) ;
+            assertEquals(getSingleOutput(innerPlan22).getType(), DataType.LONG) ;
+        }
+    
+    
+        // Positive test
+        @Test
+        public void testCOGroupWithInnerPlan1GroupByAtom1() throws Throwable {
+    
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            
+            LOLoad load2 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // schema for input#2
+            Schema inputSchema2 = null ;
+            {
+                List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+                fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+                fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+                inputSchema2 = new Schema(fsList2) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ; ;
+            load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+    
+            LOCogroup cogroup1 = new LOCogroup(plan);
+                    
+            // Create expression inner plan #1 of input #1
+            LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+            ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+            ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+            SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+    
+//            innerPlan11.add(project111) ;
+//            innerPlan11.add(const111) ;
+//            innerPlan11.add(subtract111) ;
+//    
+//            innerPlan11.connect(project111, subtract111);
+//            innerPlan11.connect(const111, subtract111) ;
+    
+            // Create expression inner plan #1 of input #2
+            LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+            ProjectExpression project121 = new ProjectExpression(innerPlan12, 1, 0, cogroup1) ;
+            ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+            SubtractExpression subtract121 = new SubtractExpression(innerPlan12, project121, const121) ;
+    
+//            innerPlan12.add(project121) ;
+//            innerPlan12.add(const121) ;
+//            innerPlan12.add(subtract121) ;
+//    
+//            innerPlan12.connect(project121, subtract121);
+//            innerPlan12.connect(const121, subtract121) ;
+    
+            // Create Cogroup
+            ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+            inputs.add(load1) ;
+            inputs.add(load2) ;
+    
+            MultiMap<Integer, LogicalExpressionPlan> maps
+                                = new MultiMap<Integer, LogicalExpressionPlan>() ;
+            maps.put(0, innerPlan11);
+            maps.put(1, innerPlan12);
+    
+            boolean[] isInner = new boolean[inputs.size()] ;
+            for (int i=0; i < isInner.length ; i++) {
+                isInner[i] = false ;
+            }
+    
+            
+            cogroup1.setInnerFlags(isInner);
+            cogroup1.setExpressionPlans(maps);
+    
+            // construct the main plan
+            plan.add(load1) ;
+            plan.add(load2) ;
+            plan.add(cogroup1) ;
+    
+            plan.connect(load1, cogroup1);
+            plan.connect(load2, cogroup1);
+    
+            CompilationMessageCollector collector = new CompilationMessageCollector() ;
+            TypeCheckingRelVisitor typeChecker = new TypeCheckingRelVisitor(plan, collector);
+            typeChecker.visit();  
+            printMessageCollector(collector) ;
+            //printTypeGraph(plan) ;
+    
+            if (collector.hasError()) {
+                throw new AssertionError("Expect no error") ;
+            }
+    
+            // check outer schema
+            LogicalSchema endResultSchema = cogroup1.getSchema() ;
+    
+            // Tuple group column
+            assertEquals(endResultSchema.getField(0).type, DataType.DOUBLE) ;
+    
+            assertEquals(endResultSchema.getField(1).type, DataType.BAG) ;
+            assertEquals(endResultSchema.getField(2).type, DataType.BAG) ;
+    
+            // check inner schema1
+            LogicalSchema innerSchema1 = endResultSchema.getField(1).schema.getField(0).schema ;
+            assertEquals(innerSchema1.getField(0).type, DataType.INTEGER);
+            assertEquals(innerSchema1.getField(1).type, DataType.LONG);
+    
+            // check inner schema2
+            LogicalSchema innerSchema2 = endResultSchema.getField(2).schema.getField(0).schema ;
+            assertEquals(innerSchema2.getField(0).type, DataType.DOUBLE);
+            assertEquals(innerSchema2.getField(1).type, DataType.INTEGER);
+    
+            // check group by col end result
+            assertEquals(getSingleOutput(innerPlan11).getType(), DataType.DOUBLE) ;
+            assertEquals(getSingleOutput(innerPlan12).getType(), DataType.DOUBLE) ;
+        }
+    
+    
+        // Positive test
+        @Test
+        public void testCOGroupWithInnerPlan1GroupByIncompatibleAtom1() throws Throwable {
+    
+            printCurrentMethodName();
+            LogicalPlan plan = new LogicalPlan() ;
+    
+            String pigStorage = PigStorage.class.getName() ;
+    
+            LOLoad load1 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+            
+            LOLoad load2 =  new LOLoad(
+                    new FileSpec("pi", new FuncSpec(pigStorage)),
+                    null, plan, null
+            );
+    
+            // schema for input#1
+            Schema inputSchema1 = null ;
+            {
+                List<FieldSchema> fsList1 = new ArrayList<FieldSchema>() ;
+                fsList1.add(new FieldSchema("field1a", DataType.INTEGER)) ;
+                fsList1.add(new FieldSchema("field2a", DataType.LONG)) ;
+                inputSchema1 = new Schema(fsList1) ;
+            }
+    
+            // schema for input#2
+            Schema inputSchema2 = null ;
+            {
+                List<FieldSchema> fsList2 = new ArrayList<FieldSchema>() ;
+                fsList2.add(new FieldSchema("field1b", DataType.DOUBLE)) ;
+                fsList2.add(new FieldSchema(null, DataType.INTEGER)) ;
+                inputSchema2 = new Schema(fsList2) ;
+            }
+    
+            // set schemas
+            load1.setScriptSchema(Util.translateSchema((inputSchema1))) ;
+            load2.setScriptSchema(Util.translateSchema((inputSchema2))) ;
+    
+            LOCogroup cogroup1 = new LOCogroup(plan);
+            // Create expression inner plan #1
+            LogicalExpressionPlan innerPlan11 = new LogicalExpressionPlan() ;
+            ProjectExpression project111 = new ProjectExpression(innerPlan11, 0, 0, cogroup1) ;
+            ConstantExpression const111 = new ConstantExpression(innerPlan11, 26F) ;
+            SubtractExpression subtract111 = new SubtractExpression(innerPlan11, project111, const111) ;
+    
+//            innerPlan11.add(project111) ;
+//            innerPlan11.add(const111) ;
+//            innerPlan11.add(subtract111) ;
+//    
+//            innerPlan11.connect(project111, subtract111);
+//            innerPlan11.connect(const111, subtract111) ;
+    
+            // Create expression inner plan #2
+            LogicalExpressionPlan innerPlan12 = new LogicalExpressionPlan() ;
+            ConstantExpression const121 = new ConstantExpression(innerPlan12, 26) ;
+//            innerPlan12.add(const121) ;
+    
+            // Create Cogroup
+            ArrayList<LogicalRelationalOperator> inputs = new ArrayList<LogicalRelationalOperator>() ;
+            inputs.add(load1) ;
+            inputs.add(load2) ;
+    
+            MultiMap<Integer, LogicalExpressionPlan> maps
+                                = new MultiMap<Integer, LogicalExpressionPlan>() ;
+            maps.put(0, innerPlan11);
+            maps.put(1, innerPlan12);
+    
+            boolean[] isInner = new boolean[inputs.size()] ;
+            for (int i=0; i < isInner.length ; i++) {

[... 3083 lines stripped ...]