You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/02/03 02:33:39 UTC

svn commit: r1066717 - in /pig/trunk: src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/newplan/logical/visitor/ src/org/apache/pig/parser/ test/org/apache/pig/parser/

Author: thejas
Date: Thu Feb  3 01:33:38 2011
New Revision: 1066717

URL: http://svn.apache.org/viewvc?rev=1066717&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-15.patch - (xuefuz via thejas)

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ScalarExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
    pig/trunk/src/org/apache/pig/parser/InvalidScalarProjectionException.java
    pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java
Modified:
    pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ColumnAliasConversionVisitor.java
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java

Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java Thu Feb  3 01:33:38 2011
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -79,6 +78,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.ScalarExpression;
 import org.apache.pig.newplan.logical.expression.SubtractExpression;
 import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -215,10 +215,14 @@ public class LogicalExpPlanMigrationVist
     }
 
     public void visit(LOUserFunc op) throws VisitorException {
-        UserFuncExpression exp = new UserFuncExpression(exprPlan, op.getFuncSpec());
+        UserFuncExpression exp = null;
+        if( op.getImplicitReferencedOperator() == null ) {
+            exp = new UserFuncExpression(exprPlan, op.getFuncSpec() );
+        } else {
+            exp = new ScalarExpression( exprPlan );
+        }
         
         List<ExpressionOperator> args = op.getArguments();
-        
         for( ExpressionOperator arg : args ) {
             LogicalExpression expArg = exprOpsMap.get(arg);
             exprPlan.connect(exp, expArg);
@@ -227,9 +231,9 @@ public class LogicalExpPlanMigrationVist
         exprOpsMap.put(op, exp);
         // We need to track all the scalars
         if(op.getImplicitReferencedOperator() != null) {
-            exp.setImplicitReferencedOperator(outerOpsMap.get(op.getImplicitReferencedOperator()));
+            ((ScalarExpression)exp).setImplicitReferencedOperator(
+            		outerOpsMap.get(op.getImplicitReferencedOperator()) );
         }
-
     }
 
     public void visit(LOBinCond op) throws VisitorException {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java Thu Feb  3 01:33:38 2011
@@ -67,6 +67,10 @@ public class ConstantExpression extends 
         return val;
     }
     
+    public void setValue(Object val) {
+    	this.val = val;
+    }
+    
     public LogicalFieldSchema getValueSchema() {
         return mValueSchema;
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Thu Feb  3 01:33:38 2011
@@ -517,9 +517,9 @@ public class ExpToPhyTranslationVisitor 
         logToPhyMap.put(op, p);
         
         //We need to track all the scalars
-        
-        if(op.getImplicitReferencedOperator() != null) {
-            ((POUserFunc)p).setReferencedOperator(logToPhyMap.get(op.getImplicitReferencedOperator()));
+        if( op instanceof ScalarExpression ) {
+            Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator();
+            ((POUserFunc)p).setReferencedOperator( logToPhyMap.get( refOp ) );
         }
     }
     

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java Thu Feb  3 01:33:38 2011
@@ -66,7 +66,7 @@ public class LogicalExpressionPlan exten
      * @param lgExpPlan plan to merge
      * @return sources of the merged plan
      */
-    public List<Operator> merge(LogicalExpressionPlan lgExpPlan) throws FrontendException {
+    public List<Operator> merge(LogicalExpressionPlan lgExpPlan) {
         
         List<Operator> sources = lgExpPlan.getSources();
         

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java Thu Feb  3 01:33:38 2011
@@ -110,4 +110,9 @@ public abstract class LogicalExpressionV
 
     public void visit(RegexExpression op) throws FrontendException {
     }
+    
+    public void visit(ScalarExpression op) throws FrontendException {
+        this.visit( (UserFuncExpression)op );
+    }
+
 }

Added: pig/trunk/src/org/apache/pig/newplan/logical/expression/ScalarExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ScalarExpression.java?rev=1066717&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ScalarExpression.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ScalarExpression.java Thu Feb  3 01:33:38 2011
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.expression;
+
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.builtin.ReadScalars;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class ScalarExpression extends UserFuncExpression {
+    private Operator implicitReferencedOperator = null;
+    private Operator attachedLogicalOperator = null; // The logical Operator where this udf is used.
+ 
+    private static FuncSpec funcSpec = new FuncSpec( ReadScalars.class.getName() );
+    
+    public ScalarExpression(OperatorPlan plan) {
+        super( plan, funcSpec );
+    }
+    
+    public ScalarExpression(OperatorPlan plan, Operator implicitReferencedOperator,
+            Operator attachedLogicalOperator) {
+        this( plan );
+        this.implicitReferencedOperator = implicitReferencedOperator;
+        this.attachedLogicalOperator = attachedLogicalOperator;
+    }
+    
+    @Override
+    public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
+        if (fieldSchema!=null)
+            return fieldSchema;
+
+        // TODO: apparently there is a problem with some test cases where implicitReferencedOperator can be null,
+        // even for a scalar UDF. Need more investigation. For now, simulate the old code to make these
+        // test cases pass.
+        if( implicitReferencedOperator != null ) {
+	        List<Operator> args = plan.getSuccessors(this);
+	        if(args != null && args.size() > 0 ){
+	            int pos = (Integer)((ConstantExpression)args.get(0)).getValue();
+	            LogicalRelationalOperator inp = (LogicalRelationalOperator)implicitReferencedOperator;
+	
+	            if( inp.getSchema() != null){
+	                LogicalFieldSchema inpFs = inp.getSchema().getField(pos);
+	                fieldSchema = new LogicalFieldSchema(inpFs);
+	                //  fieldSchema.alias = "ReadScalars_" + fieldSchema.alias;
+	            }else{
+	                fieldSchema = new LogicalFieldSchema(null, null, DataType.BYTEARRAY);
+	            }
+	            return fieldSchema;
+	        }else{
+	            //predecessors haven't been setup, return null
+	            return null;
+	        }
+        } else {
+        	return super.getFieldSchema();
+        }
+    }
+    
+    @Override
+    public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
+        ScalarExpression copy =  null; 
+        copy = new ScalarExpression( lgExpPlan, implicitReferencedOperator,
+            attachedLogicalOperator );
+        
+        // Deep copy the input expressions.
+        List<Operator> inputs = plan.getSuccessors( this );
+        if( inputs != null ) {
+            for( Operator op : inputs ) {
+                LogicalExpression input = (LogicalExpression)op;
+                LogicalExpression inputCopy = input.deepCopy( lgExpPlan );
+                lgExpPlan.add( inputCopy );
+                lgExpPlan.connect( copy, inputCopy );
+            }
+        }
+        
+        return copy;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if( !( v instanceof LogicalExpressionVisitor ) ) {
+            throw new FrontendException( "Expected LogicalExpressionVisitor", 2222 );
+        }
+        ((LogicalExpressionVisitor)v).visit( this );
+    }
+
+    public Operator getImplicitReferencedOperator() {
+        return implicitReferencedOperator;
+    }
+    
+    public void setImplicitReferencedOperator(Operator implicitReferencedOperator) {
+		this.implicitReferencedOperator = implicitReferencedOperator;
+	}
+
+	public void setAttachedLogicalOperator(Operator attachedLogicalOperator) {
+		this.attachedLogicalOperator = attachedLogicalOperator;
+	}
+
+	public Operator getAttachedLogicalOperator() {
+        return attachedLogicalOperator;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder msg = new StringBuilder();
+        msg.append( "(Name: ScalarExpression)" + " Type: ");
+        if (fieldSchema!=null)
+            msg.append(DataType.findTypeName(fieldSchema.type));
+        else
+            msg.append("null");
+        msg.append(" Uid: ");
+        if (fieldSchema!=null)
+            msg.append(fieldSchema.uid);
+        else
+            msg.append("null");
+        msg.append(")");
+
+        return msg.toString();
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Thu Feb  3 01:33:38 2011
@@ -24,24 +24,17 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.newplan.logical.Util;
-import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
-import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 
 public class UserFuncExpression extends LogicalExpression {
 
     private FuncSpec mFuncSpec;
-    private Operator implicitReferencedOperator = null;
     private EvalFunc<?> ef = null;
     
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
@@ -55,7 +48,7 @@ public class UserFuncExpression extends 
         this( plan, funcSpec );
         
         for( LogicalExpression arg : args ) {
-        	plan.connect( this, arg );
+            plan.connect( this, arg );
         }
     }
 
@@ -111,29 +104,6 @@ public class UserFuncExpression extends 
         if (fieldSchema!=null)
             return fieldSchema;
         
-        if(implicitReferencedOperator != null &&
-                mFuncSpec.getClassName().equals("org.apache.pig.impl.builtin.ReadScalars")){
-            // if this is a ReadScalars udf for scalar operation, use the 
-            // FieldSchema corresponding to this position in input 
-            List<Operator> args = plan.getSuccessors(this);
-            if(args != null && args.size() > 0 ){
-                int pos = (Integer)((ConstantExpression)args.get(0)).getValue();
-                LogicalRelationalOperator inp = (LogicalRelationalOperator)implicitReferencedOperator;
-
-                if( inp.getSchema() != null){
-                    LogicalFieldSchema inpFs = inp.getSchema().getField(pos);
-                    fieldSchema = new LogicalFieldSchema(inpFs);
-                    //  fieldSchema.alias = "ReadScalars_" + fieldSchema.alias;
-                }else{
-                    fieldSchema = new LogicalFieldSchema(null, null, DataType.BYTEARRAY);
-                }
-                return fieldSchema;
-            }else{
-                //predecessors haven't been setup, return null
-                return null;
-            }
-        }
-
         LogicalSchema inputSchema = new LogicalSchema();
         List<Operator> succs = plan.getSuccessors(this);
 
@@ -170,14 +140,6 @@ public class UserFuncExpression extends 
         return fieldSchema;
     }
     
-    public Operator getImplicitReferencedOperator() {
-        return implicitReferencedOperator;
-    }
-    
-    public void setImplicitReferencedOperator(Operator implicitReferencedOperator) {
-        this.implicitReferencedOperator = implicitReferencedOperator;
-    }
-
     @Override
     public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
         UserFuncExpression copy =  null; 
@@ -185,7 +147,6 @@ public class UserFuncExpression extends 
             copy = new UserFuncExpression(
                     lgExpPlan,
                     this.getFuncSpec().clone() );
-            copy.setImplicitReferencedOperator(this.getImplicitReferencedOperator());
             
             // Deep copy the input expressions.
             List<Operator> inputs = plan.getSuccessors( this );
@@ -220,4 +181,5 @@ public class UserFuncExpression extends 
 
         return msg.toString();
     }
+
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ColumnAliasConversionVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ColumnAliasConversionVisitor.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ColumnAliasConversionVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ColumnAliasConversionVisitor.java Thu Feb  3 01:33:38 2011
@@ -18,11 +18,11 @@
 
 package org.apache.pig.newplan.logical.visitor;
 
-
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanValidationException;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
@@ -41,6 +41,7 @@ import org.apache.pig.newplan.logical.re
  * indexes, using the underlying anonymous expression plan visitor.
  */
 public class ColumnAliasConversionVisitor extends AllExpressionVisitor {
+	
     public ColumnAliasConversionVisitor(OperatorPlan plan) throws FrontendException {
         super( plan, new DependencyOrderWalker( plan ) );
     }
@@ -58,15 +59,16 @@ public class ColumnAliasConversionVisito
                 LogicalSchema inputSchema = input.getSchema();
                 String alias = expr.getColAlias();
                 if( alias != null ) {
-                    int colNum = inputSchema.getFieldPosition( alias );
+                    int colNum = inputSchema == null ? -1 : inputSchema.getFieldPosition( alias );
                     if( colNum == -1 ) {
-                        throw new FrontendException( "Invalid field projection: " + alias );
+                		throw new PlanValidationException( "Invalid field projection. Projected field [" + 
+                    		alias + "] does not exist in schema: " + inputSchema + "." );
                     }
                     expr.setColNum( colNum );
                 } else {
                     int col = expr.getColNum();
-                    if( col >= inputSchema.size() ) {
-                        throw new FrontendException( "Out of bound access. Trying to access non-existent column: " + 
+                    if( inputSchema != null && col >= inputSchema.size() ) {
+                        throw new PlanValidationException( "Out of bound access. Trying to access non-existent column: " + 
                                                       col + ". Schema " + inputSchema + " has " + inputSchema.size() + " column(s)." );
                     }
                 }
@@ -82,14 +84,20 @@ public class ColumnAliasConversionVisito
                 LogicalExpressionPlan plan = (LogicalExpressionPlan)expr.getPlan();
                 LogicalExpression pred = (LogicalExpression)plan.getPredecessors( expr ).get(0);
                 LogicalSchema schema = pred.getFieldSchema().schema;
-                
+                int col = -1;
                 for( Object rc : rawCols ) {
                     if( rc instanceof Integer ) {
+                    	col = (Integer)rc;
+                    	if( schema != null && col >= schema.size() ) {
+                            throw new PlanValidationException( "Out of bound access. Trying to access non-existent column: " + 
+                                    col + ". Schema " + schema + " has " + schema.size() + " column(s)." );
+                    	}
                         cols.add( (Integer)rc );
                     } else {
-                        int col = schema.getFieldPosition( (String)rc );
+                        col = schema.getFieldPosition( (String)rc );
                         if( col == -1 ) {
-                            throw new FrontendException( "Invalid field projection: " + rc );
+                            throw new PlanValidationException( "Invalid field reference. Referenced field [" + 
+                            		rc + "] does not exist in schema: " + schema + "." );
                         }
                         cols.add( col );
                     }

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java?rev=1066717&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVisitor.java Thu Feb  3 01:33:38 2011
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanValidationException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ScalarExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+
+/**
+ * Logical plan visitor which, for every scalar expression, finds or creates a store on the
+ * referenced operator, sets the expression's file name constant and soft-links the store to the attached operator.
+ */
+public class ScalarVisitor extends AllExpressionVisitor {
+    
+    public ScalarVisitor(OperatorPlan plan) throws FrontendException {
+        super( plan, new DependencyOrderWalker( plan ) );
+    }
+
+    @Override
+    protected LogicalExpressionVisitor getVisitor(final LogicalExpressionPlan exprPlan)
+    throws FrontendException {
+        return new LogicalExpressionVisitor( exprPlan, new DependencyOrderWalker( exprPlan ) ) {
+            
+            @Override
+            public void visit(ScalarExpression expr) throws FrontendException {
+                // This is a scalar udf.
+                ConstantExpression filenameConst = (ConstantExpression)exprPlan.getSuccessors( expr ).get( 1 );
+
+                Operator refOp = expr.getImplicitReferencedOperator();
+                Operator attachedOp = expr.getAttachedLogicalOperator();
+                LogicalPlan lp = (LogicalPlan) attachedOp.getPlan();
+                List<Operator> succs = lp.getSuccessors( refOp );
+                LOStore store = null;
+                if( succs != null ) {
+                    for( Operator succ : succs ) {
+                        if( succ instanceof LOStore ) {
+                            store = (LOStore)succ;
+                            break;
+                        }
+                    }
+                }
+
+                if( store == null ) {
+                    FuncSpec funcSpec = new FuncSpec(InterStorage.class.getName());
+                    FileSpec fileSpec;
+                    //try {
+                    // TODO: need to hookup the pigcontext.
+                    fileSpec = new FileSpec( "/tmp/file.name", funcSpec );
+                    //                        } catch (IOException e) {
+                    //                            throw new PlanValidationException( "Failed to process scalar: " + e);
+                    //                        }
+                    store = new LOStore( lp, fileSpec );
+                    lp.add( store );
+                    lp.connect( refOp, store );
+                }
+
+                filenameConst.setValue( store.getOutputSpec().getFileName() );
+                lp.createSoftLink( store, attachedOp );
+            }
+
+        };
+    }
+    
+}

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu Feb  3 01:33:38 2011
@@ -149,7 +149,7 @@ stream_cmd : ^( STDIN func_clause? )
 output_clause : ^( OUTPUT stream_cmd+ )
 ;
 
-error_clause : ^( ERROR  QUOTEDSTRING INTEGER? )
+error_clause : ^( STDERROR  QUOTEDSTRING INTEGER? )
 ;
 
 load_clause : ^( LOAD filename func_clause? as_clause? )
@@ -273,7 +273,7 @@ col_alias_or_index : col_alias | col_ind
 col_alias : GROUP | IDENTIFIER
 ;
 
-col_index : DOLLAR^ INTEGER
+col_index : ^( DOLLAR INTEGER )
 ;
 
 pound_proj : ^( POUND ( QUOTEDSTRING | NULL ) )
@@ -482,7 +482,7 @@ eid : rel_str_op
     | CACHE
     | INPUT
     | OUTPUT
-    | ERROR
+    | STDERROR
     | STDIN
     | STDOUT
     | LIMIT

Added: pig/trunk/src/org/apache/pig/parser/InvalidScalarProjectionException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/InvalidScalarProjectionException.java?rev=1066717&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/InvalidScalarProjectionException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/InvalidScalarProjectionException.java Thu Feb  3 01:33:38 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.pig.newplan.logical.expression.ScalarExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+public class InvalidScalarProjectionException extends RecognitionException {
+    private static final long serialVersionUID = 1L;
+    
+    private ScalarExpression scalarExpr;
+    
+    public InvalidScalarProjectionException(IntStream input, ScalarExpression expr) {
+        super( input );
+        this.scalarExpr = expr;
+    }
+    
+    public String toString() {
+        return "Invalid scalar projection: " + 
+        ((LogicalRelationalOperator)scalarExpr.getAttachedLogicalOperator()).getAlias();
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Feb  3 01:33:38 2011
@@ -89,6 +89,10 @@ public class LogicalPlanBuilder {
     private PigContext pigContext = new PigContext( ExecType.LOCAL, new Properties() );
     
     private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+    
+    Operator lookupOperator(String alias) {
+        return operators.get( alias );
+    }
 
     FuncSpec lookupFunction(String alias) {
         return pigContext.getFuncSpecFromAlias( alias );
@@ -483,7 +487,21 @@ public class LogicalPlanBuilder {
         if( colAlias != null ) {
             LogicalExpressionPlan exprPlan = exprPlans.get( colAlias );
             if( exprPlan != null ) {
-                return (LogicalExpression)exprPlan.getSources().get( 0 );// get the root of the plan
+                plan.merge( exprPlan );
+                // The projected alias is actually an expression alias, so the projections in the expression
+                // it represents don't have any operator associated with them. We need to set it when we
+                // substitute the expression alias with its expression.
+                if( op != null ) {
+                    Iterator<Operator> it = plan.getOperators();
+                    while( it.hasNext() ) {
+                        Operator o = it.next();
+                        if( o instanceof ProjectExpression ) {
+                            ProjectExpression projExpr = (ProjectExpression)o;
+                            projExpr.setAttachedRelationalOp( op );
+                        }
+                    }
+                }
+                return (LogicalExpression)plan.getSources().get( 0 );// get the root of the plan
             } else {
                 return new ProjectExpression( plan, 0, colAlias, op );
             }
@@ -502,7 +520,7 @@ public class LogicalPlanBuilder {
     }
     
     LogicalExpression buildUDF(LogicalExpressionPlan plan, String funcName, List<LogicalExpression> args) {
-        FuncSpec funcSpec = null;// TODO: get funcspec from function name.
+        FuncSpec funcSpec = new FuncSpec( funcName );
         return new UserFuncExpression( plan, funcSpec, args );
     }
     

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Feb  3 01:33:38 2011
@@ -38,6 +38,8 @@ package org.apache.pig.parser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.builtin.GFAny;
+import org.apache.pig.impl.builtin.ReadScalars;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.impl.util.MultiMap;
@@ -66,6 +68,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.ScalarExpression;
 import org.apache.pig.newplan.logical.expression.SubtractExpression;
 import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
@@ -247,7 +250,7 @@ error_clause returns[String dir, Integer
 @init {
     $limit = StreamingCommand.MAX_TASKS;
 }
- : ^( ERROR QUOTEDSTRING INTEGER? )
+ : ^( STDERROR QUOTEDSTRING INTEGER? )
    {
        $dir = builder.unquote( $QUOTEDSTRING.text );
        $limit = Integer.parseInt( $INTEGER.text );
@@ -601,10 +604,46 @@ var_expr[LogicalExpressionPlan plan] ret
  : projectable_expr[$plan] { $expr = $projectable_expr.expr; }
    ( dot_proj 
      {
-         DereferenceExpression e = new DereferenceExpression( $plan );
-         e.setRawColumns( $dot_proj.cols );
-         $plan.connect( $expr, e );
-         $expr = e;
+         if( $expr instanceof ScalarExpression ) {
+             // This is a scalar projection.
+             ScalarExpression scalarExpr = (ScalarExpression)$expr;
+             if( $dot_proj.cols.size() > 1 ) {
+                 throw new InvalidScalarProjectionException( input, scalarExpr );
+             } else {
+                 Object val = $dot_proj.cols.get( 0 );
+                 int pos = -1;
+                 LogicalRelationalOperator relOp = (LogicalRelationalOperator)scalarExpr.getImplicitReferencedOperator();
+                 LogicalSchema schema = null;
+                 try {
+                     schema = relOp.getSchema();
+                 } catch(FrontendException e) {
+                     throw new PlanGenerationFailureException( input, e );
+                 }
+                 if( val instanceof Integer ) {
+                     pos = (Integer)val;
+                     if( schema != null && pos >= schema.size() ) {
+                         throw new InvalidScalarProjectionException( input, scalarExpr );
+                     }
+                 } else {
+                     String colAlias = (String)val;
+                     pos = ( schema == null ) ? -1 : schema.getFieldPosition( colAlias ); // null-check before dereferencing
+                     if( pos == -1 ) { // no schema to resolve the alias against, or alias not found in it
+                         throw new InvalidScalarProjectionException( input, scalarExpr );
+                     }
+                 }
+                 LogicalFieldSchema fs = new LogicalFieldSchema( null , null, DataType.INTEGER );
+                 ConstantExpression constExpr = new ConstantExpression( $plan, pos, fs  );
+                 plan.connect( $expr, constExpr );
+                 fs = new LogicalFieldSchema( null , null, DataType.CHARARRAY );
+                 constExpr = new ConstantExpression( $plan, "filename", fs ); // place holder for file name.
+                 plan.connect( $expr, constExpr );
+             }
+         } else {
+             DereferenceExpression e = new DereferenceExpression( $plan );
+             e.setRawColumns( $dot_proj.cols );
+             $plan.connect( $expr, e );
+             $expr = e;
+         }
      }
    | pound_proj
      {
@@ -646,7 +685,7 @@ col_alias returns[Object col]
 ;
 
 col_index returns[Object col]
- : DOLLAR^ INTEGER { $col = Integer.valueOf( $INTEGER.text ); }
+ : ^( DOLLAR INTEGER { $col = Integer.valueOf( $INTEGER.text ); } )
 ;
 
 pound_proj returns[String key]
@@ -1046,12 +1085,27 @@ alias_col_ref[LogicalExpressionPlan plan
    }
  | IDENTIFIER
    {
-       if( inForeachPlan ) {
-           $expr = builder.buildProjectExpr( $plan, currentOp, 
-               $foreach_plan::exprPlans, $IDENTIFIER.text, 0 );
+       String alias = $IDENTIFIER.text;
+       Operator inOp = builder.lookupOperator( $statement::inputAlias );
+       LogicalSchema schema;
+       try {
+           schema = ((LogicalRelationalOperator)inOp).getSchema();
+       } catch (FrontendException e) {
+           throw new PlanGenerationFailureException( input, e );
+       }
+       
+       Operator op = builder.lookupOperator( alias );
+       if( op != null && ( schema == null || schema.getFieldPosition( alias ) == -1 ) ) {
+           $expr = new ScalarExpression( plan, op,
+               inForeachPlan ? $foreach_clause::foreachOp : currentOp );
        } else {
-           $expr = builder.buildProjectExpr( $plan, currentOp, 
-               $statement::inputIndex, $IDENTIFIER.text, 0 );
+           if( inForeachPlan ) {
+               $expr = builder.buildProjectExpr( $plan, currentOp, 
+                   $foreach_plan::exprPlans, alias, 0 );
+           } else {
+               $expr = builder.buildProjectExpr( $plan, currentOp, 
+                   $statement::inputIndex, alias, 0 );
+           }
        }
    }
 ;
@@ -1218,7 +1272,7 @@ eid returns[String id] : rel_str_op { $i
     | CACHE { $id = $CACHE.text; }
     | INPUT { $id = $INPUT.text; }
     | OUTPUT { $id = $OUTPUT.text; }
-    | ERROR { $id = $ERROR.text; }
+    | STDERROR { $id = $STDERROR.text; }
     | STDIN { $id = $STDIN.text; }
     | STDOUT { $id = $STDOUT.text; }
     | LIMIT { $id = $LIMIT.text; }

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu Feb  3 01:33:38 2011
@@ -183,7 +183,7 @@ stream_cmd : ( STDIN | STDOUT | QUOTEDST
 output_clause : OUTPUT^ LEFT_PAREN! stream_cmd_list RIGHT_PAREN!
 ;
 
-error_clause : ERROR^ LEFT_PAREN! QUOTEDSTRING ( LIMIT! INTEGER )? RIGHT_PAREN!
+error_clause : STDERROR^ LEFT_PAREN! QUOTEDSTRING ( LIMIT! INTEGER )? RIGHT_PAREN!
 ;
 
 load_clause : LOAD^ filename ( USING! func_clause )? as_clause?
@@ -397,7 +397,7 @@ join_clause : JOIN^ join_sub_clause ( US
 join_type : HINT_REPL | HINT_MERGE | HINT_SKEWED | HINT_DEFAULT
 ;
 
-join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
+join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? COMMA! join_item
                 | join_item_list
 ;
 
@@ -589,7 +589,7 @@ eid : rel_str_op
     | CACHE
     | INPUT
     | OUTPUT
-    | ERROR
+    | STDERROR
     | STDIN
     | STDOUT
     | LIMIT

Modified: pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java?rev=1066717&r1=1066716&r2=1066717&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java Thu Feb  3 01:33:38 2011
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 
 import org.antlr.runtime.RecognitionException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanValidationException;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.logical.expression.DereferenceExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -55,15 +56,118 @@ public class TestColumnAliasConversion {
     @Test
     public void test3() throws RecognitionException, ParsingFailureException, IOException {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
-                       "B = foreach A { R = a; S = R.u; T = limit S 100; generate S, T, c + d/5; };" +
+                       "B = foreach A { R = a; P = c * 2; Q = P + d; S = R.u; T = limit S 100; generate Q, R, S, T, c + d/5; };" +
                        "store B into 'y';";
         verify( query );
     }
     
-    private void verify(String query) throws RecognitionException, ParsingFailureException, IOException {
+    /** Expects a bag projection that mixes a column alias with a positional reference to validate. */
+    @Test
+    public void test4() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); ";
+        final String project = "B = foreach A generate u.(x, $1), $1, w; ";
+        validate( load + project + "C = store B into 'output';" );
+    }
+    
+    /** Expects a positional projection on a bag declared without an inner schema to validate. */
+    @Test
+    public void test5() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{} ); ";
+        final String project = "B = foreach A generate u.$100; ";
+        validate( load + project + "C = store B into 'output';" );
+    }
+    
+    /** Expects positional references to validate when the input is loaded without a schema. */
+    @Test
+    public void test6() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x'; ";
+        final String project = "B = foreach A generate $1, $1000; ";
+        validate( load + project + "C = store B into 'output';" );
+    }
+
+    /** Expects a positional reference to another relation (B.$1) inside a foreach over A to validate. */
+    @Test
+    public void test7() throws RecognitionException, ParsingFailureException, IOException {
+        final String loads = "A = load 'x'; " + "B = load 'y' as ( u : int, v : chararray );";
+        final String project = "C = foreach A generate B.$1, $0; ";
+        final String store = "D = store C into 'output';";
+        validate( loads + project + store );
+    }
+    
+    @Test
+    public void testNegative1() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); ";
+        final String project = "B = foreach A generate u.(x, $3), v, w; ";
+        final String query = load + project + "C = store B into 'output';";
+        try {
+            validate( query );
+            Assert.fail( "Query should fail to validate." );
+        } catch(PlanValidationException expected) {
+            // Expected - presumably because $3 is not a position in the two-field tuple (x, y) of bag u.
+        }
+    }
+    
+    @Test
+    public void testNegative2() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); ";
+        final String project = "B = foreach A generate u.(x, y), v, $5; ";
+        final String query = load + project + "C = store B into 'output';";
+        try {
+            validate( query );
+            Assert.fail( "Query should fail to validate." );
+        } catch(PlanValidationException expected) {
+            // Expected - presumably because $5 is beyond the three columns (u, v, w) declared for A.
+        }
+    }
+    
+    @Test
+    public void testNegative3() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); ";
+        final String project = "B = foreach A generate u.(x, y), v, x; ";
+        final String query = load + project + "C = store B into 'output';";
+        try {
+            validate( query );
+            Assert.fail( "Query should fail to validate." );
+        } catch(PlanValidationException expected) {
+            // Expected - presumably because x is a field of the tuple inside u, not a column of A.
+        }
+    }
+    
+    @Test
+    public void testNegative4() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); ";
+        final String project = "B = foreach A generate u.z, v, w; ";
+        final String query = load + project + "C = store B into 'output';";
+        try {
+            validate( query );
+            Assert.fail( "Query should fail to validate." );
+        } catch(PlanValidationException expected) {
+            // Expected - presumably because the tuple inside u declares only x and y, not z.
+        }
+    }
+    
+    @Test
+    public void testNegative5() throws RecognitionException, ParsingFailureException, IOException {
+        final String load = "A = load 'x';";
+        final String project = "B = foreach A generate u, $1; ";
+        final String query = load + project + "C = store B into 'output';";
+        try {
+            validate( query );
+            Assert.fail( "Query should fail to validate." );
+        } catch(PlanValidationException expected) {
+            // Expected - presumably because A has no schema, so the alias u cannot be resolved.
+        }
+    }
+
+    private LogicalPlan validate(String query) throws RecognitionException, ParsingFailureException, IOException {
         LogicalPlan plan = ParserTestingUtils.generateLogicalPlan( query );
         ColumnAliasConversionVisitor visitor = new ColumnAliasConversionVisitor( plan );
         visitor.visit();
+        return plan;
+    }
+
+    private void verify(String query) throws RecognitionException, ParsingFailureException, IOException {
+        LogicalPlan plan = validate( query );
         System.out.println( "Plan after setter: " + plan.toString() );
         new AllExpressionVisitor( plan, new DependencyOrderWalker( plan ) ) {
             @Override

Added: pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java?rev=1066717&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java (added)
+++ pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java Thu Feb  3 01:33:38 2011
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.antlr.runtime.RecognitionException;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
+
+import org.junit.Test;
+
+/**
+ * Runs ScalarVisitor over plans for queries that reference relation B
+ * from inside a foreach over relation A.
+ */
+public class TestScalarVisitor {
+    private static final String LOADS = "A = load 'x'; " +
+                                        "B = load 'y' as ( u : int, v : chararray );";
+    private static final String STORE = "D = store C into 'output';";
+
+    /** Column of B selected by position. */
+    @Test
+    public void test1() throws RecognitionException, ParsingFailureException, IOException {
+        checkPlanShape( visit( LOADS + "C = foreach A generate B.$1, $0; " + STORE ) );
+    }
+
+    /** Column of B selected by alias. */
+    @Test
+    public void test2() throws RecognitionException, ParsingFailureException, IOException {
+        checkPlanShape( visit( LOADS + "C = foreach A generate B.v, $0; " + STORE ) );
+    }
+
+    /** Alias w is not declared in B's schema, so the query must be rejected. */
+    @Test
+    public void testNegative1() throws RecognitionException, ParsingFailureException, IOException {
+        try {
+            visit( LOADS + "C = foreach A generate B.w, $0; " + STORE );
+            Assert.fail( "Test case should fail" );
+        } catch(ParsingFailureException expected) {
+            // Expected exception
+        }
+    }
+
+    /**
+     * Checks the source and sink counts of a visited plan, then prints it.
+     */
+    private static void checkPlanShape(LogicalPlan plan) throws IOException {
+        Assert.assertEquals( 2, plan.getSources().size() ); // There should be two LOLoad ops in the plan.
+        Assert.assertEquals( 2, plan.getSinks().size() ); // There should also be two LOStore ops in the plan.
+        System.out.println( "New Logical Plan after scalar processing: " + plan );
+    }
+
+    /** Builds the logical plan for the query and runs a ScalarVisitor over it. */
+    private LogicalPlan visit(String query) throws RecognitionException, ParsingFailureException, IOException {
+        LogicalPlan plan = ParserTestingUtils.generateLogicalPlan( query );
+        ScalarVisitor visitor = new ScalarVisitor( plan );
+        visitor.visit();
+        return plan;
+    }
+}