You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/15 21:13:42 UTC

svn commit: r656795 [2/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logicalLayer/validators/ src/org/apache/pig/impl/plan/ test/org/apache/pig/test/

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu May 15 12:13:41 2008
@@ -2,10 +2,8 @@
 
 import java.util.Iterator;
 import java.util.List ;
-import java.util.ArrayList; 
-import java.io.IOException; 
+import java.util.ArrayList;
 
-import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;
 import org.apache.pig.impl.logicalLayer.LOConst;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -14,12 +12,9 @@
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.PlanWalker;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType ;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.*;
 import org.apache.pig.data.DataType ;
 
 import org.apache.commons.logging.Log;
@@ -42,10 +37,11 @@
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
         msgCollector = messageCollector ;
     }
-    
+
+    // Dispatches a generic ExpressionOperator to its type-specific visit() overload
     @Override
     protected void visit(ExpressionOperator eOp)
-            throws VisitorException {
+                                throws VisitorException {
         if (eOp instanceof BinaryExpressionOperator) {
             visit((BinaryExpressionOperator) eOp) ;
         } 
@@ -61,8 +57,67 @@
         else if (eOp instanceof LOCast) {
             visit((LOCast) eOp) ;
         }
-        // TODO: A lot more "else" here for all LOs
+        else if (eOp instanceof LORegexp) {
+            visit((LORegexp) eOp) ;
+        }
+        else if (eOp instanceof LOUserFunc) {
+            visit((LOUserFunc) eOp) ;
+        }
+        else if (eOp instanceof LOProject) {
+            visit((LOProject) eOp) ;
+        }
+        else if (eOp instanceof LONegative) {
+            visit((LONegative) eOp) ;
+        }
+        else if (eOp instanceof LONot) {
+            visit((LONot) eOp) ;
+        }
+        // TODO: Check that all operators are included here
+    }
+
+
+    // Dispatches a generic LogicalOperator to its typed visit() overload (TODO: check that all operators are included)
+    @Override
+    protected void visit(LogicalOperator lOp)
+                                throws VisitorException {
+        if (lOp instanceof LOLoad) {
+            visit((LOLoad) lOp) ;
+        }
+        else if (lOp instanceof LODistinct) {
+            visit((LODistinct) lOp) ;
+        }
+        else if (lOp instanceof LOFilter) {
+            visit((LOFilter) lOp) ;
+        }
+        else if (lOp instanceof LOUnion) {
+            visit((LOUnion) lOp) ;
+        }
+        else if (lOp instanceof LOSplit) {
+            visit((LOSplit) lOp) ;
+        }
+        else if (lOp instanceof LOSplitOutput) {
+            visit((LOSplitOutput) lOp) ;
+        }
+        else if (lOp instanceof LOCogroup) {
+            visit((LOCogroup) lOp) ;
+        }
+        else if (lOp instanceof LOSort) {
+            visit((LOSort) lOp) ;
+        }
+        else if (lOp instanceof LOCross) {
+            visit((LOCross) lOp) ;
+        }
+    }
 
+    // Validates a projection by computing its field schema; the result itself is discarded
+    protected void visit(LOProject pj) throws VisitorException {
+        try {
+            pj.getFieldSchema() ;
+        } catch (FrontendException fe) {
+            String msg = "Problem in LOProject" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw (VisitorException) new VisitorException(msg).initCause(fe) ;
+        }
     }
 
     /**
@@ -70,10 +125,54 @@
      * in the parsing stage so we don't need any logic here
      */
     @Override
-    protected void visit(LOConst cs) {
-   
+    protected void visit(LOConst cs)
+                        throws VisitorException {
+        // Nothing to check: the constant's type is already set during parsing
     }
-    
+
+    /**
+     * Type-checks a regular-expression match.
+     * The operand must be a CharArray; a ByteArray operand is first wrapped
+     * in an explicit cast to CharArray. The operator itself returns Boolean.
+     * @param rg the regex operator being checked
+     * @throws VisitorException if the operand is not (castable to) CharArray
+     */
+    @Override
+    protected void visit(LORegexp rg)
+            throws VisitorException {
+        // Promote ByteArray input to CharArray via an explicit cast
+        if (rg.getOperand().getType() == DataType.BYTEARRAY) {
+            insertCastForRegexp(rg) ;
+        }
+        // Whatever is still not CharArray is rejected
+        byte operandType = rg.getOperand().getType() ;
+        if (operandType == DataType.CHARARRAY) {
+            return ;
+        }
+        String msg = "Operand of Regex can be CharArray only" ;
+        msgCollector.collect(msg, MessageType.Error);
+        throw new VisitorException(msg) ;
+    }
+
+    private void insertCastForRegexp(LORegexp rg) {
+        // Splice a ByteArray -> CharArray cast between rg and its operand
+        LogicalPlan plan = (LogicalPlan) mCurrentWalker.getPlan() ;
+        collectCastWarning(rg, DataType.BYTEARRAY, DataType.CHARARRAY) ;
+        LOCast cast = new LOCast(plan, genNewOperatorKey(rg), rg.getOperand(), DataType.CHARARRAY) ;
+        plan.add(cast) ;
+        plan.disconnect(rg.getOperand(), rg) ;
+        try {
+            plan.connect(rg.getOperand(), cast) ;
+            plan.connect(cast, rg) ;
+        } catch (PlanException pe) {
+            AssertionError failure = new AssertionError("Explicit casting insertion") ;
+            failure.initCause(pe) ;
+            throw failure ;
+        }
+        // Keep rg's own operand reference in sync with the rewired plan
+        rg.setOperand(cast) ;
+    }
+
     /**
      * For all binary operators
      */
@@ -97,7 +196,9 @@
             
             if (  (lhsType != DataType.BOOLEAN)  ||
                   (rhsType != DataType.BOOLEAN)  ) {
-                throw new VisitorException("Operands of AND/OR can be boolean only") ;
+                String msg = "Operands of AND/OR can be boolean only" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
             
             binOp.setType(DataType.BOOLEAN) ;
@@ -112,44 +213,46 @@
         else if ( (binOp instanceof LOMultiply) ||
                   (binOp instanceof LODivide)  ) {
                      
-           if ( DataType.isNumberType(lhsType) &&
-                DataType.isNumberType(rhsType) ) {
-               
-               // return the bigger type
-               byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+            if ( DataType.isNumberType(lhsType) &&
+                 DataType.isNumberType(rhsType) ) {
                
-               // Cast smaller type to the bigger type
-               if (lhsType != biggerType) {            
-                   insertLeftCastForBinaryOp(binOp, biggerType) ;
-               } 
-               else if (rhsType != biggerType) { 
-                   insertRightCastForBinaryOp(binOp, biggerType) ;
-               }              
-               binOp.setType(biggerType) ;
-           } 
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (DataType.isNumberType(rhsType)) ) {
-               insertLeftCastForBinaryOp(binOp, rhsType) ;
-               // Set output type
-               binOp.setType(rhsType) ;
-           }
-           else if ( (rhsType == DataType.BYTEARRAY) &&
-                   (DataType.isNumberType(lhsType)) ) {
-               insertRightCastForBinaryOp(binOp, lhsType) ;
-               // Set output type
-               binOp.setType(lhsType) ;
-           }
-           else if ( (lhsType == DataType.BYTEARRAY) &&
-                     (rhsType == DataType.BYTEARRAY) ) {
-               // Cast both operands to double
-               insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
-               // Set output type
-               binOp.setType(DataType.DOUBLE) ;
-           }
-           else {
-               throw new VisitorException("Cannot evaluate output type of Mul/Div Operator") ;
-           }
+                // return the bigger type
+                byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
+
+                // Cast smaller type to the bigger type
+                if (lhsType != biggerType) {
+                    insertLeftCastForBinaryOp(binOp, biggerType) ;
+                }
+                else if (rhsType != biggerType) {
+                    insertRightCastForBinaryOp(binOp, biggerType) ;
+                }
+                binOp.setType(biggerType) ;
+            }
+            else if ( (lhsType == DataType.BYTEARRAY) &&
+                      (DataType.isNumberType(rhsType)) ) {
+                insertLeftCastForBinaryOp(binOp, rhsType) ;
+                // Set output type
+                binOp.setType(rhsType) ;
+            }
+            else if ( (rhsType == DataType.BYTEARRAY) &&
+                    (DataType.isNumberType(lhsType)) ) {
+                insertRightCastForBinaryOp(binOp, lhsType) ;
+                // Set output type
+                binOp.setType(lhsType) ;
+            }
+            else if ( (lhsType == DataType.BYTEARRAY) &&
+                      (rhsType == DataType.BYTEARRAY) ) {
+                // Cast both operands to double
+                insertLeftCastForBinaryOp(binOp, DataType.DOUBLE) ;
+                insertRightCastForBinaryOp(binOp, DataType.DOUBLE) ;
+                // Set output type
+                binOp.setType(DataType.DOUBLE) ;
+            }
+            else {
+                String msg = "Cannot evaluate output type of Mul/Div Operator" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
+            }
        }        
         
         /**
@@ -197,7 +300,9 @@
                 binOp.setType(DataType.DOUBLE) ;
             }
             else {
-                throw new VisitorException("Cannot evaluate output type of Add/Subtract Operator") ;
+                String msg = "Cannot evaluate output type of Add/Subtract Operator" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
         }
         
@@ -248,7 +353,10 @@
                 insertRightCastForBinaryOp(binOp, lhsType) ;
             }
             else {
-                throw new VisitorException("Cannot evaluate output type of Inequality Operator") ;
+                throw new VisitorException("Cannot evaluate output type of "
+                             + binOp.getClass().getSimpleName()
+                             + " LHS:" + DataType.findTypeName(lhsType)
+                             + " RHS:" + DataType.findTypeName(rhsType)) ;
             }
             
             binOp.setType(DataType.BOOLEAN) ;
@@ -310,7 +418,9 @@
                 // good
             }
             else {
-                  throw new VisitorException("Cannot evaluate output type of Equal/NotEqual Operator") ;
+                String msg = "Cannot evaluate output type of Equal/NotEqual Operator" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
             
             binOp.setType(DataType.BOOLEAN) ;
@@ -340,7 +450,9 @@
                 binOp.setType(rhsType) ;
             }          
             else {
-                throw new VisitorException("Cannot evaluate output type of Mod Operator") ;
+                String msg = "Cannot evaluate output type of Mod Operator" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
         }
         
@@ -348,13 +460,17 @@
      
     private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
                                            byte toType ) {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        collectCastWarning(binOp,
+                           binOp.getLhsOperand().getType(),
+                           toType) ;
         OperatorKey newKey = genNewOperatorKey(binOp) ;
-        LOCast cast = new LOCast(mPlan, newKey, binOp.getLhsOperand(), toType) ;
-        mPlan.add(cast) ;
-        mPlan.disconnect(binOp.getLhsOperand(), binOp) ;
+        LOCast cast = new LOCast(currentPlan, newKey, binOp.getLhsOperand(), toType) ;
+        currentPlan.add(cast) ;
+        currentPlan.disconnect(binOp.getLhsOperand(), binOp) ;
         try {
-            mPlan.connect(binOp.getLhsOperand(), cast) ;
-            mPlan.connect(cast, binOp) ;            
+            currentPlan.connect(binOp.getLhsOperand(), cast) ;
+            currentPlan.connect(cast, binOp) ;
         } 
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
@@ -366,20 +482,24 @@
     
     private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
                                              byte toType ) {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ; // NOTE(review): presumably the (possibly inner) plan being walked - confirm
+        collectCastWarning(binOp,
+                           binOp.getRhsOperand().getType(),
+                           toType) ;
         OperatorKey newKey = genNewOperatorKey(binOp) ;
-        LOCast cast = new LOCast(mPlan, newKey, binOp.getRhsOperand(), toType) ;
-        mPlan.add(cast) ;
-        mPlan.disconnect(binOp.getRhsOperand(), binOp) ;
+        LOCast cast = new LOCast(currentPlan, newKey, binOp.getRhsOperand(), toType) ;
+        currentPlan.add(cast) ;
+        currentPlan.disconnect(binOp.getRhsOperand(), binOp) ;
         try {
-            mPlan.connect(binOp.getRhsOperand(), cast) ;
-            mPlan.connect(cast, binOp) ;            
+            currentPlan.connect(binOp.getRhsOperand(), cast) ;
+            currentPlan.connect(cast, binOp) ;
         } 
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
             throw err ;
         }               
-        binOp.setRhsOperand(cast) ;        
+        binOp.setRhsOperand(cast) ; // keep binOp's RHS reference in sync with the rewired plan
     }
     
     /** 
@@ -399,7 +519,9 @@
                 uniOp.setType(DataType.DOUBLE) ;              
             }
             else {
-                throw new VisitorException("NEG can be used with numbers or Bytearray only") ;
+                String msg = "NEG can be used with numbers or Bytearray only" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
         } 
         else if (uniOp instanceof LONot) {            
@@ -407,7 +529,9 @@
                 uniOp.setType(DataType.BOOLEAN) ;                
             }
             else {
-                throw new VisitorException("NOT can be used with boolean only") ;
+                String msg = "NOT can be used with boolean only" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
             }
         }
         else {
@@ -418,35 +542,55 @@
     }
     
     private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
-        List<LogicalOperator> list = mPlan.getPredecessors(uniOp) ;
+        collectCastWarning(uniOp, uniOp.getOperand().getType(), toType) ;
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        List<LogicalOperator> list = currentPlan.getPredecessors(uniOp) ;
         if (list==null) {
             throw new AssertionError("No input for " + uniOp.getClass()) ;
         }
         // All uniOps at the moment only work with Expression input
         ExpressionOperator input = (ExpressionOperator) list.get(0) ;                
         OperatorKey newKey = genNewOperatorKey(uniOp) ;
-        LOCast cast = new LOCast(mPlan, newKey, input, toType) ;
+        LOCast cast = new LOCast(currentPlan, newKey, input, toType) ;
+        // As in the other insert*Cast helpers, the cast must be in the plan before connect()
+        currentPlan.add(cast) ;
         
-        mPlan.disconnect(input, uniOp) ;       
+        currentPlan.disconnect(input, uniOp) ;
         try {
-            mPlan.connect(input, cast) ;
-            mPlan.connect(cast, uniOp) ;
+            currentPlan.connect(input, cast) ;
+            currentPlan.connect(cast, uniOp) ;
         } 
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
             throw err ;
-        }      
+        }
+        // NOTE(review): unlike the regex/binary helpers, uniOp's operand reference is not pointed at the cast - confirm
     }
     
-    // TODO: NOT DONE YET because LOUserFunc itself is not done!!!
+    // UDFs do not declare input types yet, so the only check possible here
+    // is that every argument has a usable type (DataType.isUsableType)
     @Override
     protected void visit(LOUserFunc func) throws VisitorException {
-        // Visit each of the arguments
-        Iterator<ExpressionOperator> i = func.getArguments().iterator();
-        while (i.hasNext()) {
-            i.next().visit(this);
+
+        List<ExpressionOperator> list = func.getArguments() ;
+
+        // If the dependency graph is right, all the inputs
+        // must already know the types
+
+        for(ExpressionOperator op: list) {
+            if (!DataType.isUsableType(op.getType())) {
+                String msg = "Problem with input of User-defined function" ;
+                msgCollector.collect(msg, MessageType.Error);
+                throw new VisitorException(msg) ;
+            }
         }
+
+        // NOTE(review): the arguments are not visited from here; this relies
+        // on the dependency-order walker having already typed them (see the
+        // comment above the loop). The error message does not say which
+        // argument failed - consider including its position and type.
+        // TODO(review): confirm the walker always reaches UDF arguments first.
     }
 
     /**
@@ -458,7 +602,9 @@
              
         // high-level type checking
         if (binCond.getCond().getType() != DataType.BOOLEAN) {
-            throw new VisitorException("Condition in BinCond must be boolean") ;
+            String msg = "Condition in BinCond must be boolean" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
         }       
         
         byte lhsType = binCond.getLhsOp().getType() ;
@@ -476,14 +622,18 @@
             binCond.setType(biggerType) ;
         }        
         else if (lhsType != rhsType) {
-            throw new VisitorException("Two inputs of BinCond do not have compatible types") ;
+            String msg = "Two inputs of BinCond do not have compatible types" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
         }
         
         // Matching schemas if we're working with tuples
         else if (lhsType == DataType.TUPLE) {            
             try {
                 if (!binCond.getLhsOp().getSchema().equals(binCond.getRhsOp().getSchema())) {
-                    throw new VisitorException("Two inputs of BinCond must have compatible schemas") ;
+                    String msg = "Two inputs of BinCond must have compatible schemas" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
                 }
                 // TODO: We may have to merge the schema here
                 //       if the previous check is not exact match
@@ -492,13 +642,17 @@
                     binCond.setSchema(binCond.getLhsOp().getSchema()) ;
                 }
                 catch (ParseException pe) {
-                    VisitorException vse = new VisitorException("Problem during setting BinCond output schema") ;
+                    String msg = "Problem during setting BinCond output schema" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    VisitorException vse = new VisitorException(msg) ;
                     vse.initCause(pe) ;
                     throw vse ;
                 }
             } 
             catch (FrontendException ioe) {
-                VisitorException vse = new VisitorException("Problem during evaluating BinCond output type") ;
+                String msg = "Problem during evaluating BinCond output type" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                VisitorException vse = new VisitorException(msg) ;
                 vse.initCause(ioe) ;
                 throw vse ;
             }
@@ -511,19 +665,27 @@
             binCond.setType(DataType.CHARARRAY) ;
         }
         else {
-            throw new VisitorException("Unsupported input type for BinCond") ;
+            String msg = "Unsupported input type for BinCond" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
         }
 
     }
 
     private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+
+        collectCastWarning(binCond,
+                           binCond.getLhsOp().getType(),
+                           toType) ;
+
         OperatorKey newKey = genNewOperatorKey(binCond) ;
-        LOCast cast = new LOCast(mPlan, newKey, binCond.getLhsOp(), toType) ;
-        mPlan.add(cast) ;
-        mPlan.disconnect(binCond.getLhsOp(), binCond) ;
+        LOCast cast = new LOCast(currentPlan, newKey, binCond.getLhsOp(), toType) ;
+        currentPlan.add(cast) ;
+        currentPlan.disconnect(binCond.getLhsOp(), binCond) ;
         try {
-            mPlan.connect(binCond.getLhsOp(), cast) ;
-            mPlan.connect(cast, binCond) ;            
+            currentPlan.connect(binCond.getLhsOp(), cast) ;
+            currentPlan.connect(cast, binCond) ;
         } 
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
@@ -531,23 +693,31 @@
             throw err ;
         } 
         binCond.setLhsOp(cast) ;
+
     }
 
     private void insertRightCastForBinCond(LOBinCond binCond, byte toType) {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        // Record the implicit cast of the RHS operand (collectCastWarning)
+        collectCastWarning(binCond,
+                           binCond.getRhsOp().getType(),
+                           toType) ;
+        // Splice a new cast between the RHS operand and the BinCond
         OperatorKey newKey = genNewOperatorKey(binCond) ;
-        LOCast cast = new LOCast(mPlan, newKey, binCond.getRhsOp(), toType) ;
-        mPlan.add(cast) ;
-        mPlan.disconnect(binCond.getRhsOp(), binCond) ;
+        LOCast cast = new LOCast(currentPlan, newKey, binCond.getRhsOp(), toType) ;
+        currentPlan.add(cast) ;
+        currentPlan.disconnect(binCond.getRhsOp(), binCond) ;
         try {
-            mPlan.connect(binCond.getRhsOp(), cast) ;
-            mPlan.connect(cast, binCond) ;            
+            currentPlan.connect(binCond.getRhsOp(), cast) ;
+            currentPlan.connect(cast, binCond) ;
         } 
         catch (PlanException ioe) {
             AssertionError err =  new AssertionError("Explicit casting insertion") ;
             err.initCause(ioe) ;
             throw err ;
         }               
-        binCond.setRhsOp(cast) ;        
+        binCond.setRhsOp(cast) ;
+
     }
 
     /**
@@ -587,11 +757,12 @@
             // good
         }
         else {
-            throw new VisitorException("Cannot cast " 
-                                       + DataType.findTypeName(inputType)
-                                       + " to "
-                                       + DataType.findTypeName(expectedType)
-                                       ) ;    
+            String msg = "Cannot cast "
+                           + DataType.findTypeName(inputType)
+                           + " to "
+                           + DataType.findTypeName(expectedType) ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ; 
         }
         
         // cast.getType() already returns the correct type so don't have to 
@@ -603,103 +774,306 @@
     /***********************************************************************/
     /*                  Relational Operators                               */                                                             
     /***********************************************************************/
+    /*
+        getType() of every one of these operators always returns BAG.
+        So for each of them we only have to:
+        1) Check the types of inputs and inner plans
+        2) Compute the output schema with type information
+           (at the moment, the parser only produces schemas with correct aliases)
+        3) Insert casts if necessary
+
+     */
+
+    /*
+        The output schema of LOUnion is the merge of all input schemas.
+        Aliases from inputs further to the left take precedence.
+
+        Type promotion is allowed: casts are inserted between inputs and the union as needed
+    */
+
+    @Override
+    protected void visit(LOUnion u) throws VisitorException {
+        List<LogicalOperator> inputs =  u.getInputs() ;
+
+        // A union of fewer than two inputs is meaningless and should have
+        // been rejected by the parser, so reaching here is an internal error
+        if (inputs.size() < 2) {
+            throw new AssertionError("Union with Count(Operand) < 2") ;
+        }
+
+        try {
+            Schema schema = inputs.get(0).getSchema() ;
+
+            // Keep merging one by one
+            for (int i=1; i< inputs.size() ;i++) {
+                // Assume the first input's aliases take precedence
+                schema = schema.merge(inputs.get(i).getSchema(), false) ;
+                // if they cannot be merged, we just give up
+                if (schema == null) {
+                    String msg = "cannot merge schemas from inputs of UNION" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+            }
+
+            try {
+                u.setSchema(schema);
+            }
+            catch (ParseException pe) {
+                // This should never happen
+                AssertionError err =  new AssertionError("problem with computing UNION schema") ;
+                err.initCause(pe) ;
+                throw err ;
+            }
+
+            // Insert casting to inputs if necessary
+
+            for (int i=0; i< inputs.size() ;i++) {
+                insertCastForEachInBetweenIfNecessary(inputs.get(i), u, schema);
+            }
+
+
+        }
+        catch (FrontendException fee) {
+            // NOTE(review): if VisitorException is a FrontendException subtype, the
+            // "cannot merge" error above is re-caught and re-reported here - confirm
+            String msg = "Problem while reading schemas from inputs of UNION" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fee) ;
+            throw vse ;
+        }
+
+    }
+
+    @Override
+    protected void visit(LOSplitOutput op) throws VisitorException {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        // The output schema is the input's schema, passed through unchanged.
+        // LOSplitOutput can only have 1 input (getPredecessors may return null)
+        List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
+        if (list == null || list.size() != 1) {
+            throw new AssertionError("LOSplitOutput can only have 1 input") ;
+        }
+
+        try {
+            op.setSchema(list.get(0).getSchema()) ;
+        }
+        catch (ParseException pe) {
+            String msg = "Problem while reading schemas from"
+                         + " inputs of LOSplitOutput " ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
+        }
+        catch (FrontendException fe) {
+            String msg = "Problem while reading"
+                         + " schemas from inputs of LOSplitOutput" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+    }
+
 
     /***
+     * LODistinct: the output schema is the same as the input schema.
+     * @param op the distinct operator; it must have exactly one input
+     * @throws VisitorException if the input schema cannot be read or assigned
+     */
+    @Override
+    protected void visit(LODistinct op) throws VisitorException {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
+        if (list == null || list.size() != 1) {
+            throw new AssertionError("LODistinct can only have 1 input") ;
+        }
+        try {
+            op.setSchema(list.get(0).getSchema()) ;
+        }
+        catch (ParseException pe) {
+            String msg = "Problem while reading"
+                         + " schemas from inputs of LODistinct" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
+        }
+        catch (FrontendException fe) {
+            String msg = "Problem while reading"
+                         + " schemas from inputs of LODistinct" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+    }
+
+    /***
+     * Sets the output schema of CROSS to the concatenation of all fields of all inputs, in order
+     * @param cs the cross operator whose output schema is computed
+     * @throws VisitorException if an input schema cannot be read or the result cannot be set
+     */
+    protected void visit(LOCross cs) throws VisitorException {
+        List<LogicalOperator> inputs = cs.getInputs() ;
+        List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
+
+        try {
+            for(LogicalOperator op: inputs) {
+                // All inputs are relational operators,
+                // so getSchema() is available
+                Schema inputSchema = op.getSchema() ;
+                for(int i=0; i < inputSchema.size(); i++) {
+                    // Copy field i of this input into the CROSS output
+                    FieldSchema field = inputSchema.getField(i) ;
+                    if (field.type != DataType.TUPLE) {
+                        // Non-tuple field: alias and type are enough
+                        fsList.add(new FieldSchema(field.alias, field.type)) ;
+                    }
+                    else {
+                        // Tuple field: carry its nested schema along
+                        fsList.add(new FieldSchema(field.alias, field.schema)) ;
+                    }
+                }
+            }
+        }
+        catch (ParseException pe) {
+            String msg = "Problem while reading"
+                         + " schemas from inputs of CROSS" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
+        }
+        catch (FrontendException fe) {
+            String msg = "Problem while reading"
+                        + " schemas from inputs of CROSS" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+
+        try {
+            cs.setSchema(new Schema(fsList));
+        }
+        catch (ParseException pe) {
+            String msg = "Problem while reading"
+                        + " schemas from inputs of CROSS" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
+        }
+    }
+    
+    /***
      * The schema of sort output will be the same as sort input.
      *
      */
-    
-    /*
+
     protected void visit(LOSort s) throws VisitorException {
-       
-        // TODO: Why doesn't LOSort have getInput() ???
-        List<LogicalOperator> inputList = mPlan.getPredecessors(s) ;
-        
-        if (inputList.size() != 1) {
-            throw new AssertionError("LOSort cannot have more than one input") ;
-        }       
-        
-        LogicalOperator input = inputList.get(0) ;
+
+        LogicalOperator input = s.getInput() ;
         
         // Type checking internal plans.
         for(int i=0;i < s.getSortColPlans().size(); i++) {
             
-            LogicalPlan sortColPlan = s.getSortColPlans().get(i) ;                     
-            checkInnerPlan(sortColPlan, input.getSchema()) ;
-            
-            // Check that the inner plan has only 1 output
+            LogicalPlan sortColPlan = s.getSortColPlans().get(i) ;
+
+            // Check that the sort plan has only 1 output port (leaf)
             if (!sortColPlan.isSingleLeafPlan()) {
-                throw new VisitorException("Sort's inner plans can only have one output (leaf)") ;
+                String msg = "LOSort's sort plan can only have one output (leaf)" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throw new VisitorException(msg) ;
             }
+
+            checkInnerPlan(sortColPlan, input) ;
+
+            // TODO: May have to check SortFunc compatibility here in the future
                        
         }
         
-        s.setType(input.getType()) ;  // This should be bag always.    
-        
-        // I assume we can enforce schema in Bags, so this has to be done
-        try { 
+        s.setType(input.getType()) ;  // Output type is copied from the input (should always be a bag).
+
+        try {
             s.setSchema(input.getSchema()) ;
-        } 
-        catch (IOException ioe) {
-            VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+        }
+        catch (FrontendException ioe) {
+            String msg = "Problem while reconciling output schema of LOSort" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(ioe) ;
             throw vse ;
         }
         // TODO: Is this ParseException applicable ?
         catch (ParseException pe) {
-            VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+            String msg = "Problem while reconciling output schema of LOSort" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(pe) ;
             throw vse ;
         }
     }
-    */
+
+
     /***
      * The schema of filter output will be the same as filter input
      */
-    /*
+
+    @Override
     protected void visit(LOFilter filter) throws VisitorException {
-              
+
         LogicalOperator input = filter.getInput() ;
-        LogicalPlan condPlan = filter.getConditionPlan() ;
-        checkInnerPlan(condPlan, input.getSchema()) ;
+        LogicalPlan comparisonPlan = filter.getComparisonPlan() ;
         
-        // Check that the inner plan has only 1 output
-        if (!condPlan.isSingleLeafPlan()) {
-            throw new VisitorException("Filter's cond plan can only have one output (leaf)") ;
+        // Check that the comparison plan has only 1 output port (leaf)
+        if (!comparisonPlan.isSingleLeafPlan()) {
+            String msg = "Filter's cond plan can only have one output (leaf)" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
         }
+
+        checkInnerPlan(comparisonPlan, input) ;
               
-        // TODO: We don't have a way to getOutputType of inner plan yet!
-        byte innerCondType = condPlan.getLeaves().get(0).getType() ;
+        byte innerCondType = comparisonPlan.getLeaves().get(0).getType() ;
         if (innerCondType != DataType.BOOLEAN) {
-            throw new VisitorException("Filter's condition must be boolean") ;
+            String msg = "Filter's condition must evaluate to boolean" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg) ;
         }       
         
         filter.setType(input.getType()) ; // This should be bag always.
-        
-        // I assume we can enforce schema in Bags, so this has to be done       
+
         try { 
             filter.setSchema(input.getSchema()) ;
         } 
-        catch (IOException ioe) {
-            VisitorException vse = new VisitorException("Problem while reconciling output schema") ;
+        catch (FrontendException ioe) {
+            String msg = "Problem while reconciling output schema of LOFilter" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(ioe) ;
             throw vse ;
         }
         // TODO: Is this ParseException applicable ?
         catch (ParseException pe) {
-            VisitorException vse = new VisitorException("Problem while reconciling output schema") ;
+            String msg = "Problem while reconciling output schema of LOFilter" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(pe) ;
             throw vse ;
-        };
-        
+        }
+
     }
-    */
+
     /***
      * The schema of split output will be the same as split input
      */
-    /*
+
     protected void visit(LOSplit split) throws VisitorException {
-     
+
         // TODO: Why doesn't LOSplit have getInput() ???
         List<LogicalOperator> inputList = mPlan.getPredecessors(split) ;
         
@@ -710,42 +1084,53 @@
         LogicalOperator input = inputList.get(0) ;
         
         // Checking internal plans.
+        ArrayList<LogicalPlan> condPlans
+                        = new ArrayList<LogicalPlan>(split.getConditionPlans()) ;
+
         for(int i=0;i < split.getConditionPlans().size(); i++) {
             
-            LogicalPlan condPlan = split.getConditionPlans().get(i) ;            
-            checkInnerPlan(condPlan, input.getSchema()) ;
-                  
-            // Check that the inner plan has only 1 output
+            LogicalPlan condPlan = condPlans.get(i) ;
+
+            // Check that the inner plan has only 1 output port
             if (!condPlan.isSingleLeafPlan()) {
-                throw new VisitorException("Split's inner plans can only have one output (leaf)") ;
+                String msg = "Split's cond plan can only have one output (leaf)" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throw new VisitorException(msg) ;
             }
             
+            checkInnerPlan(condPlan, input) ;
+                  
             byte innerCondType = condPlan.getLeaves().get(0).getType() ;
             if (innerCondType != DataType.BOOLEAN) {
-                throw new VisitorException("Split's conditions must be boolean") ;
+                String msg = "Split's condition must evaluate to boolean" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throw new VisitorException(msg) ;
             }
                        
         }         
         
         split.setType(input.getType()) ; // This should be bag always
         
-        // I assume we can enforce schema in Bags, so this has to be done 
-        try { 
+        try {
             split.setSchema(input.getSchema()) ;
-        } 
-        catch (IOException ioe) {
-            VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+        }
+        catch (FrontendException ioe) {
+            String msg = "Problem while reconciling output schema of LOSplit" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(ioe) ;
             throw vse ;
         }
         // TODO: Is this ParseException applicable ?
         catch (ParseException pe) {
-            VisitorException vse = new VisitorException("Problem while checking sort schema") ;
+            String msg = "Problem while reconciling output schema of LOSplit" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             vse.initCause(pe) ;
             throw vse ;
-        };
+        }
     }
-    */
+
     /**
      * The output schema will be generated.
      */
@@ -793,54 +1178,111 @@
      * Still have questions about LOCOGroup internal structure
      * so this may not look quite right
      */
-    /*
     protected void visit(LOCogroup cg) throws VisitorException {
 
-        // Type checking internal plans.
         // TODO: Do all the GroupBy cols from all inputs have
-        //       to have the same type??? I assume "no"
-        for(int i=0;i < cg.getGroupByPlans().size(); i++) {
-            
-            LogicalPlan groupColPlan = cg.getGroupByPlans().get(i) ;
-            // TODO: Question!!!, why getInputs() here not returning 
-            // List<LogicalOperator> in the latest implementation ???
-            LogicalOperator input = cg.getInputs().get(i) ;
-            checkInnerPlan(groupColPlan, input.getSchema()) ;
-            
-            // Check that the inner plan has only 1 output
-            // If we group by more than one fields, this should be tuple
-            if (!groupColPlan.isSingleLeafPlan()) {
-                throw new VisitorException("Cogroup's inner plans can only have one output (leave)") ;
+        //       to have the same type??? I assume "no"
+
+        MultiMap<LogicalOperator, LogicalPlan> groupByPlans
+                                                    = cg.getGroupByPlans() ;
+        List<LogicalOperator> inputs = cg.getInputs() ;
+
+        // Type checking internal plans.
+        for(int i=0;i < inputs.size(); i++) {
+
+            LogicalOperator input = inputs.get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(groupByPlans.get(input)) ;
+
+            for(int j=0; j < innerPlans.size(); j++) {
+
+                LogicalPlan innerPlan = innerPlans.get(j) ;
+                
+                // Check that the inner plan has only 1 output port
+                if (!innerPlan.isSingleLeafPlan()) {
+                    String msg = "COGroup's inner plans can only"
+                                 + " have one output (leaf)" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+
+                checkInnerPlan(innerPlan, input) ;
             }
-             
-            
+
         }
        
-        // Generate output schema
-        List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
-        fsList.add(new FieldSchema("group", DataType.UNKNOWN)) ;
-        for(int i=0;i < cg.getInputs().size(); i++) {
-            FieldSchema fs = new FieldSchema(cg.getInputs().getAlias(), DataType.BAG) ;
-            fsList.add(fs) ;
-            // TODO: How do we transfer Bag's Schema in this case?
+        // Start from the schema COGroup generates itself and turn field i+1
+        // into a bag carrying input i's schema (field 0 presumably the group key)
+
+        try {
+
+            Schema schema = cg.getSchema() ;
+
+            for(int i=0; i< inputs.size(); i++) {
+                FieldSchema fs = schema.getField(i+1) ;
+                fs.type = DataType.BAG ;
+                fs.schema = inputs.get(i).getSchema() ;
+            }
+
+            cg.setType(DataType.BAG) ;
+            cg.setSchema(schema) ;
+
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve COGroup output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+        catch (ParseException pe) {
+            String msg = "Cannot resolve COGroup output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(pe) ;
+            throw vse ;
         }
         
-        cg.setType(DataType.BAG) ;
-        cg.setSchema(new Schema(fsList)) ;
     }
-    */
 
-    // TODO: NOT DONE YET
-    //COmmenting out this method as its not using the new APIs
-    /*
+    /***
+     * Output schema of LOGenerate is a tuple schema
+     * which is the output of all inner plans
+     *
+     * Flatten also has to be taken care of in here
+     * NOTE(review): not implemented yet; the type-checking draft below is commented out
+     */
+
     protected void visit(LOGenerate g) throws VisitorException {
-        // Visit each of generates projection elements.
-        Iterator<ExpressionOperator> i = g.getProjections().iterator();
-        while (i.hasNext()) {
-            i.next().visit(this);
+        List<LogicalPlan> plans = g.getGeneratePlans() ;
+        List<Boolean> flattens = g.getFlatten() ;
+        // TODO: plans and flattens are unused until the draft below is finished
+        /*
+        for(int i=0;i < plans.size(); i++) {
+
+            LogicalPlan plan = plans.get(i) ;
+
+            // Check that the inner plan has only 1 output port
+            if (!plan.isSingleLeafPlan()) {
+                String msg = "Generate's expression plan can "
+                             + " only have one output (leaf)" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throw new VisitorException(msg) ;
+            }
+
+            checkInnerPlan(plan, ) ;
+            // NOTE(review): the check below was copied from LOSplit and does not apply to generate plans
+            byte innerCondType = condPlan.getLeaves().get(0).getType() ;
+            if (innerCondType != DataType.BOOLEAN) {
+                String msg = "Split's condition must evaluate to boolean" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throw new VisitorException(msg) ;
+            }
+
         }
+        */
+
     }
-    */
     
     /***
      * This does:-
@@ -849,13 +1291,13 @@
      * 2) Type checking of the inner plan
      * NOTE: This helper method only supports one source schema
      * 
-     * @param innerPlan
-     * @param inputSchema
+     * @param innerPlan  the inner plan
+     * @param srcOuterOp the source data operator
      * @throws VisitorException
      */
-    /*
+
     private void checkInnerPlan(LogicalPlan innerPlan,
-                                Schema inputSchema) 
+                                LogicalOperator srcOuterOp)
                                     throws VisitorException {
         // Preparation
         int errorCount = 0 ;     
@@ -863,15 +1305,39 @@
         if (rootList.size() < 1) {
             throw new AssertionError("Inner plan is poorly constructed") ;
         }
-        
+
+        Schema inputSchema = null ;
+        try {
+            inputSchema = srcOuterOp.getSchema() ;
+        }
+        catch(FrontendException fe) {
+            String msg = "Cannot not get schema out of "
+                         + srcOuterOp.getClass().getSimpleName() ;
+            msgCollector.collect(msg, MessageType.Error);
+            throw new VisitorException(msg) ;
+        }
+
         // Actual checking
         for(LogicalOperator op: rootList) {
             // TODO: Support map dereference
             if (op instanceof LOProject) {
                 // Get the required field from input operator's schema
                 // Copy type and schema (if any) to the inner plan root
-                // TODO: Is this the correct way of getting a column name from LOProject ???
-                FieldSchema fs = inputSchema.getField(((LOProject)op).getColumnName()) ;
+                LOProject project = (LOProject)op ;
+
+                FieldSchema fs = null ;
+                try {
+                    fs = inputSchema.getField(project.getCol()) ;
+                }
+                catch(ParseException pe) {
+                    String msg = "Cannot not get schema out of "
+                                 + srcOuterOp.getClass().getSimpleName() ;
+                    msgCollector.collect(msg, MessageType.Error);
+                    VisitorException vse = new VisitorException(msg) ;
+                    vse.initCause(pe) ;
+                    throw vse ;
+                }
+
                 if (fs != null) {
                     op.setType(fs.type) ;
                     if (fs.type == DataType.TUPLE) {
@@ -879,9 +1345,10 @@
                             op.setSchema(fs.schema) ;
                         } 
                         catch (ParseException pe) {
-                            VisitorException vse = new VisitorException(
-                                "A schema from a field in input tuple cannot "
-                                + " be reconciled with inner plan schema ") ;
+                            String msg = "A schema from a field in input tuple cannot "
+                                         + " be reconciled with inner plan schema " ;
+                            msgCollector.collect(msg, MessageType.Error);
+                            VisitorException vse = new VisitorException(msg) ;
                             vse.initCause(pe) ;
                             throw vse ;
                         }
@@ -891,7 +1358,10 @@
                 else {
                     errorCount++ ;
                 }
-            } 
+            }
+            else if (op instanceof LOConst) {
+                // don't have to do anything
+            }
             else {
                 throw new AssertionError("Unsupported root operator in inner plan") ;
             }
@@ -900,8 +1370,9 @@
         // Throw an exception if we found errors
         if (errorCount > 0) {
             // TODO: Should indicate the field names or indexes here
-            VisitorException vse = new VisitorException(
-                    "Some required fields in inner plan cannot be found in input") ;
+            String msg =  "Some required fields in inner plan cannot be found in input" ;
+            msgCollector.collect(msg, MessageType.Error);
+            VisitorException vse = new VisitorException(msg) ;
             throw vse ;
         }
         
@@ -913,8 +1384,187 @@
         popWalker() ;
         
     }
-       */
-    
+       
+
+    /***
+     * Inserts an LOForEach that casts fields between two adjacent
+     * relational operators, only if some field actually needs casting.
+     * Currently this only does "shallow" (top-level field) casting.
+     * @param fromOp the upstream operator whose output may need casting
+     * @param toOp the downstream operator; must be a direct successor of fromOp
+     * @param targetSchema schema holding the target type of each output field of fromOp
+     */
+    private void insertCastForEachInBetweenIfNecessary(LogicalOperator fromOp,
+                                                       LogicalOperator toOp,
+                                                       Schema targetSchema)
+                                                    throws VisitorException {
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+
+        // Make sure that two operators are in the same plan
+        if (fromOp.getPlan() != toOp.getPlan()) {
+            throw new AssertionError("Two operators have to be in the same plan") ;
+        }
+        // Make sure that they are in the plan we're looking at
+        if (fromOp.getPlan() != currentPlan) {
+            throw new AssertionError("Cannot manipulate any other plan"
+                                    +" than the current one") ;
+        }
+
+        // Make sure that they are adjacent and the direction
+        // is from "fromOp" to "toOp"
+        List<LogicalOperator> preList = currentPlan.getPredecessors(toOp) ;
+        boolean found = false ;
+        for(LogicalOperator tmpOp: preList) {
+            // compare by reference
+            if (tmpOp == fromOp) {
+                found = true ;
+                break ;
+            }
+        }
+
+        if (!found) {
+            throw new AssertionError("Two operators are not adjacent") ;
+        }
+
+        // retrieve input schema to be casted
+        // this will be used later
+        Schema fromSchema = null ;
+        try {
+            fromSchema = fromOp.getSchema() ;
+        }
+        catch(FrontendException fe) {
+            AssertionError err =  new AssertionError("Cannot get schema from"
+                                                     + " input operator") ;
+            err.initCause(fe) ;
+            throw err ;
+        }
+
+        // make sure the supplied targetSchema has the same number of members
+        // as number of output fields from "fromOp"
+        if (fromSchema.size() != targetSchema.size()) {
+            throw new AssertionError("Invalid input parameters in cast insert") ;
+        }
+
+        // Compose the new inner plan to be used in ForEach
+        LogicalPlan foreachPlan = new LogicalPlan() ;
+
+        // Plans inside Generate. Fields that do not need casting will only
+        // have Project. Fields that need casting will have Project + Cast
+        ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>() ;
+
+        int castNeededCounter = 0 ;
+        for(int i=0;i < fromSchema.size(); i++) {
+
+            LogicalPlan genPlan = new LogicalPlan() ;
+            // TODO: should the LogicalOp here be null in this case?
+            LOProject project = new LOProject(genPlan,
+                                                 genNewOperatorKey(fromOp),
+                                                 null,
+                                                 i) ;
+            genPlan.add(project);
+
+            // add casting if necessary by comparing target types
+            // to the input schema
+            FieldSchema fs = null ;
+            try {
+                fs = fromSchema.getField(i) ;
+            }
+            catch(ParseException pe) {
+                String msg = "Problem while reading"
+                                + " field schema from input while"
+                                + " insert casting " ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                VisitorException vse = new VisitorException(msg) ;
+                vse.initCause(pe) ;
+                throw vse ;
+            }
+
+            // This only does "shallow checking"
+
+            byte targetFieldType ;
+
+            try {
+                targetFieldType = targetSchema.getField(i).type ;
+            }
+            catch (ParseException e) {
+                throw (AssertionError) new AssertionError("Cannot get field type").initCause(e) ;
+            }
+
+            if (targetFieldType != fs.type) {
+                castNeededCounter++ ;
+                LOCast cast = new LOCast(genPlan,
+                                         genNewOperatorKey(fromOp),
+                                         project,
+                                         targetFieldType) ;
+                genPlan.add(cast) ;
+                try {
+                    genPlan.connect(project, cast);
+                }
+                catch (PlanException pe) {
+                    // This should never happen
+                    throw (AssertionError) new AssertionError("unexpected plan exception while insert casting").initCause(pe) ;
+                }
+            }
+
+            generatePlans.add(genPlan) ;
+
+        }
+
+        // if we really need casting
+        if (castNeededCounter > 0)  {
+            // Pure cast insertion: no field is flattened
+            ArrayList<Boolean> flattenList = new ArrayList<Boolean>() ;
+            for(int i=0;i < targetSchema.size(); i++) {
+                flattenList.add(Boolean.FALSE) ;
+            }
+
+            // The generate lives in the ForEach's inner plan, so that is its owning plan
+            LOGenerate generate = new LOGenerate(foreachPlan, genNewOperatorKey(fromOp),
+                                                 generatePlans, flattenList) ;
+
+            foreachPlan.add(generate) ;
+
+            // Create ForEach to be inserted
+            LOForEach foreach = new LOForEach(currentPlan, genNewOperatorKey(fromOp), foreachPlan) ;
+
+            // Manipulate the plan structure
+            currentPlan.add(foreach);
+            currentPlan.disconnect(fromOp, toOp) ;
+
+            try {
+                currentPlan.connect(fromOp, foreach);
+                currentPlan.connect(foreach, toOp);
+            }
+            catch (PlanException pe) {
+                AssertionError err = new AssertionError("Problem wiring the plan while insert casting") ;
+                err.initCause(pe) ;
+                throw err ;
+            }
+        }
+        else {
+            log.debug("Tried to insert relational casting when not necessary");
+        }
+    }
+
+    /***
+     * Records a Warning in the message collector when an implicit cast
+     * is inserted into the plan.
+     *
+     * @param op the operator under which the cast is inserted
+     * @param originalType DataType code of the type before the cast
+     * @param toType DataType code of the type after the cast
+     */
+    private void collectCastWarning(LogicalOperator op,
+                                   byte originalType,
+                                   byte toType) {
+        String where = op.getClass().getSimpleName() ;
+        String fromName = DataType.findTypeName(originalType) ;
+        String toName = DataType.findTypeName(toType) ;
+        String warning = fromName + " is implicitly casted to "
+                         + toName + " under " + where + " Operator" ;
+        msgCollector.collect(warning, MessageType.Warning) ;
+    }
+
     /***
      * We need the neighbor to make sure that the new key is in the same scope
      */
@@ -924,7 +1574,8 @@
         return new OperatorKey(scope, newId) ;
     }
     
-    protected void visit(LOLoad load) {
+    protected void visit(LOLoad load)
+                        throws VisitorException {
         // do nothing
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java Thu May 15 12:13:41 2008
@@ -29,7 +29,7 @@
      * If there are errors during validation, all of the errors have
      * to be collected in the supplied messageCollector. The exception should
      * be thrown only when the validation logic finds something too bad 
-     * that the other validation logics should not try to do more work.
+     * that other validation logics should not try to do more work.
      * 
      */
     public abstract void validate(P plan, CompilationMessageCollector messageCollector) 
@@ -37,8 +37,8 @@
     
     /**
      * This convenient method is used when: 
-     * - if an exception is thrown from the current validation logic, 
-     * the whole validation pipeline should stop.
+     * - if an exception being thrown from the current validation logic
+     *   indicates that the whole validation pipeline should stop.
      * @param visitor
      * @param messageCollector
      * @throws PlanValidationException
@@ -60,9 +60,10 @@
     
     /**
      * This convenient method is used when: 
-     * - if an exception is thrown from the current validation logic, 
-     * the whole validation pipeline should keep going by continuing
-     * with the next validation logic in the pipeline
+     * - if an exception being thrown from the current validation logic
+     *   indicates that the whole validation pipeline should keep going
+     *   by continuing with the next validation logic in the pipeline
+     *   (skip the rest of the current logic)
      * @param visitor
      * @param messageCollector
      * @throws PlanValidationException
@@ -80,4 +81,29 @@
         }
     }
 
+    /**
+     * Runs the visitor; a VisitorException thrown from it stops the whole
+     * validation pipeline by being rethrown as a PlanValidationException
+     * that carries the VisitorException as its cause.
+     *
+     *   Nothing is added to messageCollector here: the visitor is
+     *   assumed to have recorded the appropriate error message already,
+     *   so it is not duplicated.
+     *
+     * @param visitor the visitor that performs this validation step
+     * @param messageCollector not read or written by this method
+     * @throws PlanValidationException if the visitor throws a VisitorException
+     */
+    protected void validateSkipCollectException(PlanVisitor<O, P> visitor,
+                            CompilationMessageCollector messageCollector)
+                                             throws PlanValidationException {
+        try {
+            visitor.visit() ;
+        } catch (VisitorException cause) {
+            // The error itself is assumed to be in messageCollector already
+            String reason = "An unexpected exception caused the validation to stop" ;
+            throw new PlanValidationException(reason, cause) ;
+        }
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java?rev=656795&r1=656794&r2=656795&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java Thu May 15 12:13:41 2008
@@ -56,6 +56,9 @@
      */
     public abstract PlanWalker<O, P> spawnChildWalker(P plan);
 
+    public P getPlan() {  // plain accessor for mPlan
+        return this.mPlan;
+    }
 }