You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/22 19:12:58 UTC

svn commit: r659161 [2/4] - in /incubator/pig/branches/types: ./ src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/ test...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu May 22 10:12:56 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.pig.impl.logicalLayer.validators;
 
 import java.util.Iterator;
@@ -5,20 +23,13 @@
 import java.util.ArrayList;
 
 import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOEqual;
-import org.apache.pig.impl.logicalLayer.LOGreaterThan;
-import org.apache.pig.impl.logicalLayer.LOGreaterThanEqual;
-import org.apache.pig.impl.logicalLayer.LOLesserThan;
-import org.apache.pig.impl.logicalLayer.LOLesserThanEqual;
-import org.apache.pig.impl.logicalLayer.LOMod;
-import org.apache.pig.impl.logicalLayer.LONegative;
-import org.apache.pig.impl.logicalLayer.LONotEqual;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 
 import org.apache.pig.impl.logicalLayer.* ;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
@@ -37,9 +48,11 @@
 public class TypeCheckingVisitor extends LOVisitor {
 
     private static final Log log = LogFactory.getLog(TypeCheckingVisitor.class);
-    
+
     private CompilationMessageCollector msgCollector = null ;
-    
+
+    private boolean strictMode = false ;
+
     public TypeCheckingVisitor(LogicalPlan plan,
                         CompilationMessageCollector messageCollector) {
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
@@ -52,7 +65,7 @@
                                 throws VisitorException {
         if (eOp instanceof BinaryExpressionOperator) {
             visit((BinaryExpressionOperator) eOp) ;
-        } 
+        }
         else if (eOp instanceof UnaryExpressionOperator) {
             visit((UnaryExpressionOperator) eOp) ;
         }
@@ -112,20 +125,144 @@
         else if (lOp instanceof LOSort) {
             visit((LOSort) lOp) ;
         }
+        else if (lOp instanceof LOForEach) {
+            visit((LOForEach) lOp) ;
+        }
+        else if (lOp instanceof LOGenerate) {
+            visit((LOGenerate) lOp) ;
+        }
+        else if (lOp instanceof LOCross) {
+            visit((LOCross) lOp) ;
+        }
         // TODO: Check that all operators are included here
     }
 
 
 
     protected void visit(LOProject pj) throws VisitorException {
+        resolveLOProjectType(pj) ;
+    }
+
+    private void resolveLOProjectType(LOProject pj) throws VisitorException {
+
         try {
             pj.getFieldSchema() ;
         }
         catch (FrontendException fe) {
-            VisitorException vse = new VisitorException("Problem in LOProject") ;
+            String msg = "Error getting LOProject's input schema" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ; // thrown below with fe attached as its cause
             vse.initCause(fe) ;
             throw vse ;
+
+        }
+
+        /*
+        if (!pj.getSentinel()) {
+
+            LogicalOperator op = pj.getExpression() ;
+
+            if (!(op instanceof LOProject)) {
+                throw new AssertionError("LOProject.getExpression() has to be "
+                                         + "LOProject if it's not a sentinel") ;
+            }
+
+            // else
+            LOProject innerProject = (LOProject) op ;
+            resolveLOProjectType(innerProject) ;
+
+            if ( (innerProject.getType() != DataType.BAG) &&
+                 (innerProject.getType() != DataType.TUPLE) ) {
+                throw new AssertionError("Nested LOProject is for extracting "
+                                         + " from TUPLE/BAG only") ;
+            }
+
+            // set type of this project
+            pj.setType(innerProject.getType());
+            Schema inputSchema = null ;
+
+            try {
+                inputSchema = innerProject.getSchema() ;
+            }
+            catch (FrontendException fe) {
+                String msg = "Cannot get source schema into LOProject" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(fe) ;
+                throw new VisitorException(msg) ;
+            }
+
+            // extracting schema from projection
+            List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
+            try {
+                for(int index: pj.getProjection()) {
+                    FieldSchema fs = null ;
+                    // typed input
+                    if (inputSchema != null) {
+                        fs = inputSchema.getField(index) ;
+                        FieldSchema newFs = new FieldSchema(fs.alias, fs.schema, fs.type) ;
+                        fsList.add(newFs) ;
+                    }
+                    // non-typed input
+                    else {
+                        FieldSchema newFs = new FieldSchema(null, DataType.BYTEARRAY) ;
+                        fsList.add(newFs) ;
+                    }
+                }
+                pj.setFieldSchema(new FieldSchema(null, new Schema(fsList), innerProject.getType()));
+            }
+            catch (FrontendException fe) {
+                String msg = "Cannot get source schema into LOProject" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(fe) ;
+                throw new VisitorException(msg) ;
+            }
+            catch (ParseException pe) {
+                String msg = "Cannot get source schema into LOProject" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(pe) ;
+                throw new VisitorException(msg) ;
+            }
         }
+        // if it's a sentinel, we just get the projected input type to it
+        else {
+            if (pj.getProjection().size() != 1) {
+                throw new AssertionError("Sentinel LOProject can have only "
+                                         + "1 projection") ;
+            }
+            LogicalOperator input = pj.getExpression() ;
+            int projectedField = pj.getProjection().get(0) ;
+            try {
+                Schema schema = input.getSchema() ;
+
+                if (schema != null) {
+                    FieldSchema fs = schema.getField(projectedField) ;
+                    pj.setFieldSchema(fs);
+                }
+                else {
+                    FieldSchema fs = new FieldSchema(null, DataType.BYTEARRAY) ;
+                    pj.setFieldSchema(fs);
+                }
+            }
+            catch (FrontendException fe) {
+                String msg = "Cannot get source schema into LOProject" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(fe) ;
+                throw new VisitorException(msg) ;
+            }
+            catch (ParseException pe) {
+                String msg = "Cannot get source schema into LOProject" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(pe) ;
+                throw new VisitorException(msg) ;
+            }
+        }
+        */
+        
     }
 
     /**
@@ -172,7 +309,7 @@
         try {
             currentPlan.connect(rg.getOperand(), cast) ;
             currentPlan.connect(cast, rg) ;
-        } 
+        }
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
@@ -181,603 +318,646 @@
         rg.setOperand(cast) ;
     }
 
-    @Override
     public void visit(LOAnd binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
 
-    	byte lhsType = lhs.getType() ;
-    	byte rhsType = rhs.getType() ;
+        byte lhsType = lhs.getType() ;
+        byte rhsType = rhs.getType() ;
 
-    	if (  (lhsType != DataType.BOOLEAN)  ||
-    			(rhsType != DataType.BOOLEAN)  ) {
-    		String msg = "Operands of AND/OR can be boolean only" ;
-    		msgCollector.collect(msg, MessageType.Error);
-    		throw new VisitorException(msg) ;
-    	}
+        if (  (lhsType != DataType.BOOLEAN)  ||
+              (rhsType != DataType.BOOLEAN)  ) {
+            String msg = "Operands of AND/OR can be boolean only" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
 
-    	binOp.setType(DataType.BOOLEAN) ;
+        binOp.setType(DataType.BOOLEAN) ;
 
     }
-    
+
     @Override
     public void visit(LOOr binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
 
-    	byte lhsType = lhs.getType() ;
-    	byte rhsType = rhs.getType() ;
+        byte lhsType = lhs.getType() ;
+        byte rhsType = rhs.getType() ;
 
-    	if (  (lhsType != DataType.BOOLEAN)  ||
-    			(rhsType != DataType.BOOLEAN)  ) {
-    		String msg = "Operands of AND/OR can be boolean only" ;
-    		msgCollector.collect(msg, MessageType.Error);
-    		throw new VisitorException(msg) ;
-    	}
+        if (  (lhsType != DataType.BOOLEAN)  ||
+              (rhsType != DataType.BOOLEAN)  ) {
+            String msg = "Operands of AND/OR can be boolean only" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
 
-    	binOp.setType(DataType.BOOLEAN) ;
+        binOp.setType(DataType.BOOLEAN) ;
     }
-    
+
     @Override
     public void visit(LOMultiply binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-    	
-    	if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-              
-               // return the bigger type
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
 
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               }
-               else if (rhsType != biggerType) {
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }
-               binOp.setType(biggerType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (DataType.isNumberType(rhsType)) ) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               // Set output type
-               binOp.setType(rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                   (DataType.isNumberType(lhsType)) ) {
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-               // Set output type
-               binOp.setType(lhsType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // Cast both operands to double
-               insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               // Set output type
-               binOp.setType(DataType.DOUBLE) ;
-           }
-           else {
-               String msg = "Cannot evaluate output type of Mul/Div Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
+        if ( DataType.isNumberType(lhsType) &&
+             DataType.isNumberType(rhsType) ) {
+
+            // return the bigger type
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+            binOp.setType(biggerType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(rhsType)) ) {
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+            // Set output type
+            binOp.setType(rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(lhsType)) ) {
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+            // Set output type
+            binOp.setType(lhsType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // Cast both operands to double
+            insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            // Set output type
+            binOp.setType(DataType.DOUBLE) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of Mul/Div Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
     }
-    
+
     @Override
     public void visit(LODivide binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-    	
-    	if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-              
-               // return the bigger type
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
 
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               }
-               else if (rhsType != biggerType) {
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }
-               binOp.setType(biggerType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (DataType.isNumberType(rhsType)) ) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               // Set output type
-               binOp.setType(rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                   (DataType.isNumberType(lhsType)) ) {
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-               // Set output type
-               binOp.setType(lhsType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // Cast both operands to double
-               insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               // Set output type
-               binOp.setType(DataType.DOUBLE) ;
-           }
-           else {
-               String msg = "Cannot evaluate output type of Mul/Div Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
+        if ( DataType.isNumberType(lhsType) &&
+             DataType.isNumberType(rhsType) ) {
+
+            // return the bigger type
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+            binOp.setType(biggerType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(rhsType)) ) {
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+            // Set output type
+            binOp.setType(rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(lhsType)) ) {
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+            // Set output type
+            binOp.setType(lhsType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // Cast both operands to double
+            insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            // Set output type
+            binOp.setType(DataType.DOUBLE) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of Mul/Div Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
     }
-    
+
     @Override
     public void visit(LOAdd binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-               
-               // return the bigger type
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }              
-               binOp.setType(biggerType) ;
-           } 
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                   (DataType.isNumberType(rhsType)) ) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               // Set output type
-               binOp.setType(rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                 (DataType.isNumberType(lhsType)) ) {
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-               // Set output type
-               binOp.setType(lhsType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                   (rhsType == DataType.BYTEARRAY) ) {
-               // Cast both operands to double
-               insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               // Set output type
-               binOp.setType(DataType.DOUBLE) ;
-           }
-           else {
-               String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
+
+        if ( DataType.isNumberType(lhsType) &&
+             DataType.isNumberType(rhsType) ) {
+
+            // return the bigger type
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+            binOp.setType(biggerType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(rhsType)) ) {
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+            // Set output type
+            binOp.setType(rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(lhsType)) ) {
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+            // Set output type
+            binOp.setType(lhsType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // Cast both operands to double
+            insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            // Set output type
+            binOp.setType(DataType.DOUBLE) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
     }
-    
+
     @Override
     public void visit(LOSubtract binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
+
+        if ( DataType.isNumberType(lhsType) &&
                 DataType.isNumberType(rhsType) ) {
-               
-               // return the bigger type
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }              
-               binOp.setType(biggerType) ;
-           } 
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                   (DataType.isNumberType(rhsType)) ) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               // Set output type
-               binOp.setType(rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                 (DataType.isNumberType(lhsType)) ) {
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-               // Set output type
-               binOp.setType(lhsType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                   (rhsType == DataType.BYTEARRAY) ) {
-               // Cast both operands to double
-               insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               // Set output type
-               binOp.setType(DataType.DOUBLE) ;
-           }
-           else {
-               String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
+
+            // return the bigger type
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+            binOp.setType(biggerType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                (DataType.isNumberType(rhsType)) ) {
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+            // Set output type
+            binOp.setType(rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  (DataType.isNumberType(lhsType)) ) {
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+            // Set output type
+            binOp.setType(lhsType) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // Cast both operands to double
+            insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+            // Set output type
+            binOp.setType(DataType.DOUBLE) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
     }
-    
-    
-     
+
+
+
     @Override
-	public void visit(LOGreaterThan binOp) throws VisitorException {
-    	ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+    public void visit(LOGreaterThan binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-               // If not the same type, we cast them to the same
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }       
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else {
-               throw new VisitorException("Cannot evaluate output type of "
+
+        if ( DataType.isNumberType(lhsType) &&
+             DataType.isNumberType(rhsType) ) {
+            // If not the same type, we cast them to the same
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of "
                             + binOp.getClass().getSimpleName()
                             + " LHS:" + DataType.findTypeName(lhsType)
-                            + " RHS:" + DataType.findTypeName(rhsType)) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-
-	@Override
-	public void visit(LOGreaterThanEqual binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+                            + " RHS:" + DataType.findTypeName(rhsType) ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+    @Override
+    public void visit(LOGreaterThanEqual binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-               // If not the same type, we cast them to the same
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }       
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else {
-               throw new VisitorException("Cannot evaluate output type of "
+
+        if ( DataType.isNumberType(lhsType) &&
+             DataType.isNumberType(rhsType) ) {
+            // If not the same type, we cast them to the same
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of "
                             + binOp.getClass().getSimpleName()
                             + " LHS:" + DataType.findTypeName(lhsType)
-                            + " RHS:" + DataType.findTypeName(rhsType)) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-
-	@Override
-	public void visit(LOLesserThan binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+                            + " RHS:" + DataType.findTypeName(rhsType) ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+    @Override
+    public void visit(LOLesserThan binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
+        if ( DataType.isNumberType(lhsType) &&
                 DataType.isNumberType(rhsType) ) {
-               // If not the same type, we cast them to the same
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }       
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else {
-               throw new VisitorException("Cannot evaluate output type of "
+            // If not the same type, we cast them to the same
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of "
                             + binOp.getClass().getSimpleName()
                             + " LHS:" + DataType.findTypeName(lhsType)
-                            + " RHS:" + DataType.findTypeName(rhsType)) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-
-	@Override
-	public void visit(LOLesserThanEqual binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+                            + " RHS:" + DataType.findTypeName(rhsType) ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+    @Override
+    public void visit(LOLesserThanEqual binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-    	if ( DataType.isNumberType(lhsType) &&
+
+        if ( DataType.isNumberType(lhsType) &&
                 DataType.isNumberType(rhsType) ) {
-               // If not the same type, we cast them to the same
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }       
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else {
-               throw new VisitorException("Cannot evaluate output type of "
+            // If not the same type, we cast them to the same
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of "
                             + binOp.getClass().getSimpleName()
                             + " LHS:" + DataType.findTypeName(lhsType)
-                            + " RHS:" + DataType.findTypeName(rhsType)) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-
-	
-	
-	@Override
-	public void visit(LOEqual binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+                            + " RHS:" + DataType.findTypeName(rhsType) ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+
+
+    @Override
+    public void visit(LOEqual binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-		if ( DataType.isNumberType(lhsType) &&
+
+        if ( DataType.isNumberType(lhsType) &&
                 DataType.isNumberType(rhsType) ) {
-               
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               } 
-               
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else if ( (lhsType == DataType.TUPLE) &&
-                     (rhsType == DataType.TUPLE) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.MAP) &&
-                     (rhsType == DataType.MAP) ) {
-               // good
-           }
-           else {
-               String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-
-	@Override
-	public void visit(LONotEqual binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else if ( (lhsType == DataType.TUPLE) &&
+                  (rhsType == DataType.TUPLE) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.MAP) &&
+                  (rhsType == DataType.MAP) ) {
+            // good
+        }
+        else {
+            String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+    @Override
+    public void visit(LONotEqual binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-		if ( DataType.isNumberType(lhsType) &&
+
+
+        if ( DataType.isNumberType(lhsType) &&
                 DataType.isNumberType(rhsType) ) {
-               
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
-               
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               } 
-               
-           } 
-           else if ( (lhsType == DataType.CHARARRAY) &&
-                     (rhsType == DataType.CHARARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
-                   ) {
-               // Cast byte array to the type on rhs
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                     ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
-                   ) {
-               // Cast byte array to the type on lhs
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-           }
-           else if ( (lhsType == DataType.TUPLE) &&
-                     (rhsType == DataType.TUPLE) ) {
-               // good
-           }
-           else if ( (lhsType == DataType.MAP) &&
-                     (rhsType == DataType.MAP) ) {
-               // good
-           }
-           else {
-               String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
-           
-           binOp.setType(DataType.BOOLEAN) ;
-	}
-	
-	
-
-	@Override
-	public void visit(LOMod binOp) throws VisitorException {
-		ExpressionOperator lhs = binOp.getLhsOperand() ;
-    	ExpressionOperator rhs = binOp.getRhsOperand() ;
-    	
-    	byte lhsType = lhs.getType() ;
+
+            byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+            // Cast smaller type to the bigger type
+            if (lhsType != biggerType) {
+                insertLeftCastForBinaryOp(binOp, biggerType) ;
+            }
+            else if (rhsType != biggerType) {
+                insertRightCastForBinaryOp(binOp, biggerType) ;
+            }
+
+        }
+        else if ( (lhsType == DataType.CHARARRAY) &&
+                  (rhsType == DataType.CHARARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  (rhsType == DataType.BYTEARRAY) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) )
+                ) {
+            // Cast byte array to the type on rhs
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+        }
+        else if ( (rhsType == DataType.BYTEARRAY) &&
+                  ( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) )
+                ) {
+            // Cast byte array to the type on lhs
+            insertRightCastForBinaryOp(binOp, lhsType) ;
+        }
+        else if ( (lhsType == DataType.TUPLE) &&
+                  (rhsType == DataType.TUPLE) ) {
+            // good
+        }
+        else if ( (lhsType == DataType.MAP) &&
+                  (rhsType == DataType.MAP) ) {
+            // good
+        }
+        else {
+            String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+
+        binOp.setType(DataType.BOOLEAN) ;
+    }
+
+    @Override
+    public void visit(LOMod binOp) throws VisitorException {
+        ExpressionOperator lhs = binOp.getLhsOperand() ;
+        ExpressionOperator rhs = binOp.getRhsOperand() ;
+
+        byte lhsType = lhs.getType() ;
         byte rhsType = rhs.getType() ;
-        
-		if ( (lhsType == DataType.INTEGER) &&
-                (rhsType == DataType.INTEGER) ) {
-               binOp.setType(DataType.INTEGER) ;
-           }           
-           else if ( (lhsType == DataType.LONG) &&
-                     ( (rhsType == DataType.INTEGER) || (rhsType == DataType.LONG) )) {
-               if (rhsType == DataType.INTEGER) {
-                   insertRightCastForBinaryOp(binOp, DataType.LONG) ;
-               }
-               binOp.setType(DataType.LONG) ;
-           }            
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                   ( (rhsType == DataType.INTEGER) || (rhsType == DataType.LONG) )) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               binOp.setType(rhsType) ;
-           }          
-           else {
-               String msg = "Cannot evaluate output type of Mod Operator" ;
-               msgCollector.collect(msg, MessageType.Error);
-               throw new VisitorException(msg) ;
-           }
-	}
 
-	private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
+        if ( (lhsType == DataType.INTEGER) &&
+             (rhsType == DataType.INTEGER)
+           ) {
+            binOp.setType(DataType.INTEGER) ;
+        }
+        else if ( (lhsType == DataType.LONG) &&
+                  ( (rhsType == DataType.INTEGER) || (rhsType == DataType.LONG) )
+                ) {
+            if (rhsType == DataType.INTEGER) {
+                insertRightCastForBinaryOp(binOp, DataType.LONG) ;
+            }
+            binOp.setType(DataType.LONG) ;
+        }
+        else if ( (lhsType == DataType.BYTEARRAY) &&
+                  ( (rhsType == DataType.INTEGER) || (rhsType == DataType.LONG) )
+                ) {
+            insertLeftCastForBinaryOp(binOp, rhsType) ;
+            binOp.setType(rhsType) ;
+        }
+        else {
+            String msg = "Cannot evaluate output type of Mod Operator" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+    }
+
+
+    @Override
+    public void visit(LONegative uniOp) throws VisitorException {
+        byte type = uniOp.getOperand().getType() ;
+
+
+        if (DataType.isNumberType(type)) {
+            uniOp.setType(type) ;
+        }
+        else if (type == DataType.BYTEARRAY) {
+            insertCastForUniOp(uniOp, DataType.DOUBLE) ;
+            uniOp.setType(DataType.DOUBLE) ;
+        }
+        else {
+            String msg = "NEG can be used with numbers or Bytearray only" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+
+    }
+
+    @Override
+    public void visit(LONot uniOp) throws VisitorException {
+        byte type = uniOp.getOperand().getType() ;
+
+        if (type == DataType.BOOLEAN) {
+            uniOp.setType(DataType.BOOLEAN) ;
+        }
+        else {
+            String msg = "NOT can be used with boolean only" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+    }
+
+    private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
                                            byte toType ) {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
         collectCastWarning(binOp,
@@ -790,15 +970,15 @@
         try {
             currentPlan.connect(binOp.getLhsOperand(), cast) ;
             currentPlan.connect(cast, binOp) ;
-        } 
+        }
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
             throw err ;
-        } 
+        }
         binOp.setLhsOperand(cast) ;
     }
-	
+
     private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
                                             byte toType ) {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
@@ -812,40 +992,40 @@
         try {
             currentPlan.connect(binOp.getRhsOperand(), cast) ;
             currentPlan.connect(cast, binOp) ;
-        } 
+        }
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
             throw err ;
-        }               
+        }
         binOp.setRhsOperand(cast) ;
     }
-    
-    /** 
-     * Currently, there are two unaryOps: Neg and Not. 
-     */   
+
+    /**
+     * Currently, there are two unaryOps: Neg and Not.
+     */
     @Override
     protected void visit(UnaryExpressionOperator uniOp) throws VisitorException {
-        
+
         byte type = uniOp.getOperand().getType() ;
-        
+
         if (uniOp instanceof LONegative) {
             if (DataType.isNumberType(type)) {
                 uniOp.setType(type) ;
             }
             else if (type == DataType.BYTEARRAY) {
                 insertCastForUniOp(uniOp, DataType.DOUBLE) ;
-                uniOp.setType(DataType.DOUBLE) ;              
+                uniOp.setType(DataType.DOUBLE) ;
             }
             else {
                 String msg = "NEG can be used with numbers or Bytearray only" ;
                 msgCollector.collect(msg, MessageType.Error);
                 throw new VisitorException(msg) ;
             }
-        } 
-        else if (uniOp instanceof LONot) {            
+        }
+        else if (uniOp instanceof LONot) {
             if (type == DataType.BOOLEAN) {
-                uniOp.setType(DataType.BOOLEAN) ;                
+                uniOp.setType(DataType.BOOLEAN) ;
             }
             else {
                 String msg = "NOT can be used with boolean only" ;
@@ -857,44 +1037,10 @@
             // undefined for this unknown unary operator
             throw new AssertionError(" Undefined type checking logic for " + uniOp.getClass()) ;
         }
-            
-    }
-    
-    @Override
-	public void visit(LONegative uniOp) throws VisitorException {
-    	byte type = uniOp.getOperand().getType() ;
 
-
-    	if (DataType.isNumberType(type)) {
-    		uniOp.setType(type) ;
-    	}
-    	else if (type == DataType.BYTEARRAY) {
-    		insertCastForUniOp(uniOp, DataType.DOUBLE) ;
-    		uniOp.setType(DataType.DOUBLE) ;              
-    	}
-    	else {
-    		String msg = "NEG can be used with numbers or Bytearray only" ;
-    		msgCollector.collect(msg, MessageType.Error);
-    		throw new VisitorException(msg) ;
-    	}
-         
-	}
-    
-    @Override
-    public void visit(LONot uniOp) throws VisitorException {
-    	byte type = uniOp.getOperand().getType() ;
-    	
-    	if (type == DataType.BOOLEAN) {
-            uniOp.setType(DataType.BOOLEAN) ;                
-        }
-        else {
-            String msg = "NOT can be used with boolean only" ;
-            msgCollector.collect(msg, MessageType.Error);
-            throw new VisitorException(msg) ;
-        }
     }
 
-	private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
+    private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
         collectCastWarning(uniOp,
                            uniOp.getOperand().getType(),
                            toType) ;
@@ -1149,59 +1295,73 @@
         List<LogicalOperator> inputs =  u.getInputs() ;
 
         // There is no point to union only one operand
-        // that should be a problem in the parser
+        // if it happens, that indicates a problem in the parser
         if (inputs.size() < 2) {
             AssertionError err =  new AssertionError("Union with Count(Operand) < 2") ;
         }
 
+        Schema schema = null ;
         try {
-            Schema schema = inputs.get(0).getSchema() ;
-            
-            // Keep merging one by one
-            for (int i=1; i< inputs.size() ;i++) {
-                // Assume the first input's aliases take precedance
-                schema = schema.merge(inputs.get(i).getSchema(), false) ;
-                // if they cannot be merged, we just give up
-                if (schema == null) {
-                    String msg = "cannot merge schemas from inputs of UNION" ;
-                    msgCollector.collect(msg, MessageType.Error) ;
-                    throw new VisitorException(msg) ;
-                }
-            }
-
-            try {
-                u.setSchema(schema);
-            }
-            catch (ParseException pe) {
-                // This should never happen
-                AssertionError err =  new AssertionError("problem with computing UNION schema") ;
-                err.initCause(pe) ;
-                throw err ;
-            }
-
-            // Insert casting to inputs if necessary
 
-            for (int i=0; i< inputs.size() ;i++) {
-                insertCastForEachInBetweenIfNecessary(inputs.get(i), u, schema);
+            if (strictMode) {
+                // Keep merging one by one just to check if there is
+                // any problem with types in strict mode
+                Schema tmpSchema = inputs.get(0).getSchema() ;
+                for (int i=1; i< inputs.size() ;i++) {
+                    // Assume the first input's aliases take precedence
+                    tmpSchema = tmpSchema.merge(inputs.get(i).getSchema(), false) ;
+
+                    // if they cannot be merged, we just give up
+                    if (tmpSchema == null) {
+                        String msg = "cannot merge schemas from inputs of UNION" ;
+                        msgCollector.collect(msg, MessageType.Error) ;
+                        throw new VisitorException(msg) ;
+                    }
+                }
             }
 
+            // Compute the schema
+            schema = u.getSchema() ;
 
         }
         catch (FrontendException fee) {
-            // I don't quite understand how this can happen
-            // Anyway, just throw an exception to be on the safe side
             String msg = "Problem while reading schemas from inputs of UNION" ;
             msgCollector.collect(msg, MessageType.Error) ;
             VisitorException vse = new VisitorException(msg) ;
-            //vse.initCause(fee) ;
+            vse.initCause(fee) ;
             throw vse ;
         }
 
+        // Do cast insertion only if we are typed
+        if (schema != null) {
+            // Insert casting to inputs if necessary
+            for (int i=0; i< inputs.size() ;i++) {
+                LOForEach insertedOp
+                        = insertCastForEachInBetweenIfNecessary(inputs.get(i), u, schema) ;
+
+                // We may have to compute the schema of the input again
+                // because a casting LOForEach has just been inserted in between
+                if (insertedOp != null) {
+                    try {
+                        this.visit(insertedOp);
+                    }
+                    catch (FrontendException fee) {
+                        String msg = "Problem while casting inputs of UNION" ;
+                        msgCollector.collect(msg, MessageType.Error) ;
+                        VisitorException vse = new VisitorException(msg) ;
+                        //vse.initCause(fee) ;
+                        throw vse ;
+                    }
+                }
+            }
+        }
+
+
     }
 
     @Override
     protected void visit(LOSplitOutput op) throws VisitorException {
-        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        LogicalPlan currentPlan =  mCurrentWalker.getPlan() ;
 
         // LOSplitOutput can only have 1 input
         List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
@@ -1219,7 +1379,7 @@
             throw new VisitorException(msg) ;
         }
             
-        checkInnerPlan(condPlan, input) ;
+        checkInnerPlan(condPlan) ;
                  
         byte innerCondType = condPlan.getLeaves().get(0).getType() ;
         if (innerCondType != DataType.BOOLEAN) {
@@ -1228,18 +1388,9 @@
             throw new VisitorException(msg) ;
         }
 
-        op.setType(input.getType()) ; // This should be bag always
-
         try {
-            op.setSchema(input.getSchema()) ;
-        } 
-        catch (ParseException pe) {
-            String msg = "Problem while reading schemas from"
-                         + " inputs of LOSplitOutput " ;
-            msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
+            // Compute the schema
+            op.getSchema() ;
         }
         catch (FrontendException fe) {
             String msg = "Problem while reading"
@@ -1260,20 +1411,13 @@
     
     @Override
     protected void visit(LODistinct op) throws VisitorException {
-        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
         List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
-        // LOSplitOutput can only have 1 input
 
+        // LODistinct can only have 1 input
         try {
-            op.setSchema(list.get(0).getSchema()) ;
-        }
-        catch (ParseException pe) {
-            String msg = "Problem while reading"
-                         + " schemas from inputs of LODistinct" ;
-            msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
+            // Compute the schema
+            op.getSchema() ;
         }
         catch (FrontendException fe) {
             String msg = "Problem while reading"
@@ -1287,6 +1431,8 @@
 
     /***
      * Return concatenated of all fields from all input operators
+     * If any of the inputs has no schema, then the output schema
+     * cannot be constructed.
      * @param cs
      * @throws VisitorException
      */
@@ -1295,31 +1441,22 @@
         List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
 
         try {
+            // Compute the schema
+            cs.getSchema() ;
+
+            boolean foundNullSchema = false ;
             for(LogicalOperator op: inputs) {
                 // All of inputs are relational operators
-                // so we can access getSchema()
-                Schema inputSchema = op.getSchema() ;
-                for(int i=0; i < inputSchema.size(); i++) {
-                    // For types other than tuple
-                    if (inputSchema.getField(0).type != DataType.TUPLE) {
-                        fsList.add(new FieldSchema(inputSchema.getField(0).alias,
-                                                   inputSchema.getField(0).type)) ;
-                    }
-                    // For tuple type
-                    else {
-                        fsList.add(new FieldSchema(inputSchema.getField(0).alias,
-                                                   inputSchema.getField(0).schema)) ;
-                    }
+                // so we can access getSchema()
+                Schema inputSchema = op.getSchema() ;
+                if (inputSchema == null) {
+                    // force to null if one input has null schema
+                    cs.forceSchema(null);
+                    break ;
                 }
+
             }
-        }
-        catch (ParseException pe) {
-            String msg = "Problem while reading"
-                         + " schemas from inputs of CROSS" ;
-            msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
+
         }
         catch (FrontendException fe) {
             String msg = "Problem while reading"
@@ -1330,17 +1467,6 @@
             throw vse ;
         }
 
-        try {
-            cs.setSchema(new Schema(fsList));
-        }
-        catch (ParseException pe) {
-            String msg = "Problem while reading"
-                        + " schemas from inputs of CROSS" ;
-            msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
-        }
     }
     
     /***
@@ -1364,8 +1490,7 @@
                 throw new VisitorException(msg) ;
             }
 
-            checkInnerPlan(sortColPlan, input) ;
-
+            checkInnerPlan(sortColPlan) ;
             // TODO: May have to check SortFunc compatibility here in the future
                        
         }
@@ -1373,7 +1498,8 @@
         s.setType(input.getType()) ;  // This should be bag always.
 
         try {
-            s.setSchema(input.getSchema()) ;
+            // Compute the schema
+            s.getSchema() ;
         }
         catch (FrontendException ioe) {
             String msg = "Problem while reconciling output schema of LOSort" ;
@@ -1382,14 +1508,6 @@
             vse.initCause(ioe) ;
             throw vse ;
         }
-        // TODO: Is this ParseException applicable ?
-        catch (ParseException pe) {
-            String msg = "Problem while reconciling output schema of LOSort" ;
-            msgCollector.collect(msg, MessageType.Error);
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
-        }
     }
 
 
@@ -1410,7 +1528,7 @@
             throw new VisitorException(msg) ;
         }
 
-        checkInnerPlan(comparisonPlan, input) ;
+        checkInnerPlan(comparisonPlan) ;
               
         byte innerCondType = comparisonPlan.getLeaves().get(0).getType() ;
         if (innerCondType != DataType.BOOLEAN) {
@@ -1418,11 +1536,11 @@
             msgCollector.collect(msg, MessageType.Error) ;
             throw new VisitorException(msg) ;
         }       
-        
-        filter.setType(input.getType()) ; // This should be bag always.
 
-        try { 
-            filter.setSchema(input.getSchema()) ;
+
+        try {
+            // Compute the schema
+            filter.getSchema() ;
         } 
         catch (FrontendException ioe) {
             String msg = "Problem while reconciling output schema of LOFilter" ;
@@ -1431,14 +1549,6 @@
             vse.initCause(ioe) ;
             throw vse ;
         }
-        // TODO: Is this ParseException applicable ?
-        catch (ParseException pe) {
-            String msg = "Problem while reconciling output schema of LOFilter" ;
-            msgCollector.collect(msg, MessageType.Error);
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
-        }
 
     }
 
@@ -1457,10 +1567,9 @@
         
         LogicalOperator input = inputList.get(0) ;
         
-        split.setType(input.getType()) ; // This should be bag always
-        
         try {
-            split.setSchema(input.getSchema()) ;
+            // Compute the schema
+            split.getSchema() ;
         }
         catch (FrontendException ioe) {
             String msg = "Problem while reconciling output schema of LOSplit" ;
@@ -1469,75 +1578,54 @@
             vse.initCause(ioe) ;
             throw vse ;
         }
-        // TODO: Is this ParseException applicable ?
-        catch (ParseException pe) {
-            String msg = "Problem while reconciling output schema of LOSplit" ;
-            msgCollector.collect(msg, MessageType.Error);
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(pe) ;
-            throw vse ;
-        }
     }
 
     /**
-     * The output schema will be generated.
+     * This implementation assumes LOForEach only works
+     * in conjunction with the inner LOGenerate
+     *
+     * The output schema will be generated by the inner generate
      */
-    /*
+
     protected void visit(LOForEach forEach) throws VisitorException {
-        // TODO: Again, should ForEach have getInput() ???
+
+        // TODO: Shouldn't ForEach have getInput() ???
         List<LogicalOperator> inputList = mPlan.getPredecessors(forEach) ;
         
         if (inputList.size() != 1) {
             throw new AssertionError("LOForEach cannot have more than one input") ;
         }
-        
-        LogicalOperator input = inputList.get(0) ;
-        
-        // This variable for generating output schema at the end
-        List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
-        
-        // Checking internal plans.
-        for(int i=0;i < forEach.getEvaluationPlans().size(); i++) {
-            
-            LogicalPlan evalPlan = forEach.getEvaluationPlans().get(i) ;            
-            checkInnerPlan(evalPlan, input.getSchema()) ;
-            
-            if (!evalPlan.isSingleLeafPlan()) {
-                throw new VisitorException("Inner plans can only have one output (leaf)") ;
-            }
-            
-            LogicalOperator leafOp = evalPlan.getLeaves().get(0) ;
-            // TODO: May have to fill in field aliases here
-            FieldSchema fs = new FieldSchema(null, leafOp.getType()) ; 
-            if (leafOp.getType() == DataType.TUPLE) {
-                fs.schema = leafOp.getSchema() ;
-            }
-            fsList.add(fs) ;
-            
+
+        // Check the inner plan
+        LogicalPlan innerPlan = forEach.getForEachPlan() ;
+        checkInnerPlan(innerPlan) ;
+
+        try {
+            forEach.getSchema() ;
+        }
+        catch (FrontendException ioe) {
+            String msg = "Problem while reconciling output schema of LOForEach" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(ioe) ;
+            throw vse ;
         }
-        
-        forEach.setType(input.getType()) ; // This should be bag always    
-        forEach.setSchema(new Schema(fsList)) ;
                
     }
-    */
+
     /**
      * COGroup
-     * Still have questions about LOCOGroup internal structure
-     * so this may not look quite right
+     * All group by cols from all inputs have to be of the
+     * same type
      */
     protected void visit(LOCogroup cg) throws VisitorException {
 
-        // TODO: Do all the GroupBy cols from all inputs have
-        // TODO: to have the same type??? I assume "no"
-
         MultiMap<LogicalOperator, LogicalPlan> groupByPlans
                                                     = cg.getGroupByPlans() ;
         List<LogicalOperator> inputs = cg.getInputs() ;
 
         // Type checking internal plans.
         for(int i=0;i < inputs.size(); i++) {
-
             LogicalOperator input = inputs.get(i) ;
             List<LogicalPlan> innerPlans
                         = new ArrayList<LogicalPlan>(groupByPlans.get(input)) ;
@@ -1554,26 +1642,125 @@
                     throw new VisitorException(msg) ;
                 }
 
-                checkInnerPlan(innerPlans.get(j), input) ;
+                checkInnerPlan(innerPlans.get(j)) ;
+            }
+
+        }
+
+        Schema schema = null ;
+        try {
+            // Compute the schema
+            schema = cg.getSchema() ;
+
+            if (!cg.isTupleGroupCol()) {
+                // merge all the inner plan outputs so we know what type
+                // our group column should be
+
+                // TODO: Don't recompute schema here
+                //byte groupType = schema.getField(0).type ;
+                byte groupType = getAtomicGroupByType(cg) ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(groupByPlans.get(input)) ;
+                    // Checking innerPlan size already done above
+                    byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+                    if (innerType != groupType) {
+                        insertAtomicCastForCOGroupInnerPlan(innerPlans.get(0),
+                                                            cg,
+                                                            groupType) ;
+                    }
+                }
             }
+            else {
+
+                // TODO: Don't recompute schema here
+                //Schema groupBySchema = schema.getField(0).schema ;
+                Schema groupBySchema = getTupleGroupBySchema(cg) ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(groupByPlans.get(input)) ;
+                    for(int j=0;j < innerPlans.size(); j++) {
+                        LogicalPlan innerPlan = innerPlans.get(j) ;
+                        byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                        byte expectedType = DataType.BYTEARRAY ;
+
+                        if (!DataType.isAtomic(innerType)) {
+                            String msg = "Sorry, group by complex types"
+                                       + " will be supported soon" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            VisitorException vse = new VisitorException(msg) ;
+                            throw vse ;
+                        }
+
+                        try {
+                            expectedType = groupBySchema.getField(j).type ;
+                        }
+                        catch(ParseException pe) {
+                            String msg = "Cannot resolve COGroup output schema" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            VisitorException vse = new VisitorException(msg) ;
+                            vse.initCause(pe) ;
+                            throw vse ;
+                        }
 
+                        if (innerType != expectedType) {
+                            insertAtomicCastForCOGroupInnerPlan(innerPlan,
+                                                                cg,
+                                                                expectedType) ;
+                        }
+                    }
+                }
+            }
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve COGroup output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+        /*
+        catch (ParseException pe) {
+            String msg = "Cannot resolve COGroup output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
         }
-       
+        */
+
+
+        // TODO: Don't recompute schema here. Remove all from here!
         // Generate output schema based on the schema generated from
         // COGroup itself
 
         try {
 
-            Schema schema = cg.getSchema() ;
+            Schema outputSchema = cg.getSchema() ;
+
+            // if the "group" col is atomic
+            if (!cg.isTupleGroupCol()) {
+                outputSchema.getField(0).type = getAtomicGroupByType(cg) ;
+            }
+            else {
+                outputSchema.getField(0).type = DataType.TUPLE ;
+                outputSchema.getField(0).schema = getTupleGroupBySchema(cg) ;
+            }
 
             for(int i=0; i< inputs.size(); i++) {
-                FieldSchema fs = schema.getField(i+1) ;
+                FieldSchema fs = outputSchema.getField(i+1) ;
                 fs.type = DataType.BAG ;
                 fs.schema = inputs.get(i).getSchema() ;
             }
 
             cg.setType(DataType.BAG) ;
-            cg.setSchema(schema) ;
+            cg.setSchema(outputSchema) ;
 
         }
         catch (FrontendException fe) {
@@ -1590,9 +1777,136 @@
             vse.initCause(pe) ;
             throw vse ;
         }
+
+
         
     }
 
+
+    // Inserts a cast to an atomic type into one of COGroup's inner plans
+    // as the new leaf of that plan
+    private void insertAtomicCastForCOGroupInnerPlan(LogicalPlan innerPlan,
+                                                     LOCogroup cg,
+                                                     byte toType) {
+        List<LogicalOperator> leaves = innerPlan.getLeaves() ;
+        if (leaves.size() > 1) {
+            throw new AssertionError("insertAtomicForCOGroupInnerPlan cannot be"
+                                + " used when there is more than 1 output port") ;
+        }
+        ExpressionOperator currentOutput = (ExpressionOperator) leaves.get(0) ;
+        collectCastWarning(cg, currentOutput.getType(), toType) ;
+        OperatorKey newKey = genNewOperatorKey(currentOutput) ;
+        LOCast cast = new LOCast(innerPlan, newKey, currentOutput, toType) ;
+        innerPlan.add(cast) ;
+        try {
+            innerPlan.connect(currentOutput, cast) ;
+        }
+        catch (PlanException ioe) {
+            AssertionError err =  new AssertionError("Explicit casting insertion") ;
+            err.initCause(ioe) ;
+            throw err ;
+        }
+    }
+
+    /**
+     * Returns the merged type of the output group column. This can be
+     * used only when the group column is of an atomic type.
+     * TODO: This doesn't work with group by complex type
+     * @return the merged type of the group column
+     */
+    public byte getAtomicGroupByType(LOCogroup cg) throws VisitorException {
+        if (cg.isTupleGroupCol()) {
+            throw new AssertionError("getAtomicGroupByType is used only when"
+                                     + " dealing with atomic group col") ;
+        }
+        byte groupType = DataType.BYTEARRAY ;
+        // merge all the inner plan outputs so we know what type
+        // our group column should be
+        for(int i=0;i < cg.getInputs().size(); i++) {
+            LogicalOperator input = cg.getInputs().get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(cg.getGroupByPlans().get(input)) ;
+            if (innerPlans.size() != 1) {
+                throw new AssertionError("Each COGroup input has to have "
+                                         + "the same number of inner plans") ;
+            }
+            byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+            groupType = DataType.mergeType(groupType, innerType) ;
+            if (groupType == DataType.ERROR) {
+                // We just warn about mismatch type in non-strict mode
+                if (!strictMode) {
+                    String msg = "COGroup by incompatible types results in ByteArray" ;
+                    msgCollector.collect(msg, MessageType.Warning) ;
+                    groupType = DataType.BYTEARRAY ;
+                }
+                // We just die if in strict mode
+                else {
+                    String msg = "COGroup by incompatible types" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+            }
+
+
+        }
+
+        return groupType ;
+
+    }
+
+    /*
+        This implementation is based on the assumption that all the
+        inputs have the same group col tuple arity.
+        TODO: This doesn't work with group by complex type
+     */
+    public Schema getTupleGroupBySchema(LOCogroup cg) throws VisitorException {
+        if (!cg.isTupleGroupCol()) {
+            throw new AssertionError("getTupleGroupBySchema is used only when"
+                                     + " dealing with tuple group col") ;
+        }
+
+        // this fsList represents all the columns in group tuple
+        List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>() ;
+
+        int outputSchemaSize = cg.getGroupByPlans().get(cg.getInputs().get(0)).size() ;
+
+        // by default, they are all bytearray
+        // for type checking, we don't care about aliases
+        for(int i=0; i<outputSchemaSize; i++) {
+            fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)) ;
+        }
+
+        // merge all the inner plan outputs so we know what type
+        // our group column should be
+        for(int i=0;i < cg.getInputs().size(); i++) {
+            LogicalOperator input = cg.getInputs().get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(cg.getGroupByPlans().get(input)) ;
+
+            for(int j=0;j < innerPlans.size(); j++) {
+                byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
+                fsList.get(j).type = DataType.mergeType(fsList.get(j).type,
+                                                        innerType) ;
+                if (fsList.get(j).type == DataType.ERROR) {
+                    // We just warn about mismatch type in non-strict mode
+                    if (!strictMode) {
+                        String msg = "COGroup by incompatible types results in ByteArray" ;
+                        msgCollector.collect(msg, MessageType.Warning) ;
+                        fsList.get(j).type = DataType.BYTEARRAY ;
+                    }
+                    // We just die if in strict mode
+                    else {
+                        String msg = "COGroup by incompatible types" ;
+                        msgCollector.collect(msg, MessageType.Error) ;
+                        throw new VisitorException(msg) ;
+                    }
+                }
+            }
+        }
+
+        return new Schema(fsList) ;
+    }
+
     /***
      * Output schema of LOGenerate is a tuple schma
      * which is the output of all inner plans
@@ -1602,33 +1916,103 @@
      */
 
     protected void visit(LOGenerate g) throws VisitorException {
+
         List<LogicalPlan> plans = g.getGeneratePlans() ;
         List<Boolean> flattens = g.getFlatten() ;
 
-        /*
-        for(int i=0;i < plans.size(); i++) {
+        try {
 
-            LogicalPlan plan = plans.get(i) ;
+            // Have to resolve all inner plans before calling getSchema
+            int outputSchemaIdx = 0 ;
+            for(int i=0;i < plans.size(); i++) {
+
+                LogicalPlan plan = plans.get(i) ;
+
+                // Check that the inner plan has only 1 output port
+                if (!plan.isSingleLeafPlan()) {
+                    String msg = "Generate's expression plan can "
+                                 + " only have one output (leaf)" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+
+                List<LogicalOperator> rootList = plan.getRoots() ;
+                for(int j=0; j<rootList.size(); j++) {
+                    LogicalOperator innerRoot = rootList.get(j) ;
+                    // TODO: Support MAP dereference
+                    if (innerRoot instanceof LOProject) {
+                        resolveLOProjectType((LOProject) innerRoot) ;
+                    }
+                    else if (innerRoot instanceof LOConst) {
+                        // it's ok because LOConst always has
+                        // the right type information
+                    }
+                    else {
+                        throw new AssertionError("Unsupported root type in "
+                            +"LOGenerate:" + innerRoot.getClass().getSimpleName()) ;
+                    }
+                }
+
+                checkInnerPlan(plan) ;
 
-            // Check that the inner plan has only 1 output port
-            if (!plan.isSingleLeafPlan()) {
-                String msg = "Generate's expression plan can "
-                             + " only have one output (leaf)" ;
-                msgCollector.collect(msg, MessageType.Error) ;
-                throw new VisitorException(msg) ;
             }
 
-            checkInnerPlan(plan, ) ;
+            Schema schema = g.getSchema() ;
+
+            // Propagate type information from inner plans back to LOGenerate 
+            for(int i=0;i < plans.size(); i++) {
+
+                LogicalPlan plan = plans.get(i) ;
+                LogicalOperator leaf = plan.getSingleLeafPlanOutputOp() ;
+                // if it's atomic, we don't have to care about flattening

[... 243 lines stripped ...]