You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by gd...@apache.org on 2012/09/13 16:55:38 UTC
svn commit: r1384352 [3/4] - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Thu Sep 13 14:55:36 2012
@@ -57,6 +57,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -373,13 +374,21 @@ public class LineageFindRelVisitor exten
visitExpression(expPlan);
}
}
-
-
+
+ @Override
+ public void visit(LORank rank) throws FrontendException{
+ mapToPredLoadFunc(rank);
+ List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
+ for(LogicalExpressionPlan expPlan : expPlans){
+ visitExpression(expPlan);
+ }
+ }
+
@Override
public void visit(LODistinct relOp) throws FrontendException{
mapToPredLoadFunc(relOp);
}
-
+
@Override
public void visit(LOLimit loLimit) throws FrontendException{
mapToPredLoadFunc(loLimit);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java Thu Sep 13 14:55:36 2012
@@ -42,6 +42,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -113,11 +114,55 @@ public class ProjectStarExpander extends
);
}
}
-
+
sort.setSortColPlans(newExpPlans);
sort.setAscendingCols(newAscOrder);
}
+ @Override
+ public void visit(LORank rank) throws FrontendException {
+
+ List<LogicalExpressionPlan> expPlans = rank.getRankColPlans();
+ List<Boolean> ascOrder = rank.getAscendingCol();
+
+ List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();
+ List<Boolean> newAscOrder = new ArrayList<Boolean>();
+
+ if (expPlans.size() != ascOrder.size()) {
+ throw new AssertionError(
+ "Size of expPlans and ascorder should be same");
+ }
+
+ for (int i = 0; i < expPlans.size(); i++) {
+ // expand the plan
+ LogicalExpressionPlan ithExpPlan = expPlans.get(i);
+ List<LogicalExpressionPlan> expandedPlans = expandPlan(ithExpPlan,
+ 0);
+ newExpPlans.addAll(expandedPlans);
+
+ // add corresponding isAsc flags
+ Boolean isAsc = ascOrder.get(i);
+ for (int j = 0; j < expandedPlans.size(); j++) {
+ newAscOrder.add(isAsc);
+ }
+ }
+
+ // check if there is a project-star-to-end followed by another rank plan
+ // in the expanded plans (can happen if there is no input schema)
+ for (int i = 0; i < newExpPlans.size(); i++) {
+ ProjectExpression proj = getProjectStar(newExpPlans.get(i));
+ if (proj != null && proj.isRangeProject() && proj.getEndCol() == -1
+ && i != newExpPlans.size() - 1) {
+ String msg = "Project-range to end (eg. x..)"
+ + " is supported in rank-by only as last rank column";
+ throw new FrontendException(msg, 1128, PigException.INPUT);
+ }
+ }
+
+ rank.setRankColPlan(newExpPlans);
+ rank.setAscendingCol(newAscOrder);
+ }
+
/**
* Expand plan into multiple plans if the plan contains a project star,
* if there is no project star the returned list contains the plan argument.
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java Thu Sep 13 14:55:36 2012
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -54,11 +55,11 @@ public class SchemaAliasVisitor extends
/***
* The logic here is to check if we have duplicate alias in each schema
- * @throws FrontendException
+ * @throws FrontendException
*/
protected void validate(LogicalRelationalOperator op) throws FrontendException {
LogicalSchema schema = op.getSchema();
-
+
Set<String> seenAliases = new HashSet<String>();
if( schema != null){
for( int i = 0; i < schema.size(); i++){
@@ -88,22 +89,22 @@ public class SchemaAliasVisitor extends
public void visit(LOFilter filter) throws FrontendException {
validate( filter );
}
-
+
@Override
public void visit(LOJoin join) throws FrontendException {
validate( join );
}
-
+
@Override
public void visit(LOForEach foreach) throws FrontendException {
new SchemaAliasVisitor( foreach.getInnerPlan() ).visit();
}
-
+
@Override
public void visit(LOGenerate gen) throws FrontendException {
validate( gen );
}
-
+
@Override
public void visit(LOInnerLoad load) throws FrontendException {
validate( load );
@@ -113,42 +114,47 @@ public class SchemaAliasVisitor extends
public void visit(LOCogroup group) throws FrontendException {
validate( group );
}
-
+
@Override
public void visit(LOSplit split) throws FrontendException {
validate( split );
}
-
+
@Override
public void visit(LOSplitOutput splitOutput) throws FrontendException {
validate( splitOutput );
}
-
+
@Override
public void visit(LOUnion union) throws FrontendException {
validate( union );
}
-
+
@Override
public void visit(LOSort sort) throws FrontendException {
validate( sort );
}
-
+
+ @Override
+ public void visit(LORank rank) throws FrontendException {
+ validate( rank );
+ }
+
@Override
public void visit(LODistinct distinct) throws FrontendException {
validate( distinct );
}
-
+
@Override
public void visit(LOLimit limit) throws FrontendException {
validate( limit );
}
-
+
@Override
public void visit(LOCross cross) throws FrontendException {
validate( cross );
}
-
+
@Override
public void visit(LOStream stream) throws FrontendException {
validate( stream );
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Thu Sep 13 14:55:36 2012
@@ -46,6 +46,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -547,6 +548,45 @@ public class TypeCheckingRelVisitor exte
}
/***
+ * The schema of rank output will be the same as input, plus a rank field.
+ * @throws FrontendException
+ *
+ */
+ public void visit(LORank rank) throws FrontendException {
+ rank.resetSchema();
+
+ // Type checking internal plans.
+ List<LogicalExpressionPlan> rankColPlans = rank.getRankColPlans();
+
+ for(int i=0;i < rankColPlans.size(); i++) {
+ LogicalExpressionPlan rankColPlan = rankColPlans.get(i) ;
+
+ // Check that the inner plan has only 1 output port
+ if (rankColPlan.getSources().size() != 1) {
+ int errCode = 1057;
+ String msg = "Rank's inner plan can only have one output (leaf)" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, null) ;
+ }
+
+ visitExpressionPlan(rankColPlan, rank);
+
+ }
+
+ try {
+ // Compute the schema
+ rank.getSchema() ;
+ }
+ catch (FrontendException fee) {
+ int errCode = 1059;
+ String msg = "Problem while reconciling output schema of Rank" ;
+ msgCollector.collect(msg, MessageType.Error);
+ throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, fee) ;
+ }
+
+ }
+
+ /***
* The schema of split output will be the same as split input
*/
Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Thu Sep 13 14:55:36 2012
@@ -123,6 +123,7 @@ op_clause : define_clause
| limit_clause
| sample_clause
| order_clause
+ | rank_clause
| cross_clause
| join_clause
| union_clause
@@ -374,48 +375,65 @@ col_index
col_range : ^(COL_RANGE col_ref? DOUBLE_PERIOD col_ref?)
;
-pound_proj
+pound_proj
: ^( POUND ( QUOTEDSTRING | NULL ) )
;
-bin_expr
- : ^( BIN_EXPR cond expr expr )
+bin_expr
+ : ^( BIN_EXPR cond expr expr )
;
-limit_clause
+limit_clause
: ^( LIMIT rel ( INTEGER | LONGINTEGER | expr ) )
;
-sample_clause
+sample_clause
: ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
;
-order_clause
+rank_clause
+ : ^( RANK rel ( rank_by_statement )? )
+;
+
+rank_by_statement
+ : ^( BY rank_by_clause ( DENSE )? )
+;
+
+rank_by_clause
+ : STAR ( ASC | DESC )?
+ | rank_col+
+;
+
+rank_col
+ : ( col_range | col_ref ) ( ASC | DESC )?
+;
+
+order_clause
: ^( ORDER rel order_by_clause func_clause? )
;
-order_by_clause
+order_by_clause
: STAR ( ASC | DESC )?
| order_col+
;
-order_col
- : (col_range | col_ref) ( ASC | DESC )?
+order_col
+ : (col_range | col_ref) ( ASC | DESC )?
;
-distinct_clause
+distinct_clause
: ^( DISTINCT rel partition_clause? )
;
-partition_clause
- : ^( PARTITION func_name )
+partition_clause
+ : ^( PARTITION func_name )
;
-cross_clause
- : ^( CROSS rel_list partition_clause? )
+cross_clause
+ : ^( CROSS rel_list partition_clause? )
;
-rel_list
+rel_list
: rel+
;
@@ -601,6 +619,7 @@ eid : rel_str_op
| ROLLUP
| MATCHES
| ORDER
+ | RANK
| DISTINCT
| COGROUP
| JOIN
Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu Sep 13 14:55:36 2012
@@ -90,6 +90,7 @@ op_clause : define_clause
| limit_clause
| sample_clause
| order_clause
+ | rank_clause
| cross_clause
| join_clause
| union_clause
@@ -346,25 +347,42 @@ col_index
col_range : ^(COL_RANGE col_ref? { sb.append(".."); } DOUBLE_PERIOD col_ref?)
;
-pound_proj
+pound_proj
: ^( POUND { sb.append($POUND.text); }
( QUOTEDSTRING { sb.append($QUOTEDSTRING.text); } | NULL { sb.append($NULL.text); } ) )
;
-bin_expr
- : ^( BIN_EXPR { sb.append(" ("); } cond { sb.append(" ? "); } expr { sb.append(" : "); } expr { sb.append(") "); } )
+bin_expr
+ : ^( BIN_EXPR { sb.append(" ("); } cond { sb.append(" ? "); } expr { sb.append(" : "); } expr { sb.append(") "); } )
;
-limit_clause
- : ^( LIMIT { sb.append($LIMIT.text).append(" "); } rel
+limit_clause
+ : ^( LIMIT { sb.append($LIMIT.text).append(" "); } rel
( INTEGER { sb.append(" ").append($INTEGER.text); } | LONGINTEGER { sb.append(" ").append($LONGINTEGER.text); } | expr ) )
;
-sample_clause
- : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel ( DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } | expr ) )
+sample_clause
+ : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel ( DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } | expr ) )
;
-order_clause
+rank_clause
+ : ^( RANK { sb.append($RANK.text).append(" "); } rel ( rank_by_statement )? )
+;
+
+rank_by_statement
+ : ^( BY { sb.append(" ").append($BY.text); } rank_by_clause ( DENSE { sb.append(" ").append($DENSE.text); } )? )
+;
+
+rank_by_clause
+ : STAR { sb.append($STAR.text); } ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
+ | rank_col ( { sb.append(", "); } rank_col )*
+;
+
+rank_col
+ : ( col_range | col_ref ) ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
+;
+
+order_clause
: ^( ORDER { sb.append($ORDER.text).append(" "); } rel
{ sb.append(" BY "); } order_by_clause
( { sb.append(" USING "); } func_clause )? )
@@ -589,6 +607,7 @@ eid : rel_str_op
| ROLLUP { sb.append($ROLLUP.text); }
| MATCHES { sb.append($MATCHES.text); }
| ORDER { sb.append($ORDER.text); }
+ | RANK { sb.append($RANK.text); }
| DISTINCT { sb.append($DISTINCT.text); }
| COGROUP { sb.append($COGROUP.text); }
| JOIN { sb.append($JOIN.text); }
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu Sep 13 14:55:36 2012
@@ -138,6 +138,7 @@ op_clause : define_clause
| limit_clause
| sample_clause
| order_clause
+ | rank_clause
| cross_clause
| join_clause
| union_clause
@@ -398,6 +399,20 @@ limit_clause : ^( LIMIT rel ( INTEGER |
sample_clause : ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
;
+rank_clause : ^( RANK rel ( rank_by_statement )? )
+;
+
+rank_by_statement : ^( BY rank_by_clause ( DENSE )? )
+;
+
+rank_by_clause : STAR ( ASC | DESC )?
+ | rank_col+
+;
+
+rank_col : col_range (ASC | DESC)?
+ | col_ref ( ASC | DESC )?
+;
+
order_clause : ^( ORDER rel order_by_clause func_clause? )
;
@@ -599,6 +614,7 @@ eid : rel_str_op
| ROLLUP
| MATCHES
| ORDER
+ | RANK
| DISTINCT
| COGROUP
| JOIN
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Sep 13 14:55:36 2012
@@ -79,6 +79,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -306,7 +307,28 @@ public class LogicalPlanBuilder {
expandAndResetVisitor(loc, sort);
return alias;
}
-
+
+ LORank createRankOp() {
+ return new LORank( plan );
+ }
+
+ String buildRankOp(SourceLocation loc, LORank rank, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
+ List<Boolean> ascFlags) throws ParserValidationException {
+
+ //Rank
+ rank.setRankColPlan(plans);
+ if (ascFlags.isEmpty()) {
+ for (int i=0;i<plans.size();i++)
+ ascFlags.add(true);
+ }
+ rank.setAscendingCol(ascFlags);
+
+ buildOp( loc, rank, alias, inputAlias, null );
+ expandAndResetVisitor(loc, rank);
+
+ return alias;
+ }
+
LOJoin createJoinOp() {
return new LOJoin( plan );
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Sep 13 14:55:36 2012
@@ -85,6 +85,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -213,6 +214,7 @@ op_clause returns[String alias] :
| limit_clause { $alias = $limit_clause.alias; }
| sample_clause { $alias = $sample_clause.alias; }
| order_clause { $alias = $order_clause.alias; }
+ | rank_clause { $alias = $rank_clause.alias; }
| cross_clause { $alias = $cross_clause.alias; }
| join_clause { $alias = $join_clause.alias; }
| union_clause { $alias = $union_clause.alias; }
@@ -1052,6 +1054,76 @@ scope GScope;
) )
;
+rank_clause returns[String alias]
+scope {
+ LORank rankOp;
+}
+scope GScope;
+@init {
+ $GScope::currentOp = builder.createRankOp();
+}
+@after {
+}
+ : ^( RANK rel rank_by_statement? )
+ {
+ SourceLocation loc = new SourceLocation( (PigParserNode) $rank_clause.start );
+
+ List<LogicalExpressionPlan> tempPlans = $rank_by_statement.plans;
+ List<Boolean> tempAscFlags = $rank_by_statement.ascFlags;
+
+ if(tempPlans == null && tempAscFlags == null) {
+ tempPlans = new ArrayList<LogicalExpressionPlan>();
+ tempAscFlags = new ArrayList<Boolean>();
+
+ ((LORank)$GScope::currentOp).setIsRowNumber( true );
+ }
+
+ ((LORank)$GScope::currentOp).setIsDenseRank( $rank_by_statement.isDenseRank != null?$rank_by_statement.isDenseRank:false );
+
+ $alias = builder.buildRankOp( loc, (LORank)$GScope::currentOp, $statement::alias, $statement::inputAlias, tempPlans, tempAscFlags );
+ }
+;
+
+rank_by_statement returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags, Boolean isDenseRank]
+@init {
+ $plans = new ArrayList<LogicalExpressionPlan>();
+ $ascFlags = new ArrayList<Boolean>();
+ $isDenseRank = false;
+}
+ : ^( BY rank_by_clause ( DENSE { $isDenseRank = true; } )? )
+ {
+ $plans = $rank_by_clause.plans;
+ $ascFlags = $rank_by_clause.ascFlags;
+ }
+;
+
+rank_by_clause returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
+@init {
+ $plans = new ArrayList<LogicalExpressionPlan>();
+ $ascFlags = new ArrayList<Boolean>();
+}
+ : STAR {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ builder.buildProjectExpr( new SourceLocation( (PigParserNode)$STAR ), plan, $GScope::currentOp, $statement::inputIndex, null, -1 );
+ $plans.add( plan );
+ }
+ ( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); } )?
+ | ( rank_col
+ {
+ $plans.add( $rank_col.plan );
+ $ascFlags.add( $rank_col.ascFlag );
+ } )+
+;
+
+rank_col returns[LogicalExpressionPlan plan, Boolean ascFlag]
+@init {
+ $plan = new LogicalExpressionPlan();
+ $ascFlag = true;
+}
+ : col_range[$plan] (ASC | DESC { $ascFlag = false; } )?
+ | col_ref[$plan] ( ASC | DESC { $ascFlag = false; } )?
+;
+
order_clause returns[String alias]
scope GScope;
@init {
Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Thu Sep 13 14:55:36 2012
@@ -81,6 +81,12 @@ FOREACH : 'FOREACH'
ORDER : 'ORDER'
;
+RANK : 'RANK'
+;
+
+DENSE : 'DENSE'
+;
+
CUBE : 'CUBE'
;
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu Sep 13 14:55:36 2012
@@ -222,6 +222,7 @@ op_clause : define_clause
| limit_clause
| sample_clause
| order_clause
+ | rank_clause
| cross_clause
| join_clause
| union_clause
@@ -501,6 +502,24 @@ limit_clause : LIMIT^ rel ( (INTEGER SEM
sample_clause : SAMPLE^ rel ( (DOUBLENUMBER SEMI_COLON) => DOUBLENUMBER | expr )
;
+rank_clause : RANK^ rel ( rank_by_statement )?
+;
+
+rank_by_statement : BY^ rank_by_clause ( DENSE )?
+;
+
+rank_by_clause : STAR ( ASC | DESC )?
+ | rank_list
+;
+
+rank_list : rank_col ( COMMA rank_col )*
+ -> rank_col+
+;
+
+rank_col : col_range ( ASC | DESC )?
+ | col_ref ( ASC | DESC )?
+;
+
order_clause : ORDER^ rel BY! order_by_clause ( USING! func_clause )?
;
Modified: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Thu Sep 13 14:55:36 2012
@@ -27,7 +27,9 @@ import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -93,13 +95,13 @@ public class IllustratorAttacher extends
LineageTracer lineage;
HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
-
+
private HashMap<PhysicalOperator, DataBag> poToDataMap;
private int maxRecords;
private boolean revisit = false;
private ArrayList<Boolean[]> subExpResults = null;
private final Map<POLoad, LogicalSchema> poloadToSchemaMap;
-
+
public IllustratorAttacher(PhysicalPlan plan, LineageTracer lineage, int maxRecords,
Map<POLoad, LogicalSchema> poLoadToSchemaMap, PigContext hadoopPigContext) throws VisitorException {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -124,7 +126,7 @@ public class IllustratorAttacher extends
mPlan = oriPlan;
popWalker();
}
-
+
private void setIllustrator(PhysicalOperator po, int nEqClasses) {
if (revisit && po.getIllustrator() != null)
return;
@@ -139,7 +141,7 @@ public class IllustratorAttacher extends
po.setIllustrator(illustrator);
poToDataMap.put(po, illustrator.getData());
}
-
+
private void setIllustrator(PhysicalOperator po, LinkedList<IdentityHashSet<Tuple>> eqClasses) {
if (revisit && po.getIllustrator() != null)
return;
@@ -161,20 +163,20 @@ public class IllustratorAttacher extends
poToEqclassesMap.put(po, eqClasses);
poToDataMap.put(po, illustrator.getData());
}
-
+
public Map<PhysicalOperator, DataBag> getDataMap() {
return poToDataMap;
}
-
+
@Override
public void visitLoad(POLoad ld) throws VisitorException{
- // LOAD from temporary files need no illustrator
+ // LOAD from temporary files need no illustrator
if (revisit)
return;
-
+
LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
poToEqclassesMap.put(ld, eqClasses);
-
+
IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
eqClasses.add(eqClass);
Illustrator illustrator;
@@ -182,12 +184,12 @@ public class IllustratorAttacher extends
ld.setIllustrator(illustrator);
poToDataMap.put(ld, illustrator.getData());
}
-
+
@Override
public void visitStore(POStore st) throws VisitorException{
setIllustrator(st, 1);
}
-
+
@Override
public void visitFilter(POFilter fl) throws VisitorException{
setIllustrator(fl, 0);
@@ -195,13 +197,13 @@ public class IllustratorAttacher extends
innerPlanAttach(fl, fl.getPlan());
subExpResults = null;
}
-
+
@Override
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
super.visitLocalRearrange(lr);
setIllustrator(lr);
}
-
+
@Override
public void visitPackage(POPackage pkg) throws VisitorException{
if (!(pkg instanceof POPackageLite) && pkg.isDistinct())
@@ -209,17 +211,17 @@ public class IllustratorAttacher extends
else
setIllustrator(pkg, null);
}
-
+
@Override
public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
setIllustrator(pkg);
}
-
+
@Override
public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
setIllustrator(pkg);
}
-
+
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
if (revisit && nfe.getIllustrator() != null)
@@ -239,14 +241,14 @@ public class IllustratorAttacher extends
} else
setIllustrator(nfe, 1);
}
-
+
@Override
public void visitUnion(POUnion un) throws VisitorException{
if (revisit && un.getIllustrator() != null)
return;
setIllustrator(un, null);
}
-
+
@Override
public void visitSplit(POSplit spl) throws VisitorException{
if (revisit && spl.getIllustrator() != null)
@@ -265,63 +267,73 @@ public class IllustratorAttacher extends
innerPlanAttach(demux, innerPlan);
setIllustrator(demux);
}
-
+
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
setIllustrator(distinct, 1);
}
-
+
@Override
public void visitSort(POSort sort) throws VisitorException {
setIllustrator(sort, 1);
}
-
+
+ @Override
+ public void visitRank(PORank rank) throws VisitorException {
+ setIllustrator(rank, 3);
+ }
+
+ @Override
+ public void visitCounter(POCounter counter) throws VisitorException {
+ setIllustrator(counter, 1);
+ }
+
@Override
public void visitProject(POProject proj) throws VisitorException{
}
-
+
@Override
public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
setIllustrator(grt, 0);
if (!revisit && subExpResults != null)
subExpResults.add(grt.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitLessThan(LessThanExpr lt) throws VisitorException{
setIllustrator(lt, 0);
if (!revisit && subExpResults != null)
subExpResults.add(lt.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
setIllustrator(gte, 0);
if (!revisit && subExpResults != null)
subExpResults.add(gte.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
setIllustrator(lte, 0);
if (!revisit && subExpResults != null)
subExpResults.add(lte.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitEqualTo(EqualToExpr eq) throws VisitorException{
setIllustrator(eq, 0);
if (!revisit && subExpResults != null)
subExpResults.add(eq.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
setIllustrator(eq, 0);
if (!revisit && subExpResults != null)
subExpResults.add(eq.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitRegexp(PORegexp re) throws VisitorException{
setIllustrator(re, 0);
@@ -335,12 +347,12 @@ public class IllustratorAttacher extends
if (!revisit && subExpResults != null)
subExpResults.add(isNull.getIllustrator().getSubExpResult());
}
-
+
@Override
public void visitAnd(POAnd and) throws VisitorException {
setIllustrator(and, 0);
}
-
+
@Override
public void visitOr(POOr or) throws VisitorException {
setIllustrator(or, 0);
@@ -362,11 +374,11 @@ public class IllustratorAttacher extends
public void visitNegative(PONegative negative) {
setIllustrator(negative, 1);
}
-
+
@Override
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
}
-
+
@Override
public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
// one each for >, ==, and <
@@ -377,7 +389,7 @@ public class IllustratorAttacher extends
public void visitMapLookUp(POMapLookUp mapLookUp) {
setIllustrator(mapLookUp, 1);
}
-
+
@Override
public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
if (revisit && joinPackage.getIllustrator() != null)
@@ -389,12 +401,12 @@ public class IllustratorAttacher extends
@Override
public void visitCast(POCast cast) {
}
-
+
@Override
public void visitLimit(POLimit lim) throws VisitorException {
setIllustrator(lim, 1);
}
-
+
@Override
public void visitStream(POStream stream) throws VisitorException {
setIllustrator(stream, 1);
@@ -407,7 +419,7 @@ public class IllustratorAttacher extends
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
visitPOForEach(optimizedForEach);
}
-
+
private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =
mCurrentWalker.spawnChildWalker(plan);
Modified: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Thu Sep 13 14:55:36 2012
@@ -40,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -184,13 +185,19 @@ public class LocalMapReduceSimulator {
split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
++index;
Mapper<Text, Tuple, PigNullableWritable, Writable> map;
-
+
if (mro.reducePlan.isEmpty()) {
// map-only
map = new PigMapOnly.Map();
- ((PigMapBase) map).setMapPlan(mro.mapPlan);
Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapOnly.Map) map)
.getIllustratorContext(jobConf, input, intermediateData, split);
+ if(mro.isCounterOperation()) {
+ if(mro.isRowNumber()) {
+ map = new PigMapReduceCounter.PigMapCounter();
+ }
+ context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
+ }
+ ((PigMapBase) map).setMapPlan(mro.mapPlan);
map.run(context);
} else {
if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -216,9 +223,15 @@ public class LocalMapReduceSimulator {
reduce = new PigMapReduce.ReduceWithComparator();
else
reduce = new PigMapReduce.Reduce();
- reduce.setReducePlan(mro.reducePlan);
Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);
+
+ if(mro.isCounterOperation()) {
+ reduce = new PigMapReduceCounter.PigReduceCounter();
+ context = ((PigMapReduceCounter.PigReduceCounter)reduce).getIllustratorContext(job, intermediateData, (POPackage) pack);
+ }
+
+ ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
reduce.run(context);
}
for (PhysicalOperator key : mro.phyToMRMap.keySet())
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Sep 13 14:55:36 2012
@@ -61,6 +61,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -82,6 +84,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOStream;
@@ -93,14 +96,14 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
/**
- * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
- * cluster. These settings are added to all MR jobs spawned by the script and
- * in turn are persisted in the hadoop job xml. With the properties already in
+ * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
+ * cluster. These settings are added to all MR jobs spawned by the script and
+ * in turn are persisted in the hadoop job xml. With the properties already in
* the job xml, users who want to know the relations between the script and MR
- * jobs can derive them from the job xmls.
+ * jobs can derive them from the job xmls.
*/
public class ScriptState {
-
+
/**
* Keys of Pig settings added in MR job
*/
@@ -118,17 +121,17 @@ public class ScriptState {
SCRIPT_FEATURES ("pig.script.features"),
JOB_ALIAS ("pig.alias"),
JOB_ALIAS_LOCATION ("pig.alias.location");
-
+
private String displayStr;
-
+
private PIG_PROPERTY(String s) {
displayStr = s;
}
-
+
@Override
public String toString() { return displayStr; }
};
-
+
/**
* Features used in a Pig script
*/
@@ -144,6 +147,7 @@ public class ScriptState {
COGROUP,
GROUP_BY,
ORDER_BY,
+ RANK,
DISTINCT,
STREAMING,
SAMPLER,
@@ -158,42 +162,42 @@ public class ScriptState {
NATIVE,
MAP_PARTIALAGG;
};
-
+
/**
* Pig property that allows user to turn off the inclusion of settings
- * in the jobs
+ * in the jobs
*/
public static final String INSERT_ENABLED = "pig.script.info.enabled";
-
+
/**
- * Restricts the size of Pig script stored in job xml
+ * Restricts the size of Pig script stored in job xml
*/
- public static final int MAX_SCRIPT_SIZE = 10240;
-
+ public static final int MAX_SCRIPT_SIZE = 10240;
+
private static final Log LOG = LogFactory.getLog(ScriptState.class);
private static ThreadLocal<ScriptState> tss = new ThreadLocal<ScriptState>();
-
+
private String id;
-
+
private String script;
private String commandLine;
private String fileName;
-
+
private String pigVersion;
private String hodoopVersion;
-
+
private long scriptFeatures;
-
+
private PigContext pigContext;
-
+
private Map<MapReduceOper, String> featureMap = null;
private Map<MapReduceOper, String> aliasMap = new HashMap<MapReduceOper, String>();
private Map<MapReduceOper, String> aliasLocationMap = new HashMap<MapReduceOper, String>();
-
+
private List<PigProgressNotificationListener> listeners
= new ArrayList<PigProgressNotificationListener>();
-
+
public static ScriptState start(String commandLine, PigContext pigContext) {
ScriptState ss = new ScriptState(UUID.randomUUID().toString());
ss.setCommandLine(commandLine);
@@ -201,10 +205,10 @@ public class ScriptState {
tss.set(ss);
return ss;
}
-
+
private ScriptState(String id) {
this.id = id;
- this.script = "";
+ this.script = "";
}
public static ScriptState get() {
@@ -212,16 +216,16 @@ public class ScriptState {
ScriptState.start("", null);
}
return tss.get();
- }
-
+ }
+
public void registerListener(PigProgressNotificationListener listener) {
listeners.add(listener);
}
-
+
public List<PigProgressNotificationListener> getAllListeners() {
return listeners;
}
-
+
public void emitInitialPlanNotification(MROperPlan plan) {
for (PigProgressNotificationListener listener: listeners) {
try {
@@ -327,11 +331,11 @@ public class ScriptState {
}
setPigFeature(mro, conf);
-
+
setJobParents(mro, conf);
}
-
- public void setScript(File file) {
+
+ public void setScript(File file) {
try {
setScript(new BufferedReader(new FileReader(file)));
} catch (FileNotFoundException e) {
@@ -339,19 +343,19 @@ public class ScriptState {
}
}
- public void setScript(String script) {
+ public void setScript(String script) {
if (script == null) return;
-
+
// restrict the size of the script to be stored in job conf
script = (script.length() > MAX_SCRIPT_SIZE) ? script.substring(0,
MAX_SCRIPT_SIZE) : script;
-
+
// XML parser can't handle certain characters, including
// control characters (e.g. U+0001). Use Base64 encoding to
- // get around this problem
+ // get around this problem
this.script = new String(Base64.encodeBase64(script.getBytes()));
}
-
+
public void setScriptFeatures(LogicalPlan plan) {
BitSet bs = new BitSet();
try {
@@ -359,59 +363,59 @@ public class ScriptState {
} catch (FrontendException e) {
LOG.warn("unable to get script feature", e);
}
- scriptFeatures = bitSetToLong(bs);
-
+ scriptFeatures = bitSetToLong(bs);
+
LOG.info("Pig features used in the script: "
+ featureLongToString(scriptFeatures));
}
-
+
public String getHadoopVersion() {
if (hodoopVersion == null) {
hodoopVersion = VersionInfo.getVersion();
}
return (hodoopVersion == null) ? "" : hodoopVersion;
}
-
+
public String getPigVersion() {
if (pigVersion == null) {
String findContainingJar = JarManager.findContainingJar(ScriptState.class);
if (findContainingJar != null) {
- try {
- JarFile jar = new JarFile(findContainingJar);
- final Manifest manifest = jar.getManifest();
- final Map <String,Attributes> attrs = manifest.getEntries();
+ try {
+ JarFile jar = new JarFile(findContainingJar);
+ final Manifest manifest = jar.getManifest();
+ final Map <String,Attributes> attrs = manifest.getEntries();
Attributes attr = attrs.get("org/apache/pig");
pigVersion = attr.getValue("Implementation-Version");
- } catch (Exception e) {
- LOG.warn("unable to read pigs manifest file");
- }
+ } catch (Exception e) {
+ LOG.warn("unable to read pigs manifest file");
+ }
} else {
LOG.warn("unable to read pigs manifest file. Not running from the Pig jar");
}
}
return (pigVersion == null) ? "" : pigVersion;
}
-
+
public String getFileName() { return fileName; }
-
+
public void setFileName(String fileName) {
this.fileName = fileName;
}
-
+
String getId() { return id; }
-
+
private String getCommandLine() {
return (commandLine == null) ? "" : commandLine;
}
-
+
private void setCommandLine(String commandLine) {
this.commandLine = commandLine;
}
-
+
private String getScript() {
return (script == null) ? "" : script;
}
-
+
private void setScript(BufferedReader reader) {
StringBuilder sb = new StringBuilder();
try {
@@ -427,21 +431,21 @@ public class ScriptState {
}
setScript(sb.toString());
}
-
+
private void setPigFeature(MapReduceOper mro, Configuration conf) {
conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(mro));
if (scriptFeatures != 0) {
- conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
+ conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
String.valueOf(scriptFeatures));
}
- conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(mro));
+ conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(mro));
conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(mro));
}
-
+
private void setJobParents(MapReduceOper mro, Configuration conf) {
// PigStats maintains a job DAG with the job id being updated
- // upon available. Therefore, before a job is submitted, the ids
- // of its parent jobs are already available.
+ // upon available. Therefore, before a job is submitted, the ids
+ // of its parent jobs are already available.
JobGraph jg = PigStats.get().getJobGraph();
JobStats js = null;
Iterator<JobStats> iter = jg.iterator();
@@ -465,11 +469,11 @@ public class ScriptState {
}
}
}
-
+
String getScriptFeatures() {
return featureLongToString(scriptFeatures);
}
-
+
public String getAlias(MapReduceOper mro) {
if (!aliasMap.containsKey(mro)) {
setAlias(mro);
@@ -514,32 +518,32 @@ public class ScriptState {
if (featureMap == null) {
featureMap = new HashMap<MapReduceOper, String>();
}
-
+
String retStr = featureMap.get(mro);
- if (retStr == null) {
+ if (retStr == null) {
BitSet feature = new BitSet();
feature.clear();
if (mro.isSkewedJoin()) {
feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
- }
+ }
if (mro.isGlobalSort()) {
feature.set(PIG_FEATURE.ORDER_BY.ordinal());
- }
- if (mro.isSampler()) {
+ }
+ if (mro.isSampler()) {
feature.set(PIG_FEATURE.SAMPLER.ordinal());
- }
- if (mro.isIndexer()) {
+ }
+ if (mro.isIndexer()) {
feature.set(PIG_FEATURE.INDEXER.ordinal());
}
if (mro.isCogroup()) {
feature.set(PIG_FEATURE.COGROUP.ordinal());
- }
+ }
if (mro.isGroupBy()) {
feature.set(PIG_FEATURE.GROUP_BY.ordinal());
- }
+ }
if (mro.isRegularJoin()) {
feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
- }
+ }
if (mro.needsDistinctCombiner()) {
feature.set(PIG_FEATURE.DISTINCT.ordinal());
}
@@ -570,8 +574,8 @@ public class ScriptState {
featureMap.put(mro, retStr);
}
return retStr;
- }
-
+ }
+
private long bitSetToLong(BitSet bs) {
long ret = 0;
for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
@@ -579,10 +583,10 @@ public class ScriptState {
}
return ret;
}
-
+
private String featureLongToString(long l) {
if (l == 0) return PIG_FEATURE.UNKNOWN.name();
-
+
StringBuilder sb = new StringBuilder();
for (int i=0; i<PIG_FEATURE.values().length; i++) {
if (((l >> i) & 0x00000001) != 0) {
@@ -592,7 +596,7 @@ public class ScriptState {
}
return sb.toString();
}
-
+
public void setPigContext(PigContext pigContext) {
this.pigContext = pigContext;
}
@@ -603,18 +607,18 @@ public class ScriptState {
private static class FeatureVisitor extends PhyPlanVisitor {
private BitSet feature;
-
+
public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
plan));
this.feature = feature;
}
-
+
@Override
public void visitFRJoin(POFRJoin join) throws VisitorException {
feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
}
-
+
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
@@ -622,55 +626,55 @@ public class ScriptState {
else
feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
}
-
+
@Override
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
throws VisitorException {
feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());;
}
-
+
@Override
public void visitCollectedGroup(POCollectedGroup mg)
- throws VisitorException {
+ throws VisitorException {
feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
}
-
+
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
feature.set(PIG_FEATURE.DISTINCT.ordinal());
}
-
+
@Override
public void visitStream(POStream stream) throws VisitorException {
feature.set(PIG_FEATURE.STREAMING.ordinal());
}
-
+
@Override
public void visitSplit(POSplit split) throws VisitorException {
feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
}
-
+
@Override
public void visitDemux(PODemux demux) throws VisitorException {
- feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+ feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
}
-
+
@Override
public void visitPartialAgg(POPartialAgg partAgg){
feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
}
-
- }
-
+
+ }
+
static class LogicalPlanFeatureVisitor extends LogicalRelationalNodesVisitor {
-
+
private BitSet feature;
-
+
protected LogicalPlanFeatureVisitor(LogicalPlan plan, BitSet feature) throws FrontendException {
- super(plan, new org.apache.pig.newplan.DepthFirstWalker(plan));
+ super(plan, new org.apache.pig.newplan.DepthFirstWalker(plan));
this.feature = feature;
}
-
+
@Override
public void visit(LOCogroup op) {
if (op.getGroupType() == GROUPTYPE.COLLECTED) {
@@ -685,27 +689,27 @@ public class ScriptState {
}
}
}
-
+
@Override
public void visit(LOCross op) {
feature.set(PIG_FEATURE.CROSS.ordinal());
}
-
+
@Override
public void visit(LODistinct op) {
feature.set(PIG_FEATURE.DISTINCT.ordinal());
}
-
+
@Override
public void visit(LOFilter op) {
feature.set(PIG_FEATURE.FILTER.ordinal());
}
-
+
@Override
public void visit(LOForEach op) {
-
+
}
-
+
@Override
public void visit(LOJoin op) {
if (op.getJoinType() == JOINTYPE.HASH) {
@@ -720,45 +724,49 @@ public class ScriptState {
feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
}
}
-
+
@Override
public void visit(LOLimit op) {
feature.set(PIG_FEATURE.LIMIT.ordinal());
}
-
+
@Override
+ public void visit(LORank op) {
+ feature.set(PIG_FEATURE.RANK.ordinal());
+ }
+
public void visit(LOSort op) {
feature.set(PIG_FEATURE.ORDER_BY.ordinal());
}
-
+
@Override
public void visit(LOStream op) {
feature.set(PIG_FEATURE.STREAMING.ordinal());
}
-
+
@Override
public void visit(LOSplit op) {
-
+
}
-
+
@Override
public void visit(LOUnion op) {
feature.set(PIG_FEATURE.UNION.ordinal());
}
-
+
@Override
public void visit(LONative n) {
feature.set(PIG_FEATURE.NATIVE.ordinal());
}
}
-
+
private static class AliasVisitor extends PhyPlanVisitor {
-
+
private HashSet<String> aliasSet;
-
+
private List<String> alias;
-
+
private final List<String> aliasLocation;
public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
@@ -771,69 +779,69 @@ public class ScriptState {
for (String s : alias) aliasSet.add(s);
}
}
-
+
@Override
public void visitLoad(POLoad load) throws VisitorException {
setAlias(load);
super.visitLoad(load);
}
-
+
@Override
public void visitFRJoin(POFRJoin join) throws VisitorException {
setAlias(join);
super.visitFRJoin(join);
}
-
+
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
setAlias(join);
super.visitMergeJoin(join);
}
-
+
@Override
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
throws VisitorException {
setAlias(mergeCoGrp);
super.visitMergeCoGroup(mergeCoGrp);
}
-
+
@Override
public void visitCollectedGroup(POCollectedGroup mg)
- throws VisitorException {
+ throws VisitorException {
setAlias(mg);
super.visitCollectedGroup(mg);
}
-
+
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
setAlias(distinct);
super.visitDistinct(distinct);
}
-
+
@Override
public void visitStream(POStream stream) throws VisitorException {
setAlias(stream);
super.visitStream(stream);
}
-
+
@Override
public void visitFilter(POFilter fl) throws VisitorException {
setAlias(fl);
super.visitFilter(fl);
}
-
+
@Override
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
setAlias(lr);
super.visitLocalRearrange(lr);
}
-
+
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
setAlias(nfe);
super.visitPOForEach(nfe);
}
-
+
@Override
public void visitUnion(POUnion un) throws VisitorException {
setAlias(un);
@@ -845,19 +853,19 @@ public class ScriptState {
setAlias(sort);
super.visitSort(sort);
}
-
+
@Override
public void visitLimit(POLimit lim) throws VisitorException {
setAlias(lim);
super.visitLimit(lim);
}
-
+
@Override
public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
setAlias(sk);
super.visitSkewedJoin(sk);
}
-
+
private void setAlias(PhysicalOperator op) {
String s = op.getAlias();
if (s != null) {
@@ -872,5 +880,5 @@ public class ScriptState {
}
}
}
-
+
}
Modified: pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/trunk/test/e2e/pig/deployers/ExistingClusterDeployer.pm Thu Sep 13 14:55:36 2012
@@ -228,6 +228,16 @@ sub generateData
'filetype' => "numbers",
'rows' => 5000,
'hdfs' => "types/numbers.txt",
+ }, {
+ 'name' => "biggish",
+ 'filetype' => "biggish",
+ 'rows' => 1000000,
+ 'hdfs' => "singlefile/biggish",
+ }, {
+ 'name' => "prerank",
+ 'filetype' => "ranking",
+ 'rows' => 30,
+ 'hdfs' => "singlefile/prerank",
}
);
Modified: pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/trunk/test/e2e/pig/deployers/LocalDeployer.pm Thu Sep 13 14:55:36 2012
@@ -204,6 +204,16 @@ sub generateData
'filetype' => "numbers",
'rows' => 5000,
'outfile' => "types/numbers.txt",
+ }, {
+ 'name' => "biggish",
+ 'filetype' => "biggish",
+ 'rows' => 1000000,
+ 'outfile' => "singlefile/biggish",
+ }, {
+ 'name' => "prerank",
+ 'filetype' => "ranking",
+ 'rows' => 30,
+ 'outfile' => "singlefile/prerank",
}
);
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Sep 13 14:55:36 2012
@@ -4974,6 +4974,173 @@ store a into ':OUTPATH:';\,
store F into ':OUTPATH:';\,
}
]
+ },
+ {
+ 'name' => 'Rank',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\
+ SET default_parallel 9;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A by a ASC,b ASC DENSE;
+ C = foreach B generate rank_A,a,b;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankaaba,a,b;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 2,
+ 'pig' => q\
+ SET default_parallel 9;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A by a ASC,c DESC DENSE;
+ C = foreach B generate rank_A,a,c;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankaacd,a,c;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 3,
+ 'pig' => q\
+ SET default_parallel 7;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A by b DESC,c ASC DENSE;
+ C = foreach B generate rank_A,b,c;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankbdca,b,c;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 4,
+ 'pig' => q\
+ SET default_parallel 7;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A;
+ C = foreach B generate rank_A,a,b,c;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rownumber,a,b,c;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' =>5,
+ 'pig' => q\
+ SET default_parallel 9;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A by b DESC,a ASC;
+ C = foreach B generate rank_A,b,a;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankbdaa,b,a;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' =>6,
+ 'pig' => q\
+ SET default_parallel 7;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = rank A by c ASC,b DESC;
+ C = foreach B generate rank_A,c,b;
+ store C into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankcabd,c,b;
+ store C into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 7,
+ 'pig' => q\
+ SET default_parallel 7;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = foreach A generate a,b,c,tail;
+ C = rank B by a ASC,b ASC DENSE;
+ D = rank C by a ASC,c DESC DENSE;
+ E = rank D by b DESC,c ASC DENSE;
+ F = foreach E generate rank_D,rank_C,rank_B,a,b,c;
+ store F into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = foreach A generate rankbdca,rankaacd,rankaaba,a,b,c;
+ store B into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 8,
+ 'pig' => q\
+ SET default_parallel 9;
+ SET pig.splitCombination false;
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ B = foreach A generate a,b,c;
+ C = rank B by a ASC,b ASC DENSE;
+ D = rank B by a ASC,c DESC DENSE;
+ F = join C by $0, D by $0;
+ G = foreach F generate C::rank_B, D::rank_B, C::a, C::b, C::c;
+ H = order G by a ASC, b ASC, c DESC;
+ store H into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = foreach A generate rankaaba,a,b,c;
+ E = order C by a ASC,b ASC;
+ D = foreach A generate rankaacd,a,b,c;
+ F = order D by a ASC,c DESC;
+ G = join E by $0, F by $0;
+ H = foreach G generate E::rankaaba, F::rankaacd, E::a, E::b, E::c;
+ store H into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 9,
+ 'pig' => q\
+ SET default_parallel 25;
+ A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+ B = rank A;
+ C = order B by rank_A;
+ D = foreach C generate rank_A,rownumber;
+ store D into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+ D = foreach A generate idx,rownumber;
+ store D into ':OUTPATH:';
+ \,
+ }, {
+ 'num' => 10,
+ 'pig' => q\
+ SET default_parallel 11;
+ SET pig.splitCombination false;
+ A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+ B = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray);
+ C = join A by rownumber, B by rownumber;
+ D = order C by B::rankcabd,B::rankbdca,B::rankaaba;
+ E = rank D;
+ F = group E by rank_D;
+ G = foreach F generate group, COUNT(E);
+ H = order G by group;
+ store H into ':OUTPATH:';
+ \,
+ 'verify_pig_script' => q\
+ A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray);
+ B = foreach A generate rownumber,1;
+ C = order B by rownumber;
+ store C into ':OUTPATH:';
+ \,
+ }
+ ]
}
],
},
Modified: pig/trunk/test/e2e/pig/tools/generate/generate_data.pl
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tools/generate/generate_data.pl?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tools/generate/generate_data.pl (original)
+++ pig/trunk/test/e2e/pig/tools/generate/generate_data.pl Thu Sep 13 14:55:36 2012
@@ -32,6 +32,16 @@ our @lastName = ("allen", "brown", "cars
"nixon", "ovid", "polk", "quirinius", "robinson", "steinbeck", "thompson",
"underhill", "van buren", "white", "xylophone", "young", "zipper");
+our @rankedTuples = (
+ "1,21,5,7,1,1,0,8,8","2,26,2,3,2,5,1,9,10","3,30,24,21,2,3,1,3,10","4,6,10,8,3,4,1,7,2",
+ "5,8,28,25,3,2,1,0,2","6,28,11,12,4,6,2,7,10","7,9,26,22,5,7,3,2,3","8,5,6,5,6,8,3,8,1",
+ "9,29,16,15,7,9,4,6,10","10,18,12,10,8,11,5,7,6","11,14,17,14,9,10,5,6,5","12,6,12,8,10,11,5,7,2",
+ "13,2,17,13,11,10,5,6,0","14,26,3,3,12,14,6,9,10","15,15,20,18,13,13,6,4,5","16,3,29,24,14,12,6,0,0",
+ "17,23,21,19,15,16,7,4,8","18,19,19,16,16,17,7,5,6","19,20,30,26,16,15,7,0,6","20,12,21,17,17,16,7,4,4",
+ "21,4,1,1,18,19,7,10,1","22,1,7,4,19,18,7,8,0","23,24,14,11,20,21,8,7,9","24,16,25,20,21,20,8,3,5",
+ "25,25,27,23,22,22,9,1,9","26,21,8,7,23,25,9,8,8","27,17,4,2,24,26,9,9,6","28,10,8,6,25,25,9,8,4",
+ "29,11,15,9,25,24,9,7,4","30,12,23,17,25,23,9,4,4");
+
sub randomName()
{
return sprintf("%s %s", $firstName[int(rand(26))],
@@ -473,6 +483,23 @@ sub getBulkCopyCmd(){
my $randf = rand(10);
printf HDFS "%d:%d:%d:%d:%d:%dL:%.2ff:%.2f\n", $tid, $i, $rand5, $rand100, $rand1000, $rand1000, $randf, $randf;
}
+ } elsif ($filetype eq "ranking") {
+ for (my $i = 0; $i < $numRows; $i++) {
+ my $tuple = $rankedTuples[int($i)];
+ printf HDFS "$tuple,";
+ for my $j ( 0 .. 1000000) {
+ printf HDFS "%d",$j;
+ }
+ printf HDFS "\n";
+ }
+ } elsif ($filetype eq "biggish") {
+ for (my $i = 1; $i < $numRows; $i++) {
+ printf HDFS "$i,$i,";
+ for my $j ( 0 .. 1000) {
+ printf HDFS "%d",$j;
+ }
+ printf HDFS "\n";
+ }
} else {
warn "Unknown filetype $filetype\n";
usage();
Modified: pig/trunk/test/org/apache/pig/parser/TestLexer.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLexer.pig?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLexer.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLexer.pig Thu Sep 13 14:55:36 2012
@@ -73,7 +73,13 @@ A = LOAD 'data' AS (f1:int,f2:int,f3:int
X = SAMPLE A 0.01;
+R = rank A by f1;
+R = rank A by f1 ASC, f2 DESC, f3;
+
+R = rank A by *;
+
+R = rank A by * DESC DENSE;
Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Thu Sep 13 14:55:36 2012
@@ -411,4 +411,61 @@ public class TestLogicalPlanGenerator {
generateLogicalPlan( query );
}
+ @Test
+ public void testRank01() {
+ String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ + "B = rank A by sales;" + "store B into 'rank01_test';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank02() {
+ String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ + "C = rank A by sales DENSE;" + "store C into 'rank02_test';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank03() {
+ String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ + "B = rank A;" + "store B into 'rank03_test';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank04() {
+ String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ + "C = rank A by postalcode DESC;"
+ + "store C into 'rank04_test';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank05() {
+ String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ + "D = rank A by postalcode DENSE;"
+ + "store D into 'rank05_test';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank06() {
+ String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ + "C = rank A by x..rz;";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank07() {
+ String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ + "C = rank A by x ASC, y DESC;";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testRank08() {
+ String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ + "C = rank A;";
+ generateLogicalPlan(query);
+ }
}
Modified: pig/trunk/test/org/apache/pig/parser/TestParser.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestParser.pig?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestParser.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestParser.pig Thu Sep 13 14:55:36 2012
@@ -90,3 +90,11 @@ H = union onschema A, B;
--stream
C = stream A through CMD;
+
+
+--rank
+
+R = rank A;
+R = rank A by a;
+R = rank A by a DESC;
+R = rank A by a DESC, b;
\ No newline at end of file
Modified: pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java Thu Sep 13 14:55:36 2012
@@ -28,7 +28,7 @@ import org.antlr.runtime.Token;
import org.junit.Test;
public class TestQueryLexer {
-
+
@Test
public void TestLexer() throws IOException {
CharStream input = new QueryParserFileStream( "test/org/apache/pig/parser/TestLexer.pig" );
@@ -45,13 +45,13 @@ public class TestQueryLexer {
System.out.print( token.getText() + "(" + token.getType() + ") " );
}
}
-
+
// While we can check more conditions, such as type of each token, for now I think the following
// is enough. If the token type is wrong, it will be most likely caught by the parser.
- Assert.assertEquals( 419, tokenCount );
+ Assert.assertEquals( 455, tokenCount );
Assert.assertEquals( 0, lexer.getNumberOfSyntaxErrors() );
}
-
+
@Test
public void test2() throws IOException {
String query = "A = load 'input' using PigStorage(';');" +
Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParser.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParser.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParser.java Thu Sep 13 14:55:36 2012
@@ -26,6 +26,7 @@ import junit.framework.Assert;
import org.antlr.runtime.CharStream;
import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.MismatchedTokenException;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import org.antlr.runtime.tree.Tree;
@@ -415,4 +416,61 @@ public class TestQueryParser {
PigServer pig = new PigServer(ExecType.LOCAL);
Util.registerMultiLineQuery(pig, query);
}
+
+
+ //RANK
+    /**
+     * Positive parser case: RANK with no BY clause. The statement is handed
+     * to shouldPass (defined elsewhere in this class).
+     */
+    @Test
+    public void testRankPositive1() throws IOException, RecognitionException {
+        final String statement = "B = rank A;";
+        shouldPass(statement);
+    }
+
+    /**
+     * Positive parser case: RANK BY a single column with no explicit
+     * direction. The statement is handed to shouldPass.
+     */
+    @Test
+    public void testRankPositive2() throws IOException, RecognitionException {
+        final String statement = "B = rank A by x;";
+        shouldPass(statement);
+    }
+
+    /**
+     * Positive parser case: RANK BY a single column with an explicit DESC
+     * direction. The statement is handed to shouldPass.
+     */
+    @Test
+    public void testRankPositive3() throws IOException, RecognitionException {
+        final String statement = "B = rank A by x DESC;";
+        shouldPass(statement);
+    }
+
+    /**
+     * Positive parser case: RANK BY four columns, the first without a
+     * direction and the rest with ASC/DESC. The statement is handed to
+     * shouldPass.
+     */
+    @Test
+    public void testRankPositive4() throws IOException, RecognitionException {
+        final String statement = "B = rank A by x, y ASC, w DESC, z ASC;";
+        shouldPass(statement);
+    }
+
+    /**
+     * Positive parser case: RANK BY a column range (x..z) over a relation
+     * loaded with schema (x, y, z, rz). The two-statement script is handed
+     * to shouldPass.
+     */
+    @Test
+    public void testRankPositive5() throws IOException, RecognitionException {
+        final String loadStmt = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        final String rankStmt = "B = rank A by x..z;";
+        shouldPass(loadStmt + rankStmt);
+    }
+
+    /**
+     * Positive parser case: RANK BY star (*) over a relation loaded with
+     * schema (x, y, z, rz). The two-statement script is handed to
+     * shouldPass.
+     */
+    @Test
+    public void testRankPositive6() throws IOException, RecognitionException {
+        final String loadStmt = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        final String rankStmt = "B = rank A by *;";
+        shouldPass(loadStmt + rankStmt);
+    }
+
+    /**
+     * Positive parser case: RANK BY one column with DESC followed by the
+     * DENSE modifier. The two-statement script is handed to shouldPass.
+     */
+    @Test
+    public void testRankPositive7() throws IOException, RecognitionException {
+        final String loadStmt = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        final String rankStmt = "B = rank A by x DESC DENSE;";
+        shouldPass(loadStmt + rankStmt);
+    }
+
+    /**
+     * Positive parser case: RANK BY two columns with explicit directions,
+     * followed by the DENSE modifier. The two-statement script is handed to
+     * shouldPass.
+     */
+    @Test
+    public void testRankPositive8() throws IOException, RecognitionException {
+        final String loadStmt = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        final String rankStmt = "B = rank A by x DESC,y ASC DENSE;";
+        shouldPass(loadStmt + rankStmt);
+    }
+
+    /**
+     * Positive parser case: RANK BY star (*) followed by the DENSE modifier.
+     * The two-statement script is handed to shouldPass.
+     */
+    @Test
+    public void testRankPositive9() throws IOException, RecognitionException {
+        final String loadStmt = "A = load 'data' as (x:int, y:chararray, z:int, rz:chararray);";
+        final String rankStmt = "B = rank A by * DENSE;";
+        shouldPass(loadStmt + rankStmt);
+    }
}
Modified: pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java (original)
+++ pig/trunk/test/org/apache/pig/test/OptimizeLimitPlanPrinter.java Thu Sep 13 14:55:36 2012
@@ -57,6 +57,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -179,12 +180,18 @@ public class OptimizeLimitPlanPrinter ex
sb.append(";\n");
appendEdges(loSort);
}
-
+
+    /**
+     * Visits a RANK operator: first calls appendOp for the operator itself,
+     * then appendEdges for it. The order of the two calls is kept as in the
+     * original.
+     *
+     * @param loRank the RANK operator being visited
+     * @throws FrontendException declared by the visitor contract
+     */
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        appendOp(loRank);
+        appendEdges(loRank);
+    }
+
/**
 * Visits a DISTINCT operator by calling appendEdges for it only; no
 * appendOp call is made for this operator here.
 *
 * @param loDistinct the DISTINCT operator being visited
 * @throws FrontendException declared by the visitor contract
 */
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
appendEdges(loDistinct);
}
-
+
@Override
public void visit(LOLimit loLimit) throws FrontendException {
appendOp(loLimit) ;