You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by gd...@apache.org on 2012/09/13 16:55:38 UTC

svn commit: r1384352 [3/4] - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Thu Sep 13 14:55:36 2012
@@ -57,6 +57,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -373,13 +374,21 @@ public class LineageFindRelVisitor exten
             visitExpression(expPlan);
         }
     }
-    
-    
+
+    @Override
+    public void visit(LORank rank) throws FrontendException{
+        mapToPredLoadFunc(rank);
+        List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
+        for(LogicalExpressionPlan expPlan : expPlans){
+            visitExpression(expPlan);
+        }
+    }
+
     @Override
     public void visit(LODistinct relOp) throws FrontendException{
         mapToPredLoadFunc(relOp);
     }
-    
+
     @Override
     public void visit(LOLimit loLimit) throws FrontendException{
         mapToPredLoadFunc(loLimit);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java Thu Sep 13 14:55:36 2012
@@ -42,6 +42,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -113,11 +114,55 @@ public class ProjectStarExpander extends
                 );
             }
         }
-        
+
         sort.setSortColPlans(newExpPlans);
         sort.setAscendingCols(newAscOrder);
     }
 
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+
+        List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
+        List<Boolean> ascOrder = rank.getAscendingCol();
+
+        List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();
+        List<Boolean> newAscOrder = new ArrayList<Boolean>();
+
+        if (expPlans.size() != ascOrder.size()) {
+            throw new AssertionError(
+                    "Size of expPlans and ascorder should be same");
+        }
+
+        for (int i = 0; i < expPlans.size(); i++) {
+            // expand the plan
+            LogicalExpressionPlan ithExpPlan = expPlans.get(i);
+            List<LogicalExpressionPlan> expandedPlans = expandPlan(ithExpPlan,
+                    0);
+            newExpPlans.addAll(expandedPlans);
+
+            // add corresponding isAsc flags
+            Boolean isAsc = ascOrder.get(i);
+            for (int j = 0; j < expandedPlans.size(); j++) {
+                newAscOrder.add(isAsc);
+            }
+        }
+
+        // check if there is a project-star-to-end followed by another rank plan
+        // in the expanded plans (can happen if there is no input schema)
+        for (int i = 0; i < newExpPlans.size(); i++) {
+            ProjectExpression proj = getProjectStar(newExpPlans.get(i));
+            if (proj != null && proj.isRangeProject() && proj.getEndCol() == -1
+                    && i != newExpPlans.size() - 1) {
+                String msg = "Project-range to end (eg. x..)"
+                        + " is supported in rank-by only as last rank column";
+                throw new FrontendException(msg, 1128, PigException.INPUT);
+            }
+        }
+
+        rank.setRankColPlan(newExpPlans);
+        rank.setAscendingCol(newAscOrder);
+    }
+
     /**
      * Expand plan into multiple plans if the plan contains a project star,
      * if there is no project star the returned list contains the plan argument.

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java Thu Sep 13 14:55:36 2012
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -54,11 +55,11 @@ public class SchemaAliasVisitor extends 
 
     /***
      * The logic here is to check if we have duplicate alias in each schema
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     protected void validate(LogicalRelationalOperator op) throws FrontendException {
         LogicalSchema schema = op.getSchema();
-        
+
         Set<String> seenAliases = new HashSet<String>();
         if( schema != null){
             for( int i = 0; i < schema.size(); i++){
@@ -88,22 +89,22 @@ public class SchemaAliasVisitor extends 
     public void visit(LOFilter filter) throws FrontendException {
         validate( filter );
     }
-    
+
     @Override
     public void visit(LOJoin join) throws FrontendException {
         validate( join );
     }
-    
+
     @Override
     public void visit(LOForEach foreach) throws FrontendException {
         new SchemaAliasVisitor( foreach.getInnerPlan() ).visit();
     }
-    
+
     @Override
     public void visit(LOGenerate gen) throws FrontendException {
         validate( gen );
     }
-    
+
     @Override
     public void visit(LOInnerLoad load) throws FrontendException {
         validate( load );
@@ -113,42 +114,47 @@ public class SchemaAliasVisitor extends 
     public void visit(LOCogroup group) throws FrontendException {
         validate( group );
     }
-    
+
     @Override
     public void visit(LOSplit split) throws FrontendException {
         validate( split );
     }
-    
+
     @Override
     public void visit(LOSplitOutput splitOutput) throws FrontendException {
         validate( splitOutput );
     }
-    
+
     @Override
     public void visit(LOUnion union) throws FrontendException {
         validate( union );
     }
-    
+
     @Override
     public void visit(LOSort sort) throws FrontendException {
         validate( sort );
     }
-    
+
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+        validate( rank );
+    }
+
     @Override
     public void visit(LODistinct distinct) throws FrontendException {
         validate( distinct );
     }
-    
+
     @Override
     public void visit(LOLimit limit) throws FrontendException {
         validate( limit );
     }
-    
+
     @Override
     public void visit(LOCross cross) throws FrontendException {
         validate( cross );
     }
-    
+
     @Override
     public void visit(LOStream stream) throws FrontendException {
         validate( stream );

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Thu Sep 13 14:55:36 2012
@@ -46,6 +46,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -547,6 +548,45 @@ public class TypeCheckingRelVisitor exte
     }
 
     /***
+     * The schema of rank output will be the same as input, plus a rank field.
+     * @throws FrontendException
+     *
+     */
+    public void visit(LORank rank) throws FrontendException {
+        rank.resetSchema();
+
+        // Type checking internal plans.
+        List<LogicalExpressionPlan> rankColPlans = rank.getRankColPlans();
+
+        for(int i=0;i < rankColPlans.size(); i++) {
+            LogicalExpressionPlan rankColPlan = rankColPlans.get(i) ;
+
+            // Check that the inner plan has only 1 output port
+            if (rankColPlan.getSources().size() != 1) {
+                int errCode = 1057;
+                String msg = "Rank's inner plan can only have one output (leaf)" ;
+                msgCollector.collect(msg, MessageType.Error) ;
+                throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, null) ;
+            }
+
+            visitExpressionPlan(rankColPlan, rank);
+
+        }
+
+        try {
+            // Compute the schema
+            rank.getSchema() ;
+        }
+        catch (FrontendException fee) {
+            int errCode = 1059;
+            String msg = "Problem while reconciling output schema of Rank" ;
+            msgCollector.collect(msg, MessageType.Error);
+            throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, fee) ;
+        }
+
+    }
+
+    /***
      * The schema of split output will be the same as split input
      */
 

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Thu Sep 13 14:55:36 2012
@@ -123,6 +123,7 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
+          | rank_clause
           | cross_clause
           | join_clause
           | union_clause
@@ -374,48 +375,65 @@ col_index 
 col_range :  ^(COL_RANGE col_ref? DOUBLE_PERIOD col_ref?)
 ;
 
-pound_proj 
+pound_proj
     : ^( POUND ( QUOTEDSTRING | NULL ) )
 ;
 
-bin_expr 
-    : ^( BIN_EXPR cond expr expr )     
+bin_expr
+    : ^( BIN_EXPR cond expr expr )
 ;
 
-limit_clause 
+limit_clause
     : ^( LIMIT rel ( INTEGER | LONGINTEGER | expr ) )
 ;
 
-sample_clause 
+sample_clause
     :	 ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
 ;
 
-order_clause 
+rank_clause
+	: ^( RANK rel ( rank_by_statement )? )
+;
+
+rank_by_statement
+	: ^( BY rank_by_clause ( DENSE )? )
+;
+
+rank_by_clause
+	: STAR ( ASC | DESC )?
+    | rank_col+
+;
+
+rank_col
+	: ( col_range | col_ref ) ( ASC | DESC )?
+;
+
+order_clause
     : ^( ORDER rel order_by_clause func_clause? )
 ;
 
-order_by_clause 
+order_by_clause
     : STAR ( ASC | DESC )?
     | order_col+
 ;
 
-order_col 
-    : (col_range | col_ref) ( ASC | DESC )?    
+order_col
+    : (col_range | col_ref) ( ASC | DESC )?
 ;
 
-distinct_clause 
+distinct_clause
     : ^( DISTINCT rel partition_clause? )
 ;
 
-partition_clause 
-    : ^( PARTITION func_name )    
+partition_clause
+    : ^( PARTITION func_name )
 ;
 
-cross_clause 
-    : ^( CROSS rel_list partition_clause? )    
+cross_clause
+    : ^( CROSS rel_list partition_clause? )
 ;
 
-rel_list 
+rel_list
     : rel+
 ;
 
@@ -601,6 +619,7 @@ eid : rel_str_op
     | ROLLUP
     | MATCHES
     | ORDER
+    | RANK
     | DISTINCT
     | COGROUP
     | JOIN

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu Sep 13 14:55:36 2012
@@ -90,6 +90,7 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
+          | rank_clause
           | cross_clause
           | join_clause
           | union_clause
@@ -346,25 +347,42 @@ col_index 
 col_range :  ^(COL_RANGE col_ref? { sb.append(".."); } DOUBLE_PERIOD col_ref?)
 ;
 
-pound_proj 
+pound_proj
     : ^( POUND { sb.append($POUND.text); }
         ( QUOTEDSTRING { sb.append($QUOTEDSTRING.text); } | NULL { sb.append($NULL.text); } ) )
 ;
 
-bin_expr 
-    : ^( BIN_EXPR { sb.append(" ("); } cond { sb.append(" ? "); } expr { sb.append(" : "); } expr { sb.append(") "); } )     
+bin_expr
+    : ^( BIN_EXPR { sb.append(" ("); } cond { sb.append(" ? "); } expr { sb.append(" : "); } expr { sb.append(") "); } )
 ;
 
-limit_clause 
-    : ^( LIMIT { sb.append($LIMIT.text).append(" "); } rel 
+limit_clause
+    : ^( LIMIT { sb.append($LIMIT.text).append(" "); } rel
         ( INTEGER { sb.append(" ").append($INTEGER.text); } | LONGINTEGER { sb.append(" ").append($LONGINTEGER.text); } | expr ) )
 ;
 
-sample_clause 
-    : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel ( DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } | expr ) )    
+sample_clause
+    : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel ( DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } | expr ) )
 ;
 
-order_clause 
+rank_clause
+    : ^( RANK { sb.append($RANK.text).append(" "); } rel ( rank_by_statement )? )
+;
+
+rank_by_statement
+	: ^( BY { sb.append(" ").append($BY.text); } rank_by_clause ( DENSE { sb.append(" ").append($DENSE.text); } )? )
+;
+
+rank_by_clause
+	: STAR { sb.append($STAR.text); } ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
+    | rank_col ( { sb.append(", "); } rank_col )*
+;
+
+rank_col
+	: ( col_range | col_ref ) ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
+;
+
+order_clause
     : ^( ORDER { sb.append($ORDER.text).append(" "); } rel
         { sb.append(" BY "); } order_by_clause
         ( { sb.append(" USING "); } func_clause )? )
@@ -589,6 +607,7 @@ eid : rel_str_op
     | ROLLUP    { sb.append($ROLLUP.text); }
     | MATCHES   { sb.append($MATCHES.text); }
     | ORDER     { sb.append($ORDER.text); }
+    | RANK      { sb.append($RANK.text); }
     | DISTINCT  { sb.append($DISTINCT.text); }
     | COGROUP   { sb.append($COGROUP.text); }
     | JOIN      { sb.append($JOIN.text); }

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu Sep 13 14:55:36 2012
@@ -138,6 +138,7 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
+          | rank_clause
           | cross_clause
           | join_clause
           | union_clause
@@ -398,6 +399,20 @@ limit_clause : ^( LIMIT rel ( INTEGER | 
 sample_clause : ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
 ;
 
+rank_clause : ^( RANK rel ( rank_by_statement )? )
+;
+
+rank_by_statement : ^( BY rank_by_clause ( DENSE )? )
+;
+
+rank_by_clause : STAR ( ASC | DESC )?
+               | rank_col+
+;
+
+rank_col : col_range (ASC | DESC)?
+         | col_ref ( ASC | DESC )?
+;
+
 order_clause : ^( ORDER rel order_by_clause func_clause? )
 ;
 
@@ -599,6 +614,7 @@ eid : rel_str_op
     | ROLLUP
     | MATCHES
     | ORDER
+    | RANK
     | DISTINCT
     | COGROUP
     | JOIN

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Sep 13 14:55:36 2012
@@ -79,6 +79,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -306,7 +307,28 @@ public class LogicalPlanBuilder {
         expandAndResetVisitor(loc, sort);
         return alias;
     }
-    
+
+    LORank createRankOp() {
+        return new LORank( plan );
+    }
+
+    String buildRankOp(SourceLocation loc, LORank rank, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
+            List<Boolean> ascFlags) throws ParserValidationException {
+
+        // Set the rank-by plans; default every column to ascending when no flags were given
+        rank.setRankColPlan(plans);
+        if (ascFlags.isEmpty()) {
+            for (int i=0;i<plans.size();i++)
+                ascFlags.add(true);
+        }
+        rank.setAscendingCol(ascFlags);
+
+        buildOp( loc, rank, alias, inputAlias, null );
+        expandAndResetVisitor(loc, rank);
+
+        return alias;
+    }
+
     LOJoin createJoinOp() {
         return new LOJoin( plan );
     }

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Sep 13 14:55:36 2012
@@ -85,6 +85,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -213,6 +214,7 @@ op_clause returns[String alias] : 
           | limit_clause { $alias = $limit_clause.alias; }
           | sample_clause { $alias = $sample_clause.alias; }
           | order_clause { $alias = $order_clause.alias; }
+          | rank_clause { $alias = $rank_clause.alias; }
           | cross_clause { $alias = $cross_clause.alias; }
           | join_clause { $alias = $join_clause.alias; }
           | union_clause { $alias = $union_clause.alias; }
@@ -1052,6 +1054,76 @@ scope GScope;
   ) )
 ;
 
+rank_clause returns[String alias]
+scope {
+	LORank rankOp;
+}
+scope GScope;
+@init {
+	$GScope::currentOp = builder.createRankOp();
+}
+@after {
+}
+ : ^( RANK rel rank_by_statement? )
+ {
+	SourceLocation loc = new SourceLocation( (PigParserNode) $rank_clause.start );
+
+	List<LogicalExpressionPlan> tempPlans = $rank_by_statement.plans;
+	List<Boolean> tempAscFlags = $rank_by_statement.ascFlags;
+
+	if(tempPlans == null && tempAscFlags == null) {
+		tempPlans = new ArrayList<LogicalExpressionPlan>();
+		tempAscFlags = new ArrayList<Boolean>();
+
+		((LORank)$GScope::currentOp).setIsRowNumber( true );
+	}
+
+	((LORank)$GScope::currentOp).setIsDenseRank( $rank_by_statement.isDenseRank != null?$rank_by_statement.isDenseRank:false );
+
+	$alias = builder.buildRankOp( loc, (LORank)$GScope::currentOp, $statement::alias, $statement::inputAlias, tempPlans, tempAscFlags );
+ }
+;
+
+rank_by_statement returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags, Boolean isDenseRank]
+@init {
+    $plans = new ArrayList<LogicalExpressionPlan>();
+    $ascFlags = new ArrayList<Boolean>();
+    $isDenseRank = false;
+}
+ : ^( BY rank_by_clause ( DENSE { $isDenseRank =  true; }  )? )
+ {
+	$plans = $rank_by_clause.plans;
+	$ascFlags = $rank_by_clause.ascFlags;
+ }
+;
+
+rank_by_clause returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
+@init {
+    $plans = new ArrayList<LogicalExpressionPlan>();
+    $ascFlags = new ArrayList<Boolean>();
+}
+ : STAR {
+		LogicalExpressionPlan plan = new LogicalExpressionPlan();
+		builder.buildProjectExpr( new SourceLocation( (PigParserNode)$STAR ), plan, $GScope::currentOp, $statement::inputIndex, null, -1 );
+		$plans.add( plan );
+   }
+   ( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); } )?
+ | ( rank_col
+   {
+       $plans.add( $rank_col.plan );
+       $ascFlags.add( $rank_col.ascFlag );
+   } )+
+;
+
+rank_col returns[LogicalExpressionPlan plan, Boolean ascFlag]
+@init {
+    $plan = new LogicalExpressionPlan();
+    $ascFlag = true;
+}
+ : col_range[$plan] (ASC | DESC { $ascFlag = false; } )?
+ | col_ref[$plan] ( ASC | DESC { $ascFlag = false; } )?
+;
+
 order_clause returns[String alias]
 scope GScope;
 @init {

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Thu Sep 13 14:55:36 2012
@@ -81,6 +81,12 @@ FOREACH : 'FOREACH'
 ORDER   :  'ORDER'
 ;
 
+RANK   :  'RANK'
+;
+
+DENSE   :  'DENSE'
+;
+
 CUBE    : 'CUBE'
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu Sep 13 14:55:36 2012
@@ -222,6 +222,7 @@ op_clause : define_clause 
           | limit_clause
           | sample_clause
           | order_clause
+          | rank_clause
           | cross_clause
           | join_clause
           | union_clause
@@ -501,6 +502,24 @@ limit_clause : LIMIT^ rel ( (INTEGER SEM
 sample_clause : SAMPLE^ rel ( (DOUBLENUMBER SEMI_COLON) => DOUBLENUMBER | expr )
 ;
 
+rank_clause : RANK^ rel ( rank_by_statement )?
+;
+
+rank_by_statement : BY^ rank_by_clause ( DENSE )?
+;
+
+rank_by_clause : STAR ( ASC | DESC )?
+			   | rank_list
+;
+
+rank_list : rank_col ( COMMA rank_col )*
+         -> rank_col+
+;
+
+rank_col : col_range ( ASC | DESC )?
+         | col_ref ( ASC | DESC )?
+;
+
 order_clause : ORDER^ rel BY! order_by_clause ( USING! func_clause )?
 ;
 

Modified: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Thu Sep 13 14:55:36 2012
@@ -27,7 +27,9 @@ import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -93,13 +95,13 @@ public class IllustratorAttacher extends
     LineageTracer lineage;
 
     HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
-    
+
     private HashMap<PhysicalOperator, DataBag> poToDataMap;
     private int maxRecords;
     private boolean revisit = false;
     private ArrayList<Boolean[]> subExpResults = null;
     private final Map<POLoad, LogicalSchema> poloadToSchemaMap;
-    
+
     public IllustratorAttacher(PhysicalPlan plan, LineageTracer lineage, int maxRecords,
         Map<POLoad, LogicalSchema> poLoadToSchemaMap, PigContext hadoopPigContext) throws VisitorException {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -124,7 +126,7 @@ public class IllustratorAttacher extends
         mPlan = oriPlan;
         popWalker();
     }
-    
+
     private void setIllustrator(PhysicalOperator po, int nEqClasses) {
         if (revisit && po.getIllustrator() != null)
             return;
@@ -139,7 +141,7 @@ public class IllustratorAttacher extends
         po.setIllustrator(illustrator);
         poToDataMap.put(po, illustrator.getData());
     }
-    
+
     private void setIllustrator(PhysicalOperator po, LinkedList<IdentityHashSet<Tuple>> eqClasses) {
         if (revisit && po.getIllustrator() != null)
             return;
@@ -161,20 +163,20 @@ public class IllustratorAttacher extends
         poToEqclassesMap.put(po, eqClasses);
         poToDataMap.put(po, illustrator.getData());
     }
-    
+
     public Map<PhysicalOperator, DataBag> getDataMap() {
         return poToDataMap;
     }
-    
+
     @Override
     public void visitLoad(POLoad ld) throws VisitorException{
-        // LOAD from temporary files need no illustrator 
+        // LOAD from temporary files need no illustrator
         if (revisit)
             return;
-        
+
         LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
         poToEqclassesMap.put(ld, eqClasses);
-        
+
         IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
         eqClasses.add(eqClass);
         Illustrator illustrator;
@@ -182,12 +184,12 @@ public class IllustratorAttacher extends
         ld.setIllustrator(illustrator);
         poToDataMap.put(ld, illustrator.getData());
     }
-    
+
     @Override
     public void visitStore(POStore st) throws VisitorException{
         setIllustrator(st, 1);
     }
-    
+
     @Override
     public void visitFilter(POFilter fl) throws VisitorException{
         setIllustrator(fl, 0);
@@ -195,13 +197,13 @@ public class IllustratorAttacher extends
         innerPlanAttach(fl, fl.getPlan());
         subExpResults = null;
     }
-    
+
     @Override
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
       super.visitLocalRearrange(lr);
       setIllustrator(lr);
     }
-    
+
     @Override
     public void visitPackage(POPackage pkg) throws VisitorException{
         if (!(pkg instanceof POPackageLite) && pkg.isDistinct())
@@ -209,17 +211,17 @@ public class IllustratorAttacher extends
         else
             setIllustrator(pkg, null);
     }
-    
+
     @Override
     public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
         setIllustrator(pkg);
     }
- 
+
     @Override
     public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
       setIllustrator(pkg);
     }
-    
+
     @Override
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         if (revisit && nfe.getIllustrator() != null)
@@ -239,14 +241,14 @@ public class IllustratorAttacher extends
         } else
             setIllustrator(nfe, 1);
     }
-    
+
     @Override
     public void visitUnion(POUnion un) throws VisitorException{
         if (revisit && un.getIllustrator() != null)
           return;
         setIllustrator(un, null);
     }
-    
+
     @Override
     public void visitSplit(POSplit spl) throws VisitorException{
         if (revisit && spl.getIllustrator() != null)
@@ -265,63 +267,73 @@ public class IllustratorAttacher extends
         innerPlanAttach(demux, innerPlan);
       setIllustrator(demux);
     }
-    
+
     @Override
 	public void visitDistinct(PODistinct distinct) throws VisitorException {
         setIllustrator(distinct, 1);
 	}
-    
+
     @Override
 	public void visitSort(POSort sort) throws VisitorException {
         setIllustrator(sort, 1);
 	}
-    
+
+    @Override
+    public void visitRank(PORank rank) throws VisitorException {
+        setIllustrator(rank, 3);
+    }
+
+    @Override
+    public void visitCounter(POCounter counter) throws VisitorException {
+        setIllustrator(counter, 1);
+    }
+
     @Override
     public void visitProject(POProject proj) throws VisitorException{
     }
-    
+
     @Override
     public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
         setIllustrator(grt, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(grt.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitLessThan(LessThanExpr lt) throws VisitorException{
         setIllustrator(lt, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(lt.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
         setIllustrator(gte, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(gte.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
         setIllustrator(lte, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(lte.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitEqualTo(EqualToExpr eq) throws VisitorException{
         setIllustrator(eq, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(eq.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
         setIllustrator(eq, 0);
         if (!revisit && subExpResults != null)
             subExpResults.add(eq.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitRegexp(PORegexp re) throws VisitorException{
         setIllustrator(re, 0);
@@ -335,12 +347,12 @@ public class IllustratorAttacher extends
         if (!revisit && subExpResults != null)
             subExpResults.add(isNull.getIllustrator().getSubExpResult());
     }
-    
+
     @Override
     public void visitAnd(POAnd and) throws VisitorException {
         setIllustrator(and, 0);
     }
-    
+
     @Override
     public void visitOr(POOr or) throws VisitorException {
         setIllustrator(or, 0);
@@ -362,11 +374,11 @@ public class IllustratorAttacher extends
     public void visitNegative(PONegative negative) {
       setIllustrator(negative, 1);
     }
-    
+
     @Override
     public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
     }
-    
+
     @Override
     public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
         // one each for >, ==, and <
@@ -377,7 +389,7 @@ public class IllustratorAttacher extends
     public void visitMapLookUp(POMapLookUp mapLookUp) {
       setIllustrator(mapLookUp, 1);
     }
-    
+
     @Override
     public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
         if (revisit &&  joinPackage.getIllustrator() != null)
@@ -389,12 +401,12 @@ public class IllustratorAttacher extends
     @Override
     public void visitCast(POCast cast) {
     }
-    
+
     @Override
     public void visitLimit(POLimit lim) throws VisitorException {
         setIllustrator(lim, 1);
     }
-    
+
     @Override
     public void visitStream(POStream stream) throws VisitorException {
         setIllustrator(stream, 1);
@@ -407,7 +419,7 @@ public class IllustratorAttacher extends
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         visitPOForEach(optimizedForEach);
     }
-    
+
     private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
         PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =
               mCurrentWalker.spawnChildWalker(plan);

Modified: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Thu Sep 13 14:55:36 2012
@@ -40,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -184,13 +185,19 @@ public class LocalMapReduceSimulator {
                     split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
                     ++index;
                     Mapper<Text, Tuple, PigNullableWritable, Writable> map;
-                    
+
                     if (mro.reducePlan.isEmpty()) {
                         // map-only
                         map = new PigMapOnly.Map();
-                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
                         Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapOnly.Map) map)
                           .getIllustratorContext(jobConf, input, intermediateData, split);
+                        if(mro.isCounterOperation()) {
+                            if(mro.isRowNumber()) {
+                                map = new PigMapReduceCounter.PigMapCounter();
+                            }
+                            context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
+                        }
+                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
                         map.run(context);
                     } else {
                         if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -216,9 +223,15 @@ public class LocalMapReduceSimulator {
                         reduce = new PigMapReduce.ReduceWithComparator();
                     else
                         reduce = new PigMapReduce.Reduce();
-                    reduce.setReducePlan(mro.reducePlan);
                     Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
                         context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);
+
+                    if(mro.isCounterOperation()) {
+                        reduce = new PigMapReduceCounter.PigReduceCounter();
+                        context = ((PigMapReduceCounter.PigReduceCounter)reduce).getIllustratorContext(job, intermediateData, (POPackage) pack);
+                    }
+
+                    ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
                     reduce.run(context);
                 }
                 for (PhysicalOperator key : mro.phyToMRMap.keySet())

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Sep 13 14:55:36 2012
@@ -61,6 +61,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -82,6 +84,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOStream;
@@ -93,14 +96,14 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 /**
- * ScriptStates encapsulates settings for a Pig script that runs on a hadoop 
- * cluster. These settings are added to all MR jobs spawned by the script and 
- * in turn are persisted in the hadoop job xml. With the properties already in 
+ * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
+ * cluster. These settings are added to all MR jobs spawned by the script and
+ * in turn are persisted in the hadoop job xml. With the properties already in
  * the job xml, users who want to know the relations between the script and MR
- * jobs can derive them from the job xmls.  
+ * jobs can derive them from the job xmls.
  */
 public class ScriptState {
-    
+
     /**
      * Keys of Pig settings added in MR job
      */
@@ -118,17 +121,17 @@ public class ScriptState {
         SCRIPT_FEATURES     ("pig.script.features"),
         JOB_ALIAS           ("pig.alias"),
         JOB_ALIAS_LOCATION  ("pig.alias.location");
-       
+
         private String displayStr;
-        
+
         private PIG_PROPERTY(String s) {
             displayStr = s;
         }
-        
+
         @Override
         public String toString() { return displayStr; }
     };
-    
+
     /**
      * Features used in a Pig script
      */
@@ -144,6 +147,7 @@ public class ScriptState {
         COGROUP,
         GROUP_BY,
         ORDER_BY,
+        RANK,
         DISTINCT,
         STREAMING,
         SAMPLER,
@@ -158,42 +162,42 @@ public class ScriptState {
         NATIVE,
         MAP_PARTIALAGG;
     };
-    
+
     /**
      * Pig property that allows user to turn off the inclusion of settings
-     * in the jobs 
+     * in the jobs
      */
     public static final String INSERT_ENABLED = "pig.script.info.enabled";
-    
+
     /**
-     * Restricts the size of Pig script stored in job xml 
+     * Restricts the size of Pig script stored in job xml
      */
-    public static final int MAX_SCRIPT_SIZE = 10240; 
-    
+    public static final int MAX_SCRIPT_SIZE = 10240;
+
     private static final Log LOG = LogFactory.getLog(ScriptState.class);
 
     private static ThreadLocal<ScriptState> tss = new ThreadLocal<ScriptState>();
-       
+
     private String id;
-    
+
     private String script;
     private String commandLine;
     private String fileName;
-    
+
     private String pigVersion;
     private String hodoopVersion;
-    
+
     private long scriptFeatures;
-    
+
     private PigContext pigContext;
-    
+
     private Map<MapReduceOper, String> featureMap = null;
     private Map<MapReduceOper, String> aliasMap = new HashMap<MapReduceOper, String>();
     private Map<MapReduceOper, String> aliasLocationMap = new HashMap<MapReduceOper, String>();
-    
+
     private List<PigProgressNotificationListener> listeners
             = new ArrayList<PigProgressNotificationListener>();
-    
+
     public static ScriptState start(String commandLine, PigContext pigContext) {
         ScriptState ss = new ScriptState(UUID.randomUUID().toString());
         ss.setCommandLine(commandLine);
@@ -201,10 +205,10 @@ public class ScriptState {
         tss.set(ss);
         return ss;
     }
-    
+
     private ScriptState(String id) {
         this.id = id;
-        this.script = ""; 
+        this.script = "";
     }
 
     public static ScriptState get() {
@@ -212,16 +216,16 @@ public class ScriptState {
             ScriptState.start("", null);
         }
         return tss.get();
-    }           
-       
+    }
+
     public void registerListener(PigProgressNotificationListener listener) {
         listeners.add(listener);
     }
-    
+
     public List<PigProgressNotificationListener> getAllListeners() {
         return listeners;
     }
-        
+
     public void emitInitialPlanNotification(MROperPlan plan) {
         for (PigProgressNotificationListener listener: listeners) {
             try {
@@ -327,11 +331,11 @@ public class ScriptState {
         }
 
         setPigFeature(mro, conf);
-        
+
         setJobParents(mro, conf);
     }
- 
-    public void setScript(File file) {            
+
+    public void setScript(File file) {
         try {
             setScript(new BufferedReader(new FileReader(file)));
         } catch (FileNotFoundException e) {
@@ -339,19 +343,19 @@ public class ScriptState {
         }
     }
 
-    public void setScript(String script) {        
+    public void setScript(String script) {
         if (script == null) return;
-        
+
         // restrict the size of the script to be stored in job conf
         script = (script.length() > MAX_SCRIPT_SIZE) ? script.substring(0,
                 MAX_SCRIPT_SIZE) : script;
-        
+
-        // XML parser cann't handle certain characters, including
+        // XML parser can't handle certain characters, including
         // the control character (&#1). Use Base64 encoding to
-        // get around this problem        
+        // get around this problem
         this.script = new String(Base64.encodeBase64(script.getBytes()));
     }
-   
+
     public void setScriptFeatures(LogicalPlan plan) {
         BitSet bs = new BitSet();
         try {
@@ -359,59 +363,59 @@ public class ScriptState {
         } catch (FrontendException e) {
             LOG.warn("unable to get script feature", e);
         }
-        scriptFeatures = bitSetToLong(bs);        
-        
+        scriptFeatures = bitSetToLong(bs);
+
         LOG.info("Pig features used in the script: "
                 + featureLongToString(scriptFeatures));
     }
-    
+
     public String getHadoopVersion() {
         if (hodoopVersion == null) {
             hodoopVersion = VersionInfo.getVersion();
         }
         return (hodoopVersion == null) ? "" : hodoopVersion;
     }
-    
+
     public String getPigVersion() {
         if (pigVersion == null) {
             String findContainingJar = JarManager.findContainingJar(ScriptState.class);
             if (findContainingJar != null) {
-            try { 
-                JarFile jar = new JarFile(findContainingJar); 
-                final Manifest manifest = jar.getManifest(); 
-                final Map <String,Attributes> attrs = manifest.getEntries(); 
-                Attributes attr = attrs.get("org/apache/pig");
-                pigVersion = attr.getValue("Implementation-Version");
-            } catch (Exception e) { 
-                LOG.warn("unable to read pigs manifest file"); 
-            } 
+                try {
+                    JarFile jar = new JarFile(findContainingJar);
+                    final Manifest manifest = jar.getManifest();
+                    final Map <String,Attributes> attrs = manifest.getEntries();
+                    Attributes attr = attrs.get("org/apache/pig");
+                    pigVersion = attr.getValue("Implementation-Version");
+                } catch (Exception e) { // best effort: pigVersion stays unset and "" is returned
+                    LOG.warn("unable to read pigs manifest file", e);
+                }
             } else {
                 LOG.warn("unable to read pigs manifest file. Not running from the Pig jar");
-        }
+            }
         }
         return (pigVersion == null) ? "" : pigVersion;
     }
-        
+
     public String getFileName() { return fileName; }
-    
+
     public void setFileName(String fileName) {
         this.fileName = fileName;
     }
-    
+
     String getId() { return id; }
-        
+
     private String getCommandLine() {
         return (commandLine == null) ? "" : commandLine;
     }
-    
+
     private void setCommandLine(String commandLine) {
         this.commandLine = commandLine;
     }
-    
+
     private String getScript() {
         return (script == null) ? "" : script;
     }
-    
+
     private void setScript(BufferedReader reader) {
         StringBuilder sb = new StringBuilder();
         try {
@@ -427,21 +431,21 @@ public class ScriptState {
         }
         setScript(sb.toString());
     }
-    
+
     private void setPigFeature(MapReduceOper mro, Configuration conf) {
         conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(mro));
         if (scriptFeatures != 0) {
-            conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(), 
+            conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
                     String.valueOf(scriptFeatures));
         }
-        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(mro));    
+        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(mro));
         conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(mro));
     }
-    
+
     private void setJobParents(MapReduceOper mro, Configuration conf) {
         // PigStats maintains a job DAG with the job id being updated
-        // upon available. Therefore, before a job is submitted, the ids 
-        // of its parent jobs are already available. 
+        // upon available. Therefore, before a job is submitted, the ids
+        // of its parent jobs are already available.
         JobGraph jg = PigStats.get().getJobGraph();
         JobStats js = null;
         Iterator<JobStats> iter = jg.iterator();
@@ -465,11 +469,11 @@ public class ScriptState {
             }
         }
     }
-    
+
     String getScriptFeatures() {
         return featureLongToString(scriptFeatures);
     }
-    
+
     public String getAlias(MapReduceOper mro) {
         if (!aliasMap.containsKey(mro)) {
             setAlias(mro);
@@ -514,32 +518,32 @@ public class ScriptState {
         if (featureMap == null) {
             featureMap = new HashMap<MapReduceOper, String>();
         }
-        
+
         String retStr = featureMap.get(mro);
-        if (retStr == null) {       
+        if (retStr == null) {
             BitSet feature = new BitSet();
             feature.clear();
             if (mro.isSkewedJoin()) {
                 feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
-            } 
+            }
             if (mro.isGlobalSort()) {
                 feature.set(PIG_FEATURE.ORDER_BY.ordinal());
-            } 
-            if (mro.isSampler()) { 
+            }
+            if (mro.isSampler()) {
                 feature.set(PIG_FEATURE.SAMPLER.ordinal());
-            } 
-            if (mro.isIndexer()) { 
+            }
+            if (mro.isIndexer()) {
                 feature.set(PIG_FEATURE.INDEXER.ordinal());
             }
             if (mro.isCogroup()) {
                 feature.set(PIG_FEATURE.COGROUP.ordinal());
-            } 
+            }
             if (mro.isGroupBy()) {
                 feature.set(PIG_FEATURE.GROUP_BY.ordinal());
-            } 
+            }
             if (mro.isRegularJoin()) {
                 feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
-            }  
+            }
             if (mro.needsDistinctCombiner()) {
                 feature.set(PIG_FEATURE.DISTINCT.ordinal());
             }
@@ -570,8 +574,8 @@ public class ScriptState {
             featureMap.put(mro, retStr);
         }
         return retStr;
-    }    
-    
+    }
+
     private long bitSetToLong(BitSet bs) {
         long ret = 0;
         for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
@@ -579,10 +583,10 @@ public class ScriptState {
         }
         return ret;
     }
-    
+
     private String featureLongToString(long l) {
         if (l == 0) return PIG_FEATURE.UNKNOWN.name();
-        
+
         StringBuilder sb = new StringBuilder();
         for (int i=0; i<PIG_FEATURE.values().length; i++) {
             if (((l >> i) & 0x00000001) != 0) {
@@ -592,7 +596,7 @@ public class ScriptState {
         }
         return sb.toString();
     }
-    
+
     public void setPigContext(PigContext pigContext) {
         this.pigContext = pigContext;
     }
@@ -603,18 +607,18 @@ public class ScriptState {
 
     private static class FeatureVisitor extends PhyPlanVisitor {
         private BitSet feature;
-        
+
         public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
                     plan));
             this.feature = feature;
         }
-        
+
         @Override
         public void visitFRJoin(POFRJoin join) throws VisitorException {
             feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
         }
-        
+
         @Override
         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
             if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
@@ -622,55 +626,55 @@ public class ScriptState {
             else
                 feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
         }
-        
+
         @Override
         public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
                 throws VisitorException {
             feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());;
         }
-        
+
         @Override
         public void visitCollectedGroup(POCollectedGroup mg)
-                throws VisitorException {           
+        throws VisitorException {
             feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
         }
-        
+
         @Override
         public void visitDistinct(PODistinct distinct) throws VisitorException {
             feature.set(PIG_FEATURE.DISTINCT.ordinal());
         }
-        
+
         @Override
         public void visitStream(POStream stream) throws VisitorException {
             feature.set(PIG_FEATURE.STREAMING.ordinal());
         }
-        
+
         @Override
         public void visitSplit(POSplit split) throws VisitorException {
             feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
         }
-        
+
         @Override
         public void visitDemux(PODemux demux) throws VisitorException {
-            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());         
+            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
         }
-        
+
         @Override
         public void visitPartialAgg(POPartialAgg partAgg){
             feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
         }
-        
-    }    
-    
+
+    }
+
     static class LogicalPlanFeatureVisitor extends LogicalRelationalNodesVisitor {
-        
+
         private BitSet feature;
-        
+
         protected LogicalPlanFeatureVisitor(LogicalPlan plan, BitSet feature) throws FrontendException {
-            super(plan, new org.apache.pig.newplan.DepthFirstWalker(plan));            
+            super(plan, new org.apache.pig.newplan.DepthFirstWalker(plan));
             this.feature = feature;
         }
-        
+
         @Override
         public void visit(LOCogroup op) {
             if (op.getGroupType() == GROUPTYPE.COLLECTED) {
@@ -685,27 +689,27 @@ public class ScriptState {
                 }
             }
         }
-        
+
         @Override
         public void visit(LOCross op) {
             feature.set(PIG_FEATURE.CROSS.ordinal());
         }
-        
+
         @Override
         public void visit(LODistinct op) {
             feature.set(PIG_FEATURE.DISTINCT.ordinal());
         }
-        
+
         @Override
         public void visit(LOFilter op) {
             feature.set(PIG_FEATURE.FILTER.ordinal());
         }
-        
+
         @Override
         public void visit(LOForEach op) {
-            
+
         }
-                
+
         @Override
         public void visit(LOJoin op) {
             if (op.getJoinType() == JOINTYPE.HASH) {
@@ -720,45 +724,51 @@ public class ScriptState {
                 feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
             }
         }
-        
+
         @Override
         public void visit(LOLimit op) {
             feature.set(PIG_FEATURE.LIMIT.ordinal());
         }
-        
+
         @Override
+        public void visit(LORank op) {
+            // Flag RANK in the feature bit set collected for this script.
+            feature.set(PIG_FEATURE.RANK.ordinal());
+        }
+
+        @Override
         public void visit(LOSort op) {
             feature.set(PIG_FEATURE.ORDER_BY.ordinal());
         }
-        
+
         @Override
         public void visit(LOStream op) {
             feature.set(PIG_FEATURE.STREAMING.ordinal());
         }
-        
+
         @Override
         public void visit(LOSplit op) {
-            
+
         }
-        
+
         @Override
         public void visit(LOUnion op) {
             feature.set(PIG_FEATURE.UNION.ordinal());
         }
-        
+
         @Override
         public void visit(LONative n) {
             feature.set(PIG_FEATURE.NATIVE.ordinal());
         }
 
     }
-    
+
     private static class AliasVisitor extends PhyPlanVisitor {
-        
+
         private HashSet<String> aliasSet;
-        
+
         private List<String> alias;
-        
+
         private final List<String> aliasLocation;
 
         public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
@@ -771,69 +779,69 @@ public class ScriptState {
                 for (String s : alias) aliasSet.add(s);
             }
         }
-        
+
         @Override
         public void visitLoad(POLoad load) throws VisitorException {
             setAlias(load);
             super.visitLoad(load);
         }
-        
+
         @Override
         public void visitFRJoin(POFRJoin join) throws VisitorException {
             setAlias(join);
             super.visitFRJoin(join);
         }
-        
+
         @Override
         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
             setAlias(join);
             super.visitMergeJoin(join);
         }
-        
+
         @Override
         public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
                 throws VisitorException {
             setAlias(mergeCoGrp);
             super.visitMergeCoGroup(mergeCoGrp);
         }
-        
+
         @Override
         public void visitCollectedGroup(POCollectedGroup mg)
-                throws VisitorException {           
+        throws VisitorException {
             setAlias(mg);
             super.visitCollectedGroup(mg);
         }
-        
+
         @Override
         public void visitDistinct(PODistinct distinct) throws VisitorException {
             setAlias(distinct);
             super.visitDistinct(distinct);
         }
-        
+
         @Override
         public void visitStream(POStream stream) throws VisitorException {
             setAlias(stream);
             super.visitStream(stream);
         }
-        
+
         @Override
         public void visitFilter(POFilter fl) throws VisitorException {
             setAlias(fl);
             super.visitFilter(fl);
         }
-         
+
         @Override
         public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
             setAlias(lr);
             super.visitLocalRearrange(lr);
         }
-        
+
         @Override
         public void visitPOForEach(POForEach nfe) throws VisitorException {
             setAlias(nfe);
             super.visitPOForEach(nfe);
         }
-        
+
         @Override
         public void visitUnion(POUnion un) throws VisitorException {
             setAlias(un);
@@ -845,19 +853,19 @@ public class ScriptState {
             setAlias(sort);
             super.visitSort(sort);
         }
- 
+
         @Override
         public void visitLimit(POLimit lim) throws VisitorException {
             setAlias(lim);
             super.visitLimit(lim);
         }
-        
+
         @Override
         public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
             setAlias(sk);
             super.visitSkewedJoin(sk);
         }
-        
+
         private void setAlias(PhysicalOperator op) {
             String s = op.getAlias();
             if (s != null) {
@@ -872,5 +880,5 @@ public class ScriptState {
         }
     }
     }
-    
+
 }

Modified: pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm Thu Sep 13 14:55:36 2012
@@ -228,6 +228,16 @@ sub generateData
             'filetype' => "numbers",
             'rows' => 5000,
             'hdfs' => "types/numbers.txt",
+        }, {
+            'name' => "biggish",
+            'filetype' => "biggish",
+            'rows' => 1000000,
+            'hdfs' => "singlefile/biggish",
+        }, {
+            'name' => "prerank",
+            'filetype' => "ranking",
+            'rows' => 30,
+            'hdfs' => "singlefile/prerank",
         }
     );
 

Modified: pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm Thu Sep 13 14:55:36 2012
@@ -204,6 +204,16 @@ sub generateData
             'filetype' => "numbers",
             'rows' => 5000,
             'outfile' => "types/numbers.txt",
+        }, {
+            'name' => "biggish",
+            'filetype' => "biggish",
+            'rows' => 1000000,
+            'outfile' => "singlefile/biggish",
+        }, {
+            'name' => "prerank",
+            'filetype' => "ranking",
+            'rows' => 30,
+            'outfile' => "singlefile/prerank",
         }
     );
 

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Sep 13 14:55:36 2012
@@ -4974,6 +4974,173 @@ store a into ':OUTPATH:';\,
                         store F into ':OUTPATH:';\,
                         }
                 ]
+            },
+            {
+                'name' => 'Rank',
+                'tests' => [
+                    {
+						'num' => 1,
+						'pig' => q\
+									SET default_parallel 9;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A by a ASC,b ASC DENSE;
+									C = foreach B generate rank_A,a,b;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankaaba,a,b;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 2,
+						'pig' => q\
+									SET default_parallel 9;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A by a ASC,c DESC DENSE;
+									C = foreach B generate rank_A,a,c;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankaacd,a,c;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 3,
+						'pig' => q\
+									SET default_parallel 7;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A by b DESC,c ASC DENSE;
+									C = foreach B generate rank_A,b,c;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankbdca,b,c;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 4,
+						'pig' => q\
+									SET default_parallel 7;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A;
+									C = foreach B generate rank_A,a,b,c;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rownumber,a,b,c;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' =>5,
+						'pig' => q\
+									SET default_parallel 9;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A by b DESC,a ASC;
+									C = foreach B generate rank_A,b,a;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankbdaa,b,a;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' =>6,
+						'pig' => q\
+									SET default_parallel 7;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = rank A by c ASC,b DESC;
+									C = foreach B generate rank_A,c,b;
+									store C into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankcabd,c,b;
+									store C into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 7,
+						'pig' => q\
+									SET default_parallel 7;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = foreach A generate a,b,c,tail;
+									C = rank B by a ASC,b ASC DENSE;
+									D = rank C by a ASC,c DESC DENSE;
+									E = rank D by b DESC,c ASC DENSE;
+									F = foreach E generate rank_D,rank_C,rank_B,a,b,c;
+									store F into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = foreach A generate rankbdca,rankaacd,rankaaba,a,b,c;
+									store B into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 8,
+						'pig' => q\
+									SET default_parallel 9;
+									SET pig.splitCombination false;
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = foreach A generate a,b,c;
+									C = rank B by a ASC,b ASC DENSE;
+									D = rank B by a ASC,c DESC DENSE;
+									F = join C by $0, D by $0;
+									G = foreach F generate C::rank_B, D::rank_B, C::a, C::b, C::c;
+									H = order G by a ASC, b ASC, c DESC;
+									store H into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = foreach A generate rankaaba,a,b,c;
+									E = order C by a ASC,b ASC;
+									D = foreach A generate rankaacd,a,b,c;
+									F = order D by a ASC,c DESC;
+									G = join E by $0, F by $0;
+									H = foreach G generate E::rankaaba, F::rankaacd, E::a, E::b, E::c;
+									store H into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 9,
+						'pig' => q\
+									SET default_parallel 25;
+									A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+									B = rank A;
+									C = order B by rank_A;
+									D = foreach C generate rank_A,rownumber;
+									store D into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+									D = foreach A generate idx,rownumber;
+									store D into ':OUTPATH:';
+								\,
+					}, {
+						'num' => 10,
+						'pig' => q\
+									SET default_parallel 11;
+									SET pig.splitCombination false;
+									A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+									B = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									C = join A by rownumber, B by rownumber;
+									D = order C by B::rankcabd,B::rankbdca,B::rankaaba;
+									E = rank D;
+									F = group E by rank_D;
+									G = foreach F generate group, COUNT(E);
+									H = order G by group;
+									store H into ':OUTPATH:';
+								\,
+						'verify_pig_script' => q\
+									A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+									B = foreach A generate rownumber,1;
+									C = order B by rownumber;
+									store C into ':OUTPATH:';
+								\, # schema above is prerank's own (same as tests 1-8); only rownumber is read
+            }
+                ]
             }
         ],
     },

Modified: pig/trunk/test/e2e/pig/tools/generate/generate_data.pl
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tools/generate/generate_data.pl?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tools/generate/generate_data.pl (original)
+++ pig/trunk/test/e2e/pig/tools/generate/generate_data.pl Thu Sep 13 14:55:36 2012
@@ -32,6 +32,16 @@ our @lastName = ("allen", "brown", "cars
     "nixon", "ovid", "polk", "quirinius", "robinson", "steinbeck", "thompson",
     "underhill", "van buren", "white", "xylophone", "young", "zipper");
 
+our @rankedTuples = (
+	"1,21,5,7,1,1,0,8,8","2,26,2,3,2,5,1,9,10","3,30,24,21,2,3,1,3,10","4,6,10,8,3,4,1,7,2",
+	"5,8,28,25,3,2,1,0,2","6,28,11,12,4,6,2,7,10","7,9,26,22,5,7,3,2,3","8,5,6,5,6,8,3,8,1",
+	"9,29,16,15,7,9,4,6,10","10,18,12,10,8,11,5,7,6","11,14,17,14,9,10,5,6,5","12,6,12,8,10,11,5,7,2",
+	"13,2,17,13,11,10,5,6,0","14,26,3,3,12,14,6,9,10","15,15,20,18,13,13,6,4,5","16,3,29,24,14,12,6,0,0",
+	"17,23,21,19,15,16,7,4,8","18,19,19,16,16,17,7,5,6","19,20,30,26,16,15,7,0,6","20,12,21,17,17,16,7,4,4",
+	"21,4,1,1,18,19,7,10,1","22,1,7,4,19,18,7,8,0","23,24,14,11,20,21,8,7,9","24,16,25,20,21,20,8,3,5",
+	"25,25,27,23,22,22,9,1,9","26,21,8,7,23,25,9,8,8","27,17,4,2,24,26,9,9,6","28,10,8,6,25,25,9,8,4",
+	"29,11,15,9,25,24,9,7,4","30,12,23,17,25,23,9,4,4");
+
 sub randomName()
 {
     return sprintf("%s %s", $firstName[int(rand(26))],
@@ -473,6 +483,23 @@ sub getBulkCopyCmd(){
             my $randf = rand(10);
             printf HDFS "%d:%d:%d:%d:%d:%dL:%.2ff:%.2f\n", $tid, $i, $rand5, $rand100, $rand1000, $rand1000, $randf, $randf;
         }
+    } elsif ($filetype eq "ranking") {
+        # Each row is one pre-ranked tuple followed by a large filler column.
+        # The filler (decimal digits of 0..1000000 concatenated) is the same for
+        # every row, so build it once instead of one printf call per number per row.
+        my $tail = join("", 0 .. 1000000);
+        for (my $i = 0; $i < $numRows; $i++) {
+            # Explicit format: the tuple text must never be used as the format string.
+            printf HDFS "%s,%s\n", $rankedTuples[$i], $tail;
+        }
+    } elsif ($filetype eq "biggish") {
+        # Rows are "<i>,<i>,<filler>"; the filler (decimal digits of 0..1000
+        # concatenated) is loop-invariant, so build it once.
+        my $tail = join("", 0 .. 1000);
+        for (my $i = 1; $i < $numRows; $i++) {
+            printf HDFS "%d,%d,%s\n", $i, $i, $tail;
+        }
+        # NOTE(review): starting at 1 yields $numRows - 1 rows -- confirm intended.
     } else {
         warn "Unknown filetype $filetype\n";
         usage();

Modified: pig/trunk/test/org/apache/pig/parser/TestLexer.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLexer.pig?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLexer.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLexer.pig Thu Sep 13 14:55:36 2012
@@ -73,7 +73,13 @@ A = LOAD 'data' AS (f1:int,f2:int,f3:int
 
 X = SAMPLE A 0.01;
 
+R = rank A by f1;
 
+R = rank A by f1 ASC, f2 DESC, f3;
+
+R = rank A by *;
+
+R = rank A by * DESC DENSE;
 
 
 

Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Thu Sep 13 14:55:36 2012
@@ -411,4 +411,61 @@ public class TestLogicalPlanGenerator {
         generateLogicalPlan( query );
     }
 
+    @Test
+    public void testRank01() {
+        // Rank by a single column with no explicit direction or DENSE modifier.
+        final String load = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);";
+        generateLogicalPlan(load + "B = rank A by sales;store B into 'rank01_test';");
+    }
+
+    @Test
+    public void testRank02() {
+        // Rank by a single column with the DENSE modifier.
+        final String load = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);";
+        generateLogicalPlan(load + "C = rank A by sales DENSE;store C into 'rank02_test';");
+    }
+
+    @Test
+    public void testRank03() {
+        // Rank with no BY clause at all.
+        final String load = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);";
+        generateLogicalPlan(load + "B = rank A;store B into 'rank03_test';");
+    }
+
+    @Test
+    public void testRank04() {
+        // Rank by a single column in explicit DESC order.
+        final String load = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);";
+        final String rankAndStore = "C = rank A by postalcode DESC;store C into 'rank04_test';";
+        generateLogicalPlan(load + rankAndStore);
+    }
+
+    @Test
+    public void testRank05() {
+        // Rank by a single column with the DENSE modifier, on the PigStorage-loaded relation.
+        final String load = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);";
+        final String rankAndStore = "D = rank A by postalcode DENSE;store D into 'rank05_test';";
+        generateLogicalPlan(load + rankAndStore);
+    }
+
+    @Test
+    public void testRank06() {
+        // Rank by a column range (x..rz); this script has no store statement.
+        final String load = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);";
+        generateLogicalPlan(load + "C = rank A by x..rz;");
+    }
+
+    @Test
+    public void testRank07() {
+        // Rank by two columns with mixed ASC/DESC directions; no store statement.
+        final String load = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);";
+        generateLogicalPlan(load + "C = rank A by x ASC, y DESC;");
+    }
+
+    @Test
+    public void testRank08() {
+        // Rank with no BY clause and no store statement.
+        final String load = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);";
+        generateLogicalPlan(load + "C = rank A;");
+    }
 }

Modified: pig/trunk/test/org/apache/pig/parser/TestParser.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestParser.pig?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestParser.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestParser.pig Thu Sep 13 14:55:36 2012
@@ -90,3 +90,11 @@ H = union onschema A, B;
 
 --stream
 C = stream A through CMD;
+
+
+--rank
+
+R = rank A;
+R = rank A by a;
+R = rank A by a DESC;
+R = rank A by a DESC, b;
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java Thu Sep 13 14:55:36 2012
@@ -28,7 +28,7 @@ import org.antlr.runtime.Token;
 import org.junit.Test;
 
 public class TestQueryLexer {
-    
+
     @Test
     public void TestLexer() throws IOException {
         CharStream input = new QueryParserFileStream( "test/org/apache/pig/parser/TestLexer.pig" );
@@ -45,13 +45,13 @@ public class TestQueryLexer {
                 System.out.print( token.getText() + "(" + token.getType() + ") " );
             }
         }
-        
+
         // While we can check more conditions, such as type of each token, for now I think the following
         // is enough. If the token type is wrong, it will be most likely caught by the parser.
-        Assert.assertEquals( 419, tokenCount );
+        Assert.assertEquals( 455, tokenCount );
         Assert.assertEquals( 0, lexer.getNumberOfSyntaxErrors() );
     }
-    
+
     @Test
     public void test2() throws IOException {
         String query = "A = load 'input' using PigStorage(';');" +

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParser.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParser.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParser.java Thu Sep 13 14:55:36 2012
@@ -26,6 +26,7 @@ import junit.framework.Assert;
 
 import org.antlr.runtime.CharStream;
 import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.MismatchedTokenException;
 import org.antlr.runtime.RecognitionException;
 import org.antlr.runtime.tree.CommonTree;
 import org.antlr.runtime.tree.Tree;
@@ -415,4 +416,61 @@ public class TestQueryParser {
         PigServer pig = new PigServer(ExecType.LOCAL);
         Util.registerMultiLineQuery(pig, query);
     }
+
+
+    //RANK
+    @Test
+    public void testRankPositive1() throws IOException, RecognitionException {
+        shouldPass("B = rank A;"); // RANK with no BY clause
+    }
+
+    @Test
+    public void testRankPositive2() throws IOException, RecognitionException {
+        shouldPass("B = rank A by x;"); // single column, no explicit direction
+    }
+
+    @Test
+    public void testRankPositive3() throws IOException, RecognitionException {
+        shouldPass("B = rank A by x DESC;"); // single column with explicit DESC
+    }
+
+    @Test
+    public void testRankPositive4() throws IOException, RecognitionException {
+        shouldPass("B = rank A by x, y ASC, w DESC, z ASC;"); // four columns, mixed/omitted directions
+    }
+
+    @Test
+    public void testRankPositive5() throws IOException, RecognitionException {
+        // Column range (x..z) as the rank key list.
+        final String load = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        shouldPass(load + "B = rank A by x..z;");
+    }
+
+    @Test
+    public void testRankPositive6() throws IOException, RecognitionException {
+        // Star (*) as the rank key list.
+        final String load = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        shouldPass(load + "B = rank A by *;");
+    }
+
+    @Test
+    public void testRankPositive7() throws IOException, RecognitionException {
+        // Single DESC column followed by the DENSE modifier.
+        final String load = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        shouldPass(load + "B = rank A by x DESC DENSE;");
+    }
+
+    @Test
+    public void testRankPositive8() throws IOException, RecognitionException {
+        // Two columns with explicit directions followed by the DENSE modifier.
+        final String load = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        shouldPass(load + "B = rank A by x DESC,y ASC DENSE;");
+    }
+
+    @Test
+    public void testRankPositive9() throws IOException, RecognitionException {
+        // Star (*) key list followed by the DENSE modifier.
+        final String load = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        shouldPass(load + "B = rank A by * DENSE;");
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java (original)
+++ pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -57,6 +57,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -179,12 +180,18 @@ public class OptimizeLimitPlanPrinter ex
         sb.append(";\n");
         appendEdges(loSort);
     }
-    
+
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        appendOp(loRank) ; // presumably emits the node for this operator -- confirm in appendOp
+        appendEdges(loRank); // presumably emits its edges to neighbouring operators -- confirm in appendEdges
+    }
+
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         appendEdges(loDistinct);
     }
-    
+
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         appendOp(loLimit) ;