You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/12/18 01:01:50 UTC

svn commit: r1050538 - in /pig/trunk: src/org/apache/pig/parser/ test/org/apache/pig/parser/

Author: thejas
Date: Sat Dec 18 00:01:49 2010
New Revision: 1050538

URL: http://svn.apache.org/viewvc?rev=1050538&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-9.patch - (xuefuz via thejas)

Added:
    pig/trunk/src/org/apache/pig/parser/UndefinedAliasException.java
Modified:
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/parser/TestAstValidator.java
    pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Sat Dec 18 00:01:49 2010
@@ -51,6 +51,9 @@ public String getErrorMessage(Recognitio
     if ( e instanceof DuplicatedSchemaAliasException ) {
         DuplicatedSchemaAliasException dae = (DuplicatedSchemaAliasException)e;
         msg = "Duplicated schema alias name '"+ dae.getAlias() + "' in the schema definition";
+    } else if( e instanceof UndefinedAliasException ) {
+        UndefinedAliasException dae = (UndefinedAliasException)e;
+        msg = "Alias '"+ dae.getAlias() + "' is not defined";
     }
     
     return msg;
@@ -65,6 +68,17 @@ throws DuplicatedSchemaAliasException {
     }
 }
 
+// Verifies that a referenced alias is already present in the given set of defined names.
+private void validateAliasRef(Set<String> aliases, String alias)
+throws UndefinedAliasException {
+    // Read-only check: definitions are registered by the defining rules, never by a reference.
+    if( !aliases.contains( alias ) ) {
+        throw new UndefinedAliasException( input, alias );
+    }
+}
+
+private Set<String> aliases = new HashSet<String>();
+
 } // End of @members
 
 query : ^( QUERY statement* )
@@ -73,15 +87,15 @@ query : ^( QUERY statement* )
 statement : general_statement | foreach_statement
 ;
 
-general_statement : ^( STATEMENT alias? op_clause INTEGER? )
+general_statement : ^( STATEMENT alias? op_clause INTEGER? ) { if( $alias.name != null ) aliases.add( $alias.name ); } // define the alias only after op_clause was checked, so a statement cannot satisfy its own reference
 ;
 
 // We need to handle foreach specifically because of the ending ';', which is not required 
 // if there is a nested block. This is ugly, but it gets the job done.
-foreach_statement : ^( STATEMENT alias? foreach_clause )
+foreach_statement : ^( STATEMENT alias? foreach_clause ) { if( $alias.name != null ) aliases.add( $alias.name ); } // define the alias only after foreach_clause was walked, so a statement cannot satisfy its own reference
 ;
 
-alias : IDENTIFIER
+alias returns[String name] : IDENTIFIER { $name = $IDENTIFIER.text; }
 ;
 
 op_clause : define_clause 
@@ -93,9 +107,8 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
-          | partition_clause
           | cross_clause
-          | joint_clause
+          | join_clause
           | union_clause
           | stream_clause
           | mr_clause
@@ -200,13 +213,11 @@ group_clause : ^( GROUP group_item_list 
 group_item_list : group_item+
 ;
 
-group_item : rel ( ( flatten_generated_item_list ) | ALL | ANY ) ( INNER | OUTER )?
-;
-
-rel : alias | op_clause
+group_item : rel ( join_group_by_clause | ALL | ANY ) ( INNER | OUTER )?
 ;
 
-flatten_generated_item_list : flatten_generated_item+
+rel : alias {  validateAliasRef( aliases, $alias.name ); }
+    | op_clause
 ;
 
 flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
@@ -273,10 +284,7 @@ order_clause : ^( ORDER rel order_by_cla
 ;
 
 order_by_clause : STAR ( ASC | DESC )?
-                | order_col_list
-;
-
-order_col_list : order_col+
+                | order_col+
 ;
 
 order_col : col_ref ( ASC | DESC )?
@@ -294,17 +302,23 @@ cross_clause : ^( CROSS rel_list partiti
 rel_list : rel+
 ;
 
-joint_clause : ^( JOIN join_sub_clause QUOTEDSTRING? partition_clause? )
+join_clause : ^( JOIN join_sub_clause join_type? partition_clause? )
+;
+
+join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT
 ;
 
 join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
-                | join_item_list
+                | ( join_item )+
 ;
 
-join_item_list : join_item ( join_item )+
+join_item : ^( JOIN_ITEM rel join_group_by_clause )
 ;
 
-join_item : rel flatten_generated_item_list
+join_group_by_clause : ^( BY join_group_by_expr+ )
+;
+
+join_group_by_expr : expr | STAR
 ;
 
 union_clause : ^( UNION ONSCHEMA? rel_list )
@@ -323,11 +337,17 @@ foreach_blk : nested_command_list genera
 generate_clause : ^( GENERATE flatten_generated_item+ )
 ;
 
-nested_command_list : nested_command*
+nested_command_list
+scope { Set<String> ids; }
+@init{ $nested_command_list::ids = new HashSet<String>(); }
+ : nested_command*
 ;
 
-nested_command : ^( NESTED_CMD IDENTIFIER expr  )
-               | ^( NESTED_CMD IDENTIFIER nested_op )
+nested_command
+ : ^( NESTED_CMD IDENTIFIER ( expr | nested_op ) )
+   {
+       $nested_command_list::ids.add( $IDENTIFIER.text );
+   }
 ;
 
 nested_op : nested_proj
@@ -343,16 +363,24 @@ nested_proj : ^( NESTED_PROJ col_ref col
 col_ref_list : col_ref+
 ;
 
-nested_filter : ^( FILTER ( IDENTIFIER | nested_proj | expr ) cond )
+nested_alias_ref
+ : IDENTIFIER
+   {
+       validateAliasRef( $nested_command_list::ids, $IDENTIFIER.text );
+   }
+;
+
+nested_filter
+ : ^( FILTER ( nested_alias_ref | nested_proj | expr ) cond )
 ;
 
-nested_sort : ^( ORDER ( IDENTIFIER | nested_proj | expr )  order_by_clause func_clause? )
+nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr )  order_by_clause func_clause? )
 ;
 
-nested_distinct : ^( DISTINCT ( IDENTIFIER | nested_proj | expr ) )
+nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr ) )
 ;
 
-nested_limit : ^( LIMIT ( IDENTIFIER | nested_proj | expr ) INTEGER )
+nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr ) INTEGER )
 ;
 
 stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sat Dec 18 00:01:49 2010
@@ -18,25 +18,44 @@
 
 package org.apache.pig.parser;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.RANDOM;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.StringUtils;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 import org.apache.pig.newplan.Operator;
 
 public class LogicalPlanBuilder {
@@ -59,55 +78,102 @@ public class LogicalPlanBuilder {
 
     String buildFilterOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan expr) {
         LOFilter op = new LOFilter( plan, expr );
-        setAlias( op, alias );
-        setParallel( op, parallel );
-        plan.add( op );
-        Operator pred = operators.get( inputAlias );
-        if( pred == null ) {
-            // error out
-        }
-        plan.connect( pred, op );
-        operators.put( op.getAlias(), op );
-        return op.getAlias();
+        return buildOp( op, alias, parallel, inputAlias, null );
+    }
+
+    String buildDistinctOp(String alias, Integer parallel, String inputAlias, String partitioner) {
+        LODistinct op = new LODistinct( plan );
+        return buildOp( op, alias, parallel, inputAlias, partitioner );
     }
 
     String buildLimitOp(String alias, Integer parallel, String inputAlias, long limit) {
         LOLimit op = new LOLimit( plan, limit );
-        setAlias( op, alias );
-        setParallel( op, parallel );
-        plan.add( op );
-        Operator pred = operators.get( inputAlias );
-        if( pred == null ) {
-            // error out
+        return buildOp( op, alias, parallel, inputAlias, null );
+    }
+    
+    String buildSampleOp(String alias, Integer parallel, String inputAlias, double value) {
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        //  Generate a filter condition.
+        LogicalExpression konst = new ConstantExpression( filterPlan, value,
+                   new LogicalFieldSchema( null , null, DataType.DOUBLE ) );
+        UserFuncExpression udf = new UserFuncExpression( filterPlan, new FuncSpec( RANDOM.class.getName() ) );
+        new LessThanEqualExpression( filterPlan, udf, konst );
+        return buildFilterOp( alias, parallel, inputAlias, filterPlan );
+    }
+    
+    String buildUnionOp(String alias, Integer parallel, List<String> inputAliases) {
+        LOUnion op = new LOUnion( plan );
+        return buildOp( op, alias, parallel, inputAliases, null );
+    }
+
+    String buildSplitOp(String alias, Integer parallel, String inputAlias) {
+        LOSplit op = new LOSplit( plan );
+        return buildOp( op, alias, parallel, inputAlias, null );
+    }
+    
+    String buildSplitOutputOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan filterPlan) {
+        LOSplitOutput op = new LOSplitOutput( plan, filterPlan );
+        return buildOp ( op, alias, parallel, inputAlias, null );
+    }
+    
+    String buildCrossOp(String alias, Integer parallel, List<String> inputAliases, String partitioner) {
+        LOCross op = new LOCross( plan );
+        return buildOp ( op, alias, parallel, inputAliases, partitioner );
+    }
+    
+    String buildOrderOp(String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans, 
+            List<Boolean> ascFlags, FuncSpec fs) {
+        LOSort op = new LOSort( plan, plans, ascFlags, fs );
+        return buildOp( op, alias, parallel, inputAlias, null );
+    }
+
+    String buildJoinOp(String alias, Integer parallel, List<String> inputAliases, MultiMap<Integer, LogicalExpressionPlan> joinPlans,
+            JOINTYPE jt, List<Boolean> innerFlags, String partitioner) { // jt may be null when the grammar saw no join type
+        boolean[] flags = new boolean[inputAliases.size()]; // one inner/outer flag per join input
+        for( int i = 0; i < flags.length; i++ ) { // an empty innerFlags list (no LEFT/RIGHT/FULL) means every input is inner
+            flags[i] = innerFlags.isEmpty() || innerFlags.get( i );
         }
-        plan.connect( pred, op );
-        operators.put( op.getAlias(), op );
-        return op.getAlias();
+        LOJoin op = new LOJoin( plan, joinPlans, jt == null ? JOINTYPE.HASH : jt, flags ); // HASH is what the grammar maps 'DEFAULT' to
+        return buildOp( op, alias, parallel, inputAliases, partitioner );
+    }
+
+    String buildGroupOp(String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans) {
+        LOCogroup op = new LOCogroup( plan );
+        return buildOp( op, alias, parallel, inputAlias, null );
     }
     
     String buildLoadOp(String alias, Integer parallel, String filename, FuncSpec funcSpec, LogicalSchema schema) {
         FileSpec loader = new FileSpec( filename, funcSpec );
         LOLoad op = new LOLoad( loader, schema, plan, null );
+        return buildOp( op, alias, parallel, new ArrayList<String>(), null );
+    }
+    
+    private String buildOp(LogicalRelationalOperator op, String alias, Integer parallel,
+            String inputAlias, String partitioner) {
+        List<String> inputAliases = new ArrayList<String>();
+        if( inputAlias != null )
+            inputAliases.add( inputAlias );
+        return buildOp( op, alias, parallel, inputAliases, partitioner );
+    }
+    
+    private String buildOp(LogicalRelationalOperator op, String alias, Integer parallel, 
+            List<String> inputAliases, String partitioner) {
         setAlias( op, alias );
         setParallel( op, parallel );
+        setPartitioner( op, partitioner );
         plan.add( op );
+        for( String a : inputAliases ) {
+            Operator pred = operators.get( a );
+            plan.connect( pred, op );
+        }
         operators.put( op.getAlias(), op );
         return op.getAlias();
     }
-    
+
     String buildStoreOp(String alias, Integer parallel, String inputAlias, String filename, FuncSpec funcSpec) {
         FileSpec fileSpec = new FileSpec( filename, funcSpec );
         LOStore op = new LOStore( plan, fileSpec );
-        setAlias( op, alias );
-        setParallel( op, parallel );
-        Operator pred = operators.get( inputAlias );
-        if( pred == null ) {
-            // error out
-        }
-        plan.connect( pred, op );
-        plan.add( op );
-        operators.put( op.getAlias(), op );
-        return op.getAlias();
+        return buildOp( op, alias, parallel, inputAlias, null );
     }
     
     static void setAlias(LogicalRelationalOperator op, String alias) {
@@ -123,14 +189,16 @@ public class LogicalPlanBuilder {
             op.setRequestedParallelism( parallel );
     }
     
+    static void setPartitioner(LogicalRelationalOperator op, String partitioner) {
+        if( partitioner != null )
+            op.setCustomPartitioner( partitioner );
+    }
+    
     static FuncSpec buildFuncSpec(String funcName, List<String> args) {
         String[] argArray = new String[args.size()];
         return new FuncSpec( funcName, args.toArray( argArray ) );
     }
     
-//    static FuncSpec buildFuncSpec(String funcName, List<LogicalExpression> args) {
-//    }
-    
     static String unquote(String s) {
         return StringUtils.unescapeInputString( s.substring(1, s.length() - 1 ) );
     }
@@ -145,4 +213,11 @@ public class LogicalPlanBuilder {
         return bagFactory.newDefaultBag();
     }
     
+    LogicalExpressionPlan buildProjectStar(String opAlias) { // returns a fresh expression plan built for the operator registered under opAlias
+        final LogicalExpressionPlan starPlan = new LogicalExpressionPlan(); // named so it does not shadow the builder's 'plan' field
+        final Operator source = operators.get( opAlias ); // looked up by alias in the builder's operator map
+        new ProjectExpression( starPlan, 0, -1, (LogicalRelationalOperator)source ); // instance is not kept; arguments unchanged (0, -1)
+        return starPlan;
+    }
+    
 }

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Sat Dec 18 00:01:49 2010
@@ -37,6 +37,7 @@ package org.apache.pig.parser;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.expression.AddExpression;
 import org.apache.pig.newplan.logical.expression.AndExpression;
 import org.apache.pig.newplan.logical.expression.BinCondExpression;
@@ -63,6 +64,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -92,7 +94,7 @@ scope {
 }
 @init {
 }
-: ^( STATEMENT ( alias { $general_statement::alias = $alias.ret; } )? 
+: ^( STATEMENT ( alias { $general_statement::alias = $alias.name; } )? 
   op_clause ( INTEGER { $general_statement::parallel = Integer.parseInt( $INTEGER.text ); } )? )
 ;
 
@@ -101,33 +103,32 @@ scope {
 foreach_statement : ^( STATEMENT alias? foreach_clause[$alias.text] )
 ;
 
-alias returns[String ret]: IDENTIFIER { $ret = $IDENTIFIER.text; }
+alias returns[String name]: IDENTIFIER { $name = $IDENTIFIER.text; }
 ;
 
 op_clause returns[String alias] : 
             define_clause 
           | load_clause { $alias = $load_clause.alias; }
-          | group_clause
-          | store_clause
-          | filter_clause
-          | distinct_clause
+          | group_clause { $alias = $group_clause.alias; }
+          | store_clause { $alias = $store_clause.alias; }
+          | filter_clause { $alias = $filter_clause.alias; }
+          | distinct_clause { $alias = $distinct_clause.alias; }
           | limit_clause { $alias = $limit_clause.alias; }
-          | sample_clause
-          | order_clause
-          | partition_clause
-          | cross_clause
-          | joint_clause
-          | union_clause
+          | sample_clause { $alias = $sample_clause.alias; }
+          | order_clause { $alias = $order_clause.alias; }
+          | cross_clause { $alias = $cross_clause.alias; }
+          | join_clause { $alias = $join_clause.alias; }
+          | union_clause { $alias = $union_clause.alias; }
           | stream_clause
           | mr_clause
-          | split_clause
+          | split_clause { $alias = $split_clause.alias; }
 ;
 
 define_clause 
  : ^( DEFINE alias cmd ) 
  | ^( DEFINE alias func_clause )
    {
-       builder.defineFunction( $alias.ret, $func_clause.funcSpec );
+       builder.defineFunction( $alias.name, $func_clause.funcSpec );
    }
 ;
 
@@ -284,34 +285,34 @@ func_args returns[List<String> args]
 : ( QUOTEDSTRING { $args.add( builder.unquote( $QUOTEDSTRING.text ) ); } )+
 ;
 
-group_clause : ^( GROUP group_item_list QUOTEDSTRING? )
-             | ^( COGROUP group_item_list QUOTEDSTRING? )
+group_clause returns[String alias]
+ : ^( GROUP group_item_list QUOTEDSTRING? )
+ | ^( COGROUP group_item_list QUOTEDSTRING? )
 ;
 
 group_item_list : group_item+
 ;
 
-group_item : rel ( ( flatten_generated_item_list ) | ALL | ANY ) ( INNER | OUTER )?
+group_item : rel ( join_group_by_clause[$rel.name] | ALL | ANY ) ( INNER | OUTER )?
 ;
 
-rel returns[String ret] : alias { $ret = $alias.ret; }
-                        | op_clause { $ret = $op_clause.alias; }
+rel returns[String name] : alias { $name = $alias.name; }
+                        | op_clause { $name = $op_clause.alias; }
 ;
 
-flatten_generated_item_list : flatten_generated_item+
+flatten_generated_item
+ : ( flatten_clause | expr[null] | STAR ) as_clause?
 ;
 
-flatten_generated_item : ( flatten_clause | expr[null] | STAR ) as_clause?
-;
-
-flatten_clause : ^( FLATTEN expr[null] )
+flatten_clause
+ : ^( FLATTEN expr[null] )
 ;
 
 store_clause returns[String alias]
  : ^( STORE alias filename func_clause? )
    {
        $alias= builder.buildStoreOp( $general_statement::alias,
-          $general_statement::parallel, $alias.ret, $filename.filename, $func_clause.funcSpec );
+          $general_statement::parallel, $alias.name, $filename.filename, $func_clause.funcSpec );
    }
 ;
 
@@ -320,7 +321,7 @@ filter_clause returns[String alias]
  : ^( FILTER rel cond[exprPlan] )
    {
        $alias = builder.buildFilterOp( $general_statement::alias,
-          $general_statement::parallel, $rel.ret, exprPlan );
+          $general_statement::parallel, $rel.name, exprPlan );
    }
 ;
 
@@ -454,56 +455,160 @@ bin_expr[LogicalExpressionPlan plan] ret
    }
 ;
 
-limit_clause returns[String alias] : ^( LIMIT rel INTEGER  )
-{
-$alias = builder.buildLimitOp( $general_statement::alias,
-    $general_statement::parallel, $rel.ret, Long.valueOf( $INTEGER.text ) );
-}
-             | ^( LIMIT rel LONGINTEGER )
+limit_clause returns[String alias]
+ : ^( LIMIT rel INTEGER  )
+   {
+       $alias = builder.buildLimitOp( $general_statement::alias,
+           $general_statement::parallel, $rel.name, Long.valueOf( $INTEGER.text ) );
+   }
+ | ^( LIMIT rel LONGINTEGER )
+   {
+       $alias = builder.buildLimitOp( $general_statement::alias,
+           $general_statement::parallel, $rel.name, Long.valueOf( $LONGINTEGER.text ) );
+   }
 ;
 
-sample_clause : ^( SAMPLE rel DOUBLENUMBER )
+sample_clause returns[String alias]
+ : ^( SAMPLE rel DOUBLENUMBER )
+   {
+       $alias = builder.buildSampleOp( $general_statement::alias,
+           $general_statement::parallel, $rel.name, Double.valueOf( $DOUBLENUMBER.text ) );
+   }
 ;
 
-order_clause : ^( ORDER rel order_by_clause func_clause? )
+order_clause returns[String alias]
+ : ^( ORDER rel order_by_clause[$rel.name] func_clause? )
+   {
+       $alias = builder.buildOrderOp( $general_statement::alias,
+           $general_statement::parallel, $rel.name, $order_by_clause.plans, 
+           $order_by_clause.ascFlags, $func_clause.funcSpec );
+   }
 ;
 
-order_by_clause : STAR ( ASC | DESC )?
-                | order_col_list
+order_by_clause[String opAlias] returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
+@init {
+    $plans = new ArrayList<LogicalExpressionPlan>();
+    $ascFlags = new ArrayList<Boolean>();
+}
+ : STAR { $plans.add( builder.buildProjectStar( $opAlias ) ); }
+   // Record exactly one flag for the star plan (ascending unless DESC is given),
+   // so that ascFlags stays parallel to plans, as in the order_col branch below.
+   ( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); }
+   | /* no direction given */ { $ascFlags.add( true ); } )
+ | ( order_col
+   {
+       $plans.add( $order_col.plan );
+       $ascFlags.add( $order_col.ascFlag );
+   } )+
+;
+
+order_col returns[LogicalExpressionPlan plan, Boolean ascFlag]
+@init { 
+    $plan = new LogicalExpressionPlan();
+    $ascFlag = true;
+}
+ : col_ref[$plan] ( ASC | DESC { $ascFlag = false; } )?
 ;
 
-order_col_list : order_col+
+distinct_clause returns[String alias]
+ : ^( DISTINCT rel partition_clause? )
+   {
+       $alias = builder.buildDistinctOp( $general_statement::alias,
+          $general_statement::parallel, $rel.name, $partition_clause.partitioner );
+   }
 ;
 
-order_col : col_ref[null] ( ASC | DESC )?
+partition_clause returns[String partitioner]
+ : ^( PARTITION func_name )
+   {
+       $partitioner = $func_name.funcName;
+   }
 ;
 
-distinct_clause : ^( DISTINCT rel partition_clause? )
+cross_clause returns[String alias]
+ : ^( CROSS rel_list partition_clause? )
+   {
+       $alias = builder.buildCrossOp( $general_statement::alias,
+          $general_statement::parallel, $rel_list.aliasList, $partition_clause.partitioner );
+   }
 ;
 
-partition_clause : ^( PARTITION func_name )
+rel_list returns[List<String> aliasList]
+@init { $aliasList = new ArrayList<String>(); }
+ : ( rel { $aliasList.add( $rel.name ); } )+
 ;
 
-cross_clause : ^( CROSS rel_list partition_clause? )
+join_clause returns[String alias]
+scope {
+    MultiMap<Integer, LogicalExpressionPlan> joinPlans;
+    int inputIndex;
+    List<String> inputAliases;
+    List<Boolean> innerFlags;
+}
+@init {
+    $join_clause::joinPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+    $join_clause::inputAliases = new ArrayList<String>();
+    $join_clause::innerFlags = new ArrayList<Boolean>();
+}
+ : ^( JOIN join_sub_clause join_type? partition_clause? )
+   {
+       $alias = builder.buildJoinOp( $general_statement::alias,
+          $general_statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
+          $join_type.type, $join_clause::innerFlags, $partition_clause.partitioner );
+   }
 ;
 
-rel_list : rel+
+join_type returns[JOINTYPE type]
+ : JOIN_TYPE_REPL { $type = JOINTYPE.REPLICATED; }
+ | JOIN_TYPE_MERGE { $type = JOINTYPE.MERGE; }
+ | JOIN_TYPE_SKEWED { $type = JOINTYPE.SKEWED; }
+ | JOIN_TYPE_DEFAULT { $type = JOINTYPE.HASH; }
 ;
 
-joint_clause : ^( JOIN join_sub_clause QUOTEDSTRING? partition_clause? )
+join_sub_clause
+ // innerFlags gets one entry per join input, in input order; true marks that input as inner.
+ : join_item ( LEFT { $join_clause::innerFlags.add( true );     // LEFT OUTER: first input inner,
+                      $join_clause::innerFlags.add( false ); }  // second input is the outer side
+             | RIGHT { $join_clause::innerFlags.add( false );   // RIGHT OUTER: first input is the outer side,
+                       $join_clause::innerFlags.add( true ); }  // second input inner
+             | FULL { $join_clause::innerFlags.add( false ); 
+                      $join_clause::innerFlags.add( false ); } ) OUTER? join_item
+ // A plain join (the alternative below) adds no flags here.
+ | join_item+
+;
 
-join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
-                | join_item_list
+join_item
+ : ^( JOIN_ITEM rel join_group_by_clause[$rel.name] )
+   {
+       $join_clause::inputAliases.add( $rel.name );
+       $join_clause::joinPlans.put( $join_clause::inputIndex, $join_group_by_clause.plans );
+       $join_clause::inputIndex++;
+   }
 ;
 
-join_item_list : join_item ( join_item )+
+join_group_by_clause[String alias] returns[List<LogicalExpressionPlan> plans]
+scope { String inputAlias; }
+@init {
+    $join_group_by_clause::inputAlias = $alias;
+    $plans = new ArrayList<LogicalExpressionPlan>();
+}
+ : ^( BY ( join_group_by_expr { $plans.add( $join_group_by_expr.plan ); } )+ )
 ;
 
-join_item : rel flatten_generated_item_list
+join_group_by_expr returns[LogicalExpressionPlan plan]
+ : { $plan = new LogicalExpressionPlan(); } expr[$plan]
+ | STAR 
+   {
+       $plan = builder.buildProjectStar( $join_group_by_clause::inputAlias );
+   }
 ;
 
-union_clause : ^( UNION ONSCHEMA? rel_list )
+union_clause returns[String alias]
+ : ^( UNION ONSCHEMA? rel_list )
+   {
+      $alias = builder.buildUnionOp( $general_statement::alias,
+          $general_statement::parallel, $rel_list.aliasList );
+   }
 ;
 
 foreach_clause[String alias] returns[LogicalRelationalOperator op] : ^( FOREACH rel nested_plan )
@@ -522,8 +627,7 @@ generate_clause : ^( GENERATE flatten_ge
 nested_command_list : nested_command*
 ;
 
-nested_command : ^( NESTED_CMD IDENTIFIER expr[null]  )
-               | ^( NESTED_CMD IDENTIFIER nested_op )
+nested_command : ^( NESTED_CMD IDENTIFIER ( expr[null] | nested_op ) )
 ;
 
 nested_op : nested_proj
@@ -539,16 +643,19 @@ nested_proj : ^( NESTED_PROJ col_ref[nul
 col_ref_list : col_ref[null]+
 ;
 
-nested_filter : ^( FILTER ( IDENTIFIER | nested_proj | expr[null] ) cond[null] )
+nested_alias_ref : IDENTIFIER
 ;
 
-nested_sort : ^( ORDER ( IDENTIFIER | nested_proj | expr[null] )  order_by_clause func_clause? )
+nested_filter : ^( FILTER ( nested_alias_ref | nested_proj | expr[null] ) cond[null] )
 ;
 
-nested_distinct : ^( DISTINCT ( IDENTIFIER | nested_proj | expr[null] ) )
+nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] )  order_by_clause[null] func_clause? )
 ;
 
-nested_limit : ^( LIMIT ( IDENTIFIER | nested_proj | expr[null] ) INTEGER )
+nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr[null] ) )
+;
+
+nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr[null] ) INTEGER )
 ;
 
 stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )
@@ -557,10 +664,20 @@ stream_clause : ^( STREAM rel ( EXECCOMM
 mr_clause : ^( MAPREDUCE QUOTEDSTRING path_list? store_clause load_clause EXECCOMMAND? )
 ;
 
-split_clause : ^( SPLIT rel split_branch+ )
+split_clause returns[String alias]
+ : ^( SPLIT rel { $alias = builder.buildSplitOp( $general_statement::alias, 
+                  $general_statement::parallel, $rel.name ); } 
+      split_branch[$alias]+ )
+   
 ;
 
-split_branch : ^( SPLIT_BRANCH IDENTIFIER cond[null] )
+split_branch[String inputAlias] returns[String alias]
+@init { LogicalExpressionPlan splitPlan = new LogicalExpressionPlan(); }
+ : ^( SPLIT_BRANCH IDENTIFIER cond[splitPlan] )
+   {
+       $alias = builder.buildSplitOutputOp( $IDENTIFIER.text,
+          $general_statement::parallel, $inputAlias, splitPlan );
+   }
 ;
 
 col_ref[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -778,4 +895,3 @@ rel_str_op returns[String id]
  | STR_OP_LTE { $id = $STR_OP_LTE.text; }
  | STR_OP_MATCHES { $id = $STR_OP_MATCHES.text; }
 ;
-

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Sat Dec 18 00:01:49 2010
@@ -266,6 +266,18 @@ DOUBLENUMBER : FLOATINGPOINT ( 'E' ( MIN
 FLOATNUMBER : DOUBLENUMBER ( 'F' )?
 ;
 
+JOIN_TYPE_REPL : '\'REPL\'' |  '\'REPLICATED\''
+;
+
+JOIN_TYPE_SKEWED : '\'SKEWED\''
+;
+
+JOIN_TYPE_MERGE : '\'MERGE\''
+;
+
+JOIN_TYPE_DEFAULT : '\'HASH\'' | '\'DEFAULT\''
+;
+
 QUOTEDSTRING :  '\'' (   ( ~ ( '\'' | '\\' | '\n' | '\r' ) )
                        | ( '\\' ( ( 'N' | 'T' | 'B' | 'R' | 'F' | '\\' | '\'' ) ) )
                        | ( '\\u' ( '0'..'9' | 'A'..'F' )

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Sat Dec 18 00:01:49 2010
@@ -54,6 +54,7 @@ tokens {
     BAG_TYPE;
     NEG;
     EXPR_IN_PAREN;
+    JOIN_ITEM;
 }
 
 @header {
@@ -136,9 +137,8 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
-          | partition_clause
           | cross_clause
-          | joint_clause
+          | join_clause
           | union_clause
           | stream_clause
           | mr_clause
@@ -245,17 +245,12 @@ group_item_list : group_item ( COMMA gro
                -> group_item+
 ;
 
-group_item : rel ( ( BY! flatten_generated_item_list ) | ALL | ANY ) ( INNER | OUTER )?
+group_item : rel ( join_group_by_clause | ALL | ANY ) ( INNER | OUTER )?
 ;
 
 rel : alias | LEFT_PAREN! op_clause RIGHT_PAREN!
 ;
 
-flatten_generated_item_list : LEFT_PAREN flatten_generated_item ( COMMA flatten_generated_item )* RIGHT_PAREN
-                           -> flatten_generated_item+
-                            | flatten_generated_item
-;
-
 flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
 ;
 
@@ -379,7 +374,10 @@ rel_list : rel ( COMMA rel )*
         -> rel+
 ;
 
-joint_clause : JOIN^ join_sub_clause ( USING! QUOTEDSTRING )? partition_clause?
+join_clause : JOIN^ join_sub_clause ( USING! join_type )? partition_clause? // JOIN is the subtree root; the USING token is left out of the tree
+;
+
+join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT // only the lexer's JOIN_TYPE_* tokens are accepted after USING
 ;
 
 join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
@@ -389,7 +387,19 @@ join_sub_clause : join_item ( LEFT | RIG
 join_item_list : join_item ( COMMA! join_item )+
 ;
 
-join_item : rel BY! flatten_generated_item_list
+join_item : rel join_group_by_clause
+         -> ^( JOIN_ITEM  rel join_group_by_clause ) // each item is wrapped under the JOIN_ITEM node declared in tokens { }
+;
+
+join_group_by_clause : BY^ join_group_by_expr_list // BY becomes the root of the key expressions
+;
+
+join_group_by_expr_list : LEFT_PAREN join_group_by_expr ( COMMA join_group_by_expr )* RIGHT_PAREN
+                       -> join_group_by_expr+ // the rewrite keeps only the expressions; parentheses and commas are dropped
+                        | join_group_by_expr
+;
+
+join_group_by_expr : expr | STAR // a key is either an expression or '*'
 ;
 
 union_clause : UNION^ ONSCHEMA? rel_list
@@ -416,8 +426,8 @@ nested_command_list : ( nested_command S
                     |
 ;
 
-nested_command : IDENTIFIER EQUAL expr 
-              -> ^( NESTED_CMD IDENTIFIER expr  )
+nested_command : IDENTIFIER EQUAL expr
+              -> ^( NESTED_CMD IDENTIFIER expr )
                | IDENTIFIER EQUAL nested_op
               -> ^( NESTED_CMD IDENTIFIER nested_op )
 ;
@@ -437,16 +447,19 @@ col_ref_list : ( col_ref | ( LEFT_PAREN 
             -> col_ref+
 ;
 
-nested_filter : FILTER^ ( IDENTIFIER | nested_proj | expr_eval ) BY! cond
+nested_alias_ref : IDENTIFIER // named rule for the alias operand of nested filter/order/distinct/limit
+;
+
+nested_filter : FILTER^ ( nested_alias_ref | nested_proj | expr_eval ) BY! cond // FILTER is the root; BY is left out of the tree
 ;
 
-nested_sort : ORDER^ ( IDENTIFIER | nested_proj | expr_eval ) BY!  order_by_clause ( USING! func_clause )?
+nested_sort : ORDER^ ( nested_alias_ref | nested_proj | expr_eval ) BY!  order_by_clause ( USING! func_clause )?
 ;
 
-nested_distinct : DISTINCT^ ( IDENTIFIER | nested_proj | expr_eval )
+nested_distinct : DISTINCT^ ( nested_alias_ref | nested_proj | expr_eval )
 ;
 
-nested_limit : LIMIT^ ( IDENTIFIER | nested_proj | expr_eval ) INTEGER
+nested_limit : LIMIT^ ( nested_alias_ref | nested_proj | expr_eval ) INTEGER
 ;
 
 stream_clause : STREAM^ rel THROUGH! ( EXECCOMMAND | IDENTIFIER ) as_clause?

Added: pig/trunk/src/org/apache/pig/parser/UndefinedAliasException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/UndefinedAliasException.java?rev=1050538&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/UndefinedAliasException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/UndefinedAliasException.java Sat Dec 18 00:01:49 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+
+/** Recognition error raised when a statement refers to an alias that is not defined. */
+public class UndefinedAliasException extends RecognitionException {
+    private static final long serialVersionUID = 1L;
+    private final String alias; // the undefined name; assigned once by the constructor
+
+    public UndefinedAliasException(IntStream input, String alias) {
+        super( input );
+        this.alias = alias;
+    }
+
+    @Override
+    public String toString() {
+        return "Undefined alias: " + alias;
+    }
+
+    public String getAlias() {
+        return alias;
+    }
+}

Modified: pig/trunk/test/org/apache/pig/parser/TestAstValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestAstValidator.java?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestAstValidator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestAstValidator.java Sat Dec 18 00:01:49 2010
@@ -108,4 +108,17 @@ public class TestAstValidator {
         }
         Assert.assertTrue( false ); // should never come here.
     }
+
+    @Test
+    public void tesNegative3() throws RecognitionException, IOException {
+        try {
+            // B is never defined, so the reference in "limit B" is expected to be rejected.
+            ParserTestingUtils.validateAst( "A = load 'x'; C = limit B 100;" );
+            Assert.fail( "Expected a ParsingFailureException for undefined alias 'B'." );
+        } catch(ParsingFailureException ex) {
+            Assert.assertEquals( AstValidator.class, ex.getParsingClass() );
+        }
+    }
+    
+    // TODO: need a test similar to above but for foreach inner plan.
 }

Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1050538&r1=1050537&r2=1050538&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Sat Dec 18 00:01:49 2010
@@ -28,17 +28,35 @@ import org.junit.Test;
 public class TestLogicalPlanGenerator {
     @Test
     public void test1() throws RecognitionException, IOException, ParsingFailureException {
-        ParserTestingUtils.generateLogicalPlan( "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); B = limit A 100; C = filter B by 2 > 1; D = store C into 'output';" );
+        try {
+        ParserTestingUtils.generateLogicalPlan( 
+        		"A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " + 
+        		"B = limit A 100; " +
+        		"C = filter B by 2 > 1; " +
+        		"D = load 'y' as (d1, d2); " +
+        		"E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
+        		"F = store E into 'output';" );
+        } catch(Exception ex) {
+            throw new AssertionError( ex ); // still fails the test, but keeps the real exception as the cause
+        }
     }
 
     @Test
-    public void testNegative2() throws RecognitionException, IOException {
+    public void test2() throws RecognitionException, IOException, ParsingFailureException {
         try {
-            ParserTestingUtils.validateAst( "A = load 'x' as ( u:int, v:long, w:tuple( w:long, u:chararray, w:bytearray) );" );
-        } catch(ParsingFailureException ex) {
-            Assert.assertEquals( AstValidator.class, ex.getParsingClass() );
-            return;
+        ParserTestingUtils.generateLogicalPlan( 
+        		"A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+        		"B = distinct A partition by org.apache.pig.Identity; " +
+        		"C = sample B 0.49; " +
+        		"D = order C by $0, $1; " +
+        		"E = load 'y' as (d1, d2); " +
+        		"F = union onschema D, E; " +
+        		"G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
+        		"H = cross F, G; " +
+        		"I = split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
+        		"L = store J into 'output';" );
+        } catch(Exception ex) {
+            throw new AssertionError( ex ); // still fails the test, but keeps the real exception as the cause
         }
-        Assert.assertTrue( false ); // should never come here.
     }
 }