You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@pig.apache.org by jc...@apache.org on 2012/11/15 00:38:54 UTC
svn commit: r1409562 - in /pig/trunk: ./ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/parser/ test/org/apache/pig/parser/
Author: jcoveney
Date: Wed Nov 14 23:38:53 2012
New Revision: 1409562
URL: http://svn.apache.org/viewvc?rev=1409562&view=rev
Log:
PIG-2937: generated field in nested foreach does not inherit the variable name as the field name (jcoveney)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Nov 14 23:38:53 2012
@@ -76,6 +76,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2937: generated field in nested foreach does not inherit the variable name as the field name (jcoveney)
+
PIG-3019: Need a target in build.xml for source releases (gates)
PIG-2832: org.apache.pig.pigunit.pig.PigServer does not initialize udf.import.list of PigContext (prkommireddi via rohini)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java Wed Nov 14 23:38:53 2012
@@ -29,7 +29,7 @@ import org.apache.pig.parser.SourceLocat
public class BinCondExpression extends LogicalExpression {
/**
- * Will add this operator to the plan and connect it to the
+ * Will add this operator to the plan and connect it to the
* left and right hand side operators and the condition operator
* @param plan plan this operator is part of
* @param lhs expression on its left hand side
@@ -45,7 +45,7 @@ public class BinCondExpression extends L
plan.connect(this, lhs);
plan.connect(this, rhs);
}
-
+
/**
* Returns the operator which handles this condition
* @return expression which handles the condition
@@ -58,16 +58,16 @@ public class BinCondExpression extends L
/**
* Get the left hand side of this expression.
* @return expression on the left hand side
- * @throws FrontendException
+ * @throws FrontendException
*/
public LogicalExpression getLhs() throws FrontendException {
- return (LogicalExpression)plan.getSuccessors(this).get(1);
+ return (LogicalExpression)plan.getSuccessors(this).get(1);
}
/**
* Get the right hand side of this expression.
* @return expression on the right hand side
- * @throws FrontendException
+ * @throws FrontendException
*/
public LogicalExpression getRhs() throws FrontendException {
return (LogicalExpression)plan.getSuccessors(this).get(2);
@@ -83,32 +83,32 @@ public class BinCondExpression extends L
}
((LogicalExpressionVisitor)v).visit(this);
}
-
+
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof BinCondExpression) {
BinCondExpression ao = (BinCondExpression)other;
- return ao.getCondition().isEqual(getCondition()) &&
+ return ao.getCondition().isEqual(getCondition()) &&
ao.getLhs().isEqual(getLhs()) && ao.getRhs().isEqual(getRhs());
} else {
return false;
}
}
-
+
@Override
public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
if (fieldSchema!=null)
return fieldSchema;
-
+
//TypeCheckingExpVisitor will ensure that lhs and rhs have same schema
LogicalFieldSchema argFs = getLhs().getFieldSchema();
fieldSchema = argFs.deepCopy();
fieldSchema.resetUid();
-
+
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
return fieldSchema;
}
-
+
@Override
public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
LogicalExpression copy = new BinCondExpression(
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java Wed Nov 14 23:38:53 2012
@@ -27,11 +27,11 @@ import org.apache.pig.newplan.logical.re
/**
* Logical representation of expression operators. Expression operators have
- * a data type and a uid. Uid is a unique id for each expression.
+ * a data type and a uid. Uid is a unique id for each expression.
*
*/
public abstract class LogicalExpression extends Operator {
-
+
static long nextUid = 1;
protected LogicalSchema.LogicalFieldSchema fieldSchema;
protected LogicalSchema.LogicalFieldSchema uidOnlyFieldSchema;
@@ -45,14 +45,22 @@ public abstract class LogicalExpression
nextUid = 1;
}
/**
- *
+ *
* @param name of the operator
* @param plan LogicalExpressionPlan this is part of
*/
public LogicalExpression(String name, OperatorPlan plan) {
super(name, plan);
}
-
+
+ /**
+ * This is a convenience method to avoid the side-effectful nature of getFieldSchema().
+ * It simply returns whether or not fieldSchema is currently null.
+ */
+ public boolean hasFieldSchema() {
+ return fieldSchema != null;
+ }
+
/**
* Get the field schema for the output of this expression operator. This does
* not merely return the field schema variable. If schema is not yet set, this
@@ -62,11 +70,11 @@ public abstract class LogicalExpression
* @throws FrontendException
*/
abstract public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException;
-
+
public void resetFieldSchema() {
fieldSchema = null;
}
-
+
/**
* Get the data type for this expression.
* @return data type, one of the static bytes of DataType
@@ -76,7 +84,7 @@ public abstract class LogicalExpression
return getFieldSchema().type;
return DataType.BYTEARRAY;
}
-
+
public String toString() {
StringBuilder msg = new StringBuilder();
msg.append("(Name: " + name + " Type: ");
@@ -93,7 +101,7 @@ public abstract class LogicalExpression
return msg.toString();
}
-
+
public void neverUseForRealSetFieldSchema(LogicalFieldSchema fs) throws FrontendException {
fieldSchema = fs;
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
@@ -118,4 +126,4 @@ public abstract class LogicalExpression
public void resetUid() {
uidOnlyFieldSchema = null;
}
-}
+}
\ No newline at end of file
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Wed Nov 14 23:38:53 2012
@@ -42,7 +42,7 @@ import org.apache.pig.parser.SourceLocat
*
*/
public class ProjectExpression extends ColumnExpression {
-
+
private int input; // Which input of the relational operator this project
// is projecting from. Count is zero based. So if this
// project is in a filter the input number will always
@@ -52,20 +52,20 @@ public class ProjectExpression extends C
private int col; // The column in the input which the project references.
// Count is zero based.
private String alias; // The alias of the projected field.
-
+
private LogicalRelationalOperator attachedRelationalOp;
- //fields for range projection.
+ //fields for range projection.
private boolean isRangeProject = false;
//start and end columns in range. endCol value of -1 represents everything upto end
private int startCol = -1;
private int endCol = -2;
-
+
private String startAlias;
private String endAlias;
-
-
+
+
/**
* Adds projection to the plan.
* @param plan LogicalExpressionPlan this projection will be a part of
@@ -89,7 +89,7 @@ public class ProjectExpression extends C
* @param inputNum
* @param alias
* @param attachedRelationalOp
- * @throws FrontendException
+ * @throws FrontendException
*/
public ProjectExpression(OperatorPlan plan, int inputNum, String alias,
LogicalRelationalOperator attachedRelationalOp) {
@@ -103,7 +103,7 @@ public class ProjectExpression extends C
/**
* Constructor for range projection
* Adds projection to the plan.
- * The start and end alias/column-number should be set separately.
+ * The start and end alias/column-number should be set separately.
* @param plan
* @param inputNum
* @param attachedRelationalOp
@@ -133,7 +133,7 @@ public class ProjectExpression extends C
this.startAlias = projExpr.startAlias;
this.endAlias = projExpr.endAlias;
plan.add(this);
-
+
}
/**
@@ -155,7 +155,7 @@ public class ProjectExpression extends C
"range projection (..) " + startCol;
throw new PlanValidationException(this, msg, 2270, PigException.BUG);
}
-
+
if(endCol > 0 && startCol > endCol){
String msg = "start column appears after end column in " +
"range projection (..) . Start column position " + startCol +
@@ -166,13 +166,13 @@ public class ProjectExpression extends C
setColNum(findColNum(alias));
}
}
-
+
private int findColNum(String alias) throws FrontendException {
LogicalPlan lp = (LogicalPlan)attachedRelationalOp.getPlan();
List<Operator> inputs = lp.getPredecessors( attachedRelationalOp );
LogicalRelationalOperator input = (LogicalRelationalOperator)inputs.get( getInputNum() );
LogicalSchema inputSchema = input.getSchema();
-
+
if( alias != null ) {
int colNum = inputSchema == null ? -1 : inputSchema.getFieldPosition( alias );
if( colNum == -1 ) {
@@ -187,15 +187,15 @@ public class ProjectExpression extends C
int col = getColNum();
if( inputSchema != null && col >= inputSchema.size() ) {
throw new PlanValidationException( this,
- "Out of bound access. Trying to access non-existent column: " +
- col + ". Schema " + inputSchema.toString(false) +
+ "Out of bound access. Trying to access non-existent column: " +
+ col + ". Schema " + inputSchema.toString(false) +
" has " + inputSchema.size() + " column(s)." , 1000);
}
return col;
}
}
-
+
/**
* @link org.apache.pig.newplan.Operator#accept(org.apache.pig.newplan.PlanVisitor)
*/
@@ -217,12 +217,12 @@ public class ProjectExpression extends C
public int getInputNum() {
return input;
}
-
-
+
+
public void setInputNum(int inputNum) {
input = inputNum;
}
-
+
/**
* Column number this project references. The column number is the column
* in the relational operator that contains this expression. The count
@@ -235,21 +235,21 @@ public class ProjectExpression extends C
}
return col;
}
-
+
public String getColAlias() {
return alias;
}
-
+
/**
* Set the column number for this project. This should only be called by
- * ProjectionPatcher. Stupid Java needs friends.
+ * ProjectionPatcher. Stupid Java needs friends.
* @param colNum new column number for projection
*/
public void setColNum(int colNum) {
col = colNum;
alias = null; // Once the column number is set, alias is no longer needed.
}
-
+
public boolean isProjectStar() {
return col<0;
}
@@ -257,19 +257,19 @@ public class ProjectExpression extends C
public boolean isRangeProject() {
return isRangeProject;
}
-
+
public boolean isRangeOrStarProject(){
return isProjectStar() || isRangeProject();
}
-
+
@Override
public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
if (fieldSchema!=null)
return fieldSchema;
LogicalRelationalOperator referent = findReferent();
-
+
LogicalSchema schema = referent.getSchema();
-
+
if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
if (!(findReferent() instanceof LOInnerLoad)||
((LOInnerLoad)findReferent()).sourceIsBag()) {
@@ -278,7 +278,7 @@ public class ProjectExpression extends C
Pair<List<LOInnerLoad>, Boolean> innerLoadsPair = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
List<LOInnerLoad> innerLoads = innerLoadsPair.first;
boolean needNewUid = innerLoadsPair.second;
-
+
// pull tuple information from innerload
if (innerLoads.get(0).getProjection().getFieldSchema().schema!=null &&
innerLoads.get(0).getProjection().getFieldSchema().type==DataType.BAG) {
@@ -342,10 +342,10 @@ public class ProjectExpression extends C
} else {
fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
}
-
+
if (fieldSchema!=null)
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
- }
+ }
else {
int index = -1;
if (!isRangeOrStarProject() && uidOnlyFieldSchema!=null) {
@@ -364,7 +364,7 @@ public class ProjectExpression extends C
}
if (index==-1)
index = col;
-
+
if (!isRangeOrStarProject()) {
if (schema!=null && schema.size()>index)
fieldSchema = schema.getField(index);
@@ -403,22 +403,22 @@ public class ProjectExpression extends C
if (preds == null || input >= preds.size()) {
throw new FrontendException("Projection with nothing to reference!", 2225);
}
-
+
LogicalRelationalOperator pred =
(LogicalRelationalOperator)preds.get(input);
if (pred == null) {
- throw new FrontendException("Cannot fine reference for " + this, 2226);
+ throw new FrontendException("Cannot find reference for " + this, 2226);
}
return pred;
}
-
+
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof ProjectExpression) {
ProjectExpression po = (ProjectExpression)other;
if (po.input != input || po.col != col)
return false;
-
+
Operator mySucc = getPlan().getSuccessors(this)!=null?
getPlan().getSuccessors(this).get(0):null;
Operator theirSucc = other.getPlan().getSuccessors(other)!=null?
@@ -432,7 +432,7 @@ public class ProjectExpression extends C
return false;
}
}
-
+
public String toString() {
StringBuilder msg = new StringBuilder();
if (fieldSchema!=null && fieldSchema.alias!=null)
@@ -460,18 +460,18 @@ public class ProjectExpression extends C
return msg.toString();
}
-
+
public LogicalRelationalOperator getAttachedRelationalOp() {
return attachedRelationalOp;
}
-
+
public void setAttachedRelationalOp(LogicalRelationalOperator attachedRelationalOp) {
this.attachedRelationalOp = attachedRelationalOp;
}
-
+
@Override
public byte getType() throws FrontendException {
- // for boundary project, if
+ // for boundary project, if
if (getFieldSchema()==null) {
if (attachedRelationalOp instanceof LOGenerate && findReferent() instanceof
LOInnerLoad) {
@@ -513,7 +513,7 @@ public class ProjectExpression extends C
/**
* @param startAlias
- * @throws FrontendException
+ * @throws FrontendException
*/
public void setStartAlias(String startAlias) throws FrontendException {
this.startAlias = startAlias;
@@ -521,7 +521,7 @@ public class ProjectExpression extends C
/**
* @param endAlias
- * @throws FrontendException
+ * @throws FrontendException
*/
public void setEndAlias(String endAlias) throws FrontendException {
this.endAlias = endAlias;
@@ -535,13 +535,13 @@ public class ProjectExpression extends C
this.getColNum(),
this.getAttachedRelationalOp());
copy.setLocation( new SourceLocation( location ) );
- copy.alias = alias;
+ copy.alias = alias;
copy.isRangeProject = this.isRangeProject;
copy.startCol = this.startCol;
copy.endCol = this.endCol;
copy.startAlias = this.startAlias;
copy.endAlias = this.endAlias;
-
+
return copy;
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Nov 14 23:38:53 2012
@@ -107,13 +107,13 @@ public class LogicalPlanBuilder {
private IntStream intStream;
private int storeIndex = 0;
private int loadIndex = 0;
-
+
private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
-
+
public static long getNextId(String scope) {
return nodeIdGen.getNextNodeId( scope );
}
-
+
LogicalPlanBuilder(PigContext pigContext, String scope, Map<String, String> fileNameMap,
IntStream input) {
this.pigContext = pigContext;
@@ -121,7 +121,7 @@ public class LogicalPlanBuilder {
this.fileNameMap = fileNameMap;
this.intStream = input;
}
-
+
LogicalPlanBuilder(IntStream input) throws ExecException {
pigContext = new PigContext( ExecType.LOCAL, new Properties() );
pigContext.connect();
@@ -129,7 +129,7 @@ public class LogicalPlanBuilder {
this.fileNameMap = new HashMap<String, String>();
this.intStream = input;
}
-
+
Operator lookupOperator(String alias) {
return operators.get( alias );
}
@@ -137,27 +137,27 @@ public class LogicalPlanBuilder {
FuncSpec lookupFunction(String alias) {
return pigContext.getFuncSpecFromAlias( alias );
}
-
+
StreamingCommand lookupCommand(String alias) {
return pigContext.getCommandForAlias( alias );
}
-
+
void defineCommand(String alias, StreamingCommand command) {
pigContext.registerStreamCmd( alias, command );
}
-
+
void defineFunction(String alias, FuncSpec fs) {
pigContext.registerFunction( alias, fs );
}
-
+
LogicalPlan getPlan() {
return plan;
}
-
+
Map<String, Operator> getOperators() {
return operators;
}
-
+
LOFilter createFilterOp() {
return new LOFilter( plan );
}
@@ -165,26 +165,26 @@ public class LogicalPlanBuilder {
LOLimit createLimitOp() {
return new LOLimit( plan );
}
-
+
LOFilter createSampleOp() {
return new LOFilter( plan, true );
}
-
- String buildFilterOp(SourceLocation loc, LOFilter op, String alias,
+
+ String buildFilterOp(SourceLocation loc, LOFilter op, String alias,
String inputAlias, LogicalExpressionPlan expr)
throws ParserValidationException {
-
+
op.setFilterPlan( expr );
- alias = buildOp( loc, op, alias, inputAlias, null ); // it should actually return same alias
+ alias = buildOp( loc, op, alias, inputAlias, null ); // it should actually return same alias
try {
(new ProjStarInUdfExpander(op.getPlan())).visit(op);
new SchemaResetter(op.getPlan(), true).visit(op);
} catch (FrontendException e) {
throw new ParserValidationException( intStream, loc, e );
- }
+ }
return alias;
}
-
+
String buildDistinctOp(SourceLocation loc, String alias, String inputAlias, String partitioner) throws ParserValidationException {
LODistinct op = new LODistinct( plan );
return buildOp( loc, op, alias, inputAlias, partitioner );
@@ -194,16 +194,16 @@ public class LogicalPlanBuilder {
LOLimit op = new LOLimit( plan, limit );
return buildOp( loc, op, alias, inputAlias, null );
}
-
+
String buildLimitOp(SourceLocation loc, LOLimit op, String alias, String inputAlias, LogicalExpressionPlan expr) throws ParserValidationException {
op.setLimitPlan(expr);
return buildOp(loc, op, alias, inputAlias, null);
}
-
+
String buildSampleOp(SourceLocation loc, String alias, String inputAlias, double value,
SourceLocation valLoc)
throws ParserValidationException {
-
+
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
// Generate a filter condition.
LogicalExpression konst = new ConstantExpression( filterPlan, value);
@@ -213,16 +213,16 @@ public class LogicalPlanBuilder {
LOFilter filter = new LOFilter( plan, true );
return buildFilterOp( loc, filter, alias, inputAlias, filterPlan );
}
-
+
String buildSampleOp(SourceLocation loc, LOFilter filter, String alias, String inputAlias,
LogicalExpressionPlan samplePlan, LogicalExpression expr)
throws ParserValidationException {
-
+
UserFuncExpression udf = new UserFuncExpression( samplePlan, new FuncSpec( RANDOM.class.getName() ) );
new LessThanExpression( samplePlan, udf, expr );
return buildFilterOp( loc, filter, alias, inputAlias, samplePlan );
}
-
+
String buildUnionOp(SourceLocation loc, String alias, List<String> inputAliases, boolean onSchema) throws ParserValidationException {
LOUnion op = new LOUnion( plan, onSchema );
return buildOp( loc, op, alias, inputAliases, null );
@@ -232,17 +232,17 @@ public class LogicalPlanBuilder {
LOSplit op = new LOSplit( plan );
return buildOp( loc, op, null, inputAlias, null );
}
-
+
LOSplitOutput createSplitOutputOp() {
return new LOSplitOutput( plan );
}
-
+
String buildSplitOutputOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias,
LogicalExpressionPlan filterPlan) throws ParserValidationException {
op.setFilterPlan( filterPlan );
return buildOp ( loc, op, alias, inputAlias, null );
}
-
+
String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias)
throws ParserValidationException, PlanGenerationFailureException {
LogicalExpressionPlan splitPlan = new LogicalExpressionPlan();
@@ -284,17 +284,17 @@ public class LogicalPlanBuilder {
op.setFilterPlan(splitPlan);
return buildOp(loc, op, alias, inputAlias, null);
}
-
+
String buildCrossOp(SourceLocation loc, String alias, List<String> inputAliases, String partitioner) throws ParserValidationException {
LOCross op = new LOCross( plan );
return buildOp ( loc, op, alias, inputAliases, partitioner );
}
-
+
LOSort createSortOp() {
return new LOSort( plan );
}
-
- String buildSortOp(SourceLocation loc, LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
+
+ String buildSortOp(SourceLocation loc, LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
List<Boolean> ascFlags, FuncSpec fs) throws ParserValidationException {
sort.setSortColPlans( plans );
sort.setUserFunc( fs );
@@ -342,15 +342,15 @@ public class LogicalPlanBuilder {
else {
op.pinOption(LOJoin.OPTION_JOIN);
}
-
+
int inputCount = inputAliases.size();
-
+
if( jt == JOINTYPE.SKEWED ) {
if( partitioner != null ) {
throw new ParserValidationException( intStream, loc,
"Custom Partitioner is not supported for skewed join" );
}
-
+
if( inputCount != 2 ) {
throw new ParserValidationException( intStream, loc,
"Skewed join can only be applied for 2-way joins" );
@@ -394,7 +394,7 @@ public class LogicalPlanBuilder {
throw new ParserValidationException(intStream, loc, e);
}
}
-
+
LOCube createCubeOp() {
return new LOCube(plan);
}
@@ -772,20 +772,20 @@ public class LogicalPlanBuilder {
}
}
-
+
LOCogroup createGroupOp() {
return new LOCogroup( plan );
}
-
- String buildGroupOp(SourceLocation loc, LOCogroup op, String alias, List<String> inputAliases,
+
+ String buildGroupOp(SourceLocation loc, LOCogroup op, String alias, List<String> inputAliases,
MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags,
String partitioner) throws ParserValidationException {
if( gt == GROUPTYPE.COLLECTED ) {
if( inputAliases.size() > 1 ) {
- throw new ParserValidationException( intStream, loc,
+ throw new ParserValidationException( intStream, loc,
"Collected group is only supported for single input" );
}
-
+
List<LogicalExpressionPlan> exprPlans = expressionPlans.get( 0 );
for( LogicalExpressionPlan exprPlan : exprPlans ) {
Iterator<Operator> it = exprPlan.getOperators();
@@ -797,7 +797,7 @@ public class LogicalPlanBuilder {
}
}
}
-
+
boolean[] flags = new boolean[innerFlags.size()];
for( int i = 0; i < innerFlags.size(); i++ ) {
flags[i] = innerFlags.get( i );
@@ -846,15 +846,15 @@ public class LogicalPlanBuilder {
return buildOp( loc, op, alias, new ArrayList<String>(), null );
}
- private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
+ private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
String inputAlias, String partitioner) throws ParserValidationException {
List<String> inputAliases = new ArrayList<String>();
if( inputAlias != null )
inputAliases.add( inputAlias );
return buildOp( loc, op, alias, inputAliases, partitioner );
}
-
- private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
+
+ private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
List<String> inputAliases, String partitioner) throws ParserValidationException {
setAlias( op, alias );
setPartitioner( op, partitioner );
@@ -868,7 +868,7 @@ public class LogicalPlanBuilder {
plan.connect( pred, op );
}
operators.put( op.getAlias(), op );
- pigContext.setLastAlias(op.getAlias());
+ pigContext.setLastAlias(op.getAlias());
return op.getAlias();
}
@@ -908,15 +908,15 @@ public class LogicalPlanBuilder {
private String newOperatorKey() {
return new OperatorKey( scope, getNextId() ).toString();
}
-
+
public static String newOperatorKey(String scope) {
return new OperatorKey( scope, getNextId(scope)).toString();
}
-
+
LOForEach createForeachOp() {
return new LOForEach( plan );
}
-
+
String buildForeachOp(SourceLocation loc, LOForEach op, String alias, String inputAlias, LogicalPlan innerPlan)
throws ParserValidationException {
op.setInnerPlan( innerPlan );
@@ -924,30 +924,43 @@ public class LogicalPlanBuilder {
expandAndResetVisitor(loc, op);
return alias;
}
-
+
LOGenerate createGenerateOp(LogicalPlan plan) {
return new LOGenerate( plan );
}
-
+
void buildGenerateOp(SourceLocation loc, LOForEach foreach, LOGenerate gen,
Map<String, Operator> operators,
List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
List<LogicalSchema> schemas)
- throws ParserValidationException{
-
+ throws ParserValidationException {
+
boolean[] flags = new boolean[ flattenFlags.size() ];
for( int i = 0; i < flattenFlags.size(); i++ )
flags[i] = flattenFlags.get( i );
LogicalPlan innerPlan = (LogicalPlan)gen.getPlan();
ArrayList<Operator> inputs = new ArrayList<Operator>();
+ int idx = 0;
for( LogicalExpressionPlan exprPlan : exprPlans ) {
+ LogicalExpression expr = (LogicalExpression)exprPlan.getSources().get(0);
+ LogicalSchema userSchema = schemas.get(idx);
+ if (userSchema == null && expr.hasFieldSchema()) {
+ LogicalSchema ls = new LogicalSchema();
+ try {
+ ls.addField(expr.getFieldSchema());
+ schemas.set(idx, ls);
+ } catch (FrontendException e) {
+ // if we get an exception, then we have no schema to set
+ }
+ }
+ idx++;
try {
processExpressionPlan( foreach, innerPlan, exprPlan, operators, inputs );
} catch (FrontendException e) {
throw new ParserValidationException(intStream, loc, e);
}
}
-
+
gen.setOutputPlans( exprPlans );
gen.setFlattenFlags( flags );
gen.setUserDefinedSchema( schemas );
@@ -957,21 +970,21 @@ public class LogicalPlanBuilder {
innerPlan.connect( input, gen );
}
}
-
+
/**
* Process expression plans of LOGenerate and set inputs relation
* for the ProjectExpression
- * @param foreach
+ * @param foreach
* @param lp Logical plan in which the LOGenerate is in
* @param plan One of the output expression of the LOGenerate
* @param operators All logical operators in lp;
* @param inputs inputs of the LOGenerate
- * @throws FrontendException
+ * @throws FrontendException
*/
private static void processExpressionPlan(LOForEach foreach,
- LogicalPlan lp,
- LogicalExpressionPlan plan,
- Map<String, Operator> operators,
+ LogicalPlan lp,
+ LogicalExpressionPlan plan,
+ Map<String, Operator> operators,
ArrayList<Operator> inputs ) throws FrontendException {
Iterator<Operator> it = plan.getOperators();
while( it.hasNext() ) {
@@ -981,7 +994,7 @@ public class LogicalPlanBuilder {
ProjectExpression projExpr = (ProjectExpression)sink;
String colAlias = projExpr.getColAlias();
if( projExpr.isRangeProject()){
-
+
LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach,
new ProjectExpression(projExpr, new LogicalExpressionPlan())
);
@@ -994,7 +1007,7 @@ public class LogicalPlanBuilder {
// this means the project expression refers to a relation
// in the nested foreach
- //add the relation to inputs of LOGenerate and set
+ //add the relation to inputs of LOGenerate and set
// projection input
int index = inputs.indexOf( op );
if( index == -1 ) {
@@ -1019,7 +1032,7 @@ public class LogicalPlanBuilder {
}
}
}
-
+
private static void setupInnerLoadAndProj(LOInnerLoad innerLoad,
ProjectExpression projExpr, LogicalPlan lp,
ArrayList<Operator> inputs) {
@@ -1028,10 +1041,10 @@ public class LogicalPlanBuilder {
projExpr.setColNum( -1 ); // Projection Expression on InnerLoad is always (*).
lp.add( innerLoad );
inputs.add( innerLoad );
-
+
}
- Operator buildNestedOperatorInput(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
+ Operator buildNestedOperatorInput(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
Map<String, Operator> operators, LogicalExpression expr)
throws NonProjectExpressionException, ParserValidationException {
OperatorPlan plan = expr.getPlan();
@@ -1056,7 +1069,7 @@ public class LogicalPlanBuilder {
}
return op;
}
-
+
private LOInnerLoad createInnerLoad(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
String colAlias) throws ParserValidationException {
try {
@@ -1072,7 +1085,7 @@ public class LogicalPlanBuilder {
StreamingCommand command = null;
try {
command = buildCommand( loc, cmd );
-
+
// Process ship paths
if( shipPaths != null ) {
if( shipPaths.size() == 0 ) {
@@ -1082,25 +1095,25 @@ public class LogicalPlanBuilder {
command.addPathToShip( path );
}
}
-
+
// Process cache paths
if( cachePaths != null ) {
for( String path : cachePaths )
command.addPathToCache( path );
}
-
+
// Process input handle specs
if( inputHandleSpecs != null ) {
for( HandleSpec spec : inputHandleSpecs )
command.addHandleSpec( Handle.INPUT, spec );
}
-
+
// Process output handle specs
if( outputHandleSpecs != null ) {
for( HandleSpec spec : outputHandleSpecs )
command.addHandleSpec( Handle.OUTPUT, spec );
}
-
+
// error handling
if( logDir != null )
command.setLogDir( logDir );
@@ -1109,10 +1122,10 @@ public class LogicalPlanBuilder {
} catch(IOException e) {
throw new PlanGenerationFailureException( intStream, loc, e );
}
-
+
return command;
}
-
+
StreamingCommand buildCommand(SourceLocation loc, String cmd) throws RecognitionException {
try {
String[] args = StreamingCommandUtils.splitArgs( cmd );
@@ -1124,7 +1137,7 @@ public class LogicalPlanBuilder {
throw new InvalidCommandException( intStream, loc, cmd );
}
}
-
+
String buildStreamOp(SourceLocation loc, String alias, String inputAlias, StreamingCommand command,
LogicalSchema schema, IntStream input)
throws RecognitionException {
@@ -1135,7 +1148,7 @@ public class LogicalPlanBuilder {
throw new PlanGenerationFailureException( input, loc, ex );
}
}
-
+
String buildNativeOp(SourceLocation loc, String inputJar, String cmd,
List<String> paths, String storeAlias, String loadAlias, IntStream input)
throws RecognitionException {
@@ -1157,31 +1170,31 @@ public class LogicalPlanBuilder {
throw new InvalidPathException( input, loc, e);
}
}
-
+
void setAlias(LogicalRelationalOperator op, String alias) {
if( alias == null )
alias = newOperatorKey();
op.setAlias( alias );
}
-
+
void setParallel(LogicalRelationalOperator op, Integer parallel) {
if( parallel != null ) {
op.setRequestedParallelism( pigContext.getExecType() == ExecType.LOCAL ? 1 : parallel );
}
}
-
+
static void setPartitioner(LogicalRelationalOperator op, String partitioner) {
if( partitioner != null )
op.setCustomPartitioner( partitioner );
}
-
+
FuncSpec buildFuncSpec(SourceLocation loc, String funcName, List<String> args, byte ft) throws RecognitionException {
String[] argArray = new String[args.size()];
FuncSpec funcSpec = new FuncSpec( funcName, args.size() == 0 ? null : args.toArray( argArray ) );
validateFuncSpec( loc, funcSpec, ft );
return funcSpec;
}
-
+
private void validateFuncSpec(SourceLocation loc, FuncSpec funcSpec, byte ft) throws RecognitionException {
switch (ft) {
case FunctionType.COMPARISONFUNC:
@@ -1197,15 +1210,15 @@ public class LogicalPlanBuilder {
}
}
}
-
+
static String unquote(String s) {
return StringUtils.unescapeInputString( s.substring(1, s.length() - 1 ) );
}
-
+
static int undollar(String s) {
- return Integer.parseInt( s.substring( 1, s.length() ) );
+ return Integer.parseInt( s.substring( 1, s.length() ) );
}
-
+
/**
* Parse the long given as a string such as "34L".
*/
@@ -1218,23 +1231,23 @@ public class LogicalPlanBuilder {
TupleFactory tf = TupleFactory.getInstance();
return tf.newTuple( objList );
}
-
+
static DataBag createDataBag() {
BagFactory bagFactory = BagFactory.getInstance();
return bagFactory.newDefaultBag();
}
-
+
/**
* Build a project expression in foreach inner plan.
* The only difference here is that the projection can be for an expression alias, for which
* we will return whatever the expression alias represents.
- * @throws RecognitionException
+ * @throws RecognitionException
*/
LogicalExpression buildProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator op,
Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
throws RecognitionException {
ProjectExpression result = null;
-
+
if( colAlias != null ) {
LogicalExpressionPlan exprPlan = exprPlans.get( colAlias );
if( exprPlan != null ) {
@@ -1246,7 +1259,7 @@ public class LogicalPlanBuilder {
throw new PlanGenerationFailureException( intStream, loc, ex );
}
// The projected alias is actually expression alias, so the projections in the represented
- // expression doesn't have any operator associated with it. We need to set it when we
+ // expression doesn't have any operator associated with it. We need to set it when we
// substitute the expression alias with the its expression.
if( op != null ) {
Iterator<Operator> it = plan.getOperators();
@@ -1258,7 +1271,17 @@ public class LogicalPlanBuilder {
}
}
}
- return (LogicalExpression)planCopy.getSources().get( 0 );// get the root of the plan
+ LogicalExpression root = (LogicalExpression)planCopy.getSources().get( 0 );// get the root of the plan
+ LogicalFieldSchema schema;
+ try {
+ schema = root.getFieldSchema();
+ if (schema.alias == null) {
+ schema.alias = colAlias;
+ }
+ } catch (FrontendException e) {
+ // Sometimes it can throw an exception. If it does, then there is no schema to get
+ }
+ return root;
} else {
result = new ProjectExpression( plan, 0, colAlias, op );
result.setLocation( loc );
@@ -1272,9 +1295,9 @@ public class LogicalPlanBuilder {
/**
* Build a project expression for a projection present in global plan (not in nested foreach plan).
- * @throws ParserValidationException
+ * @throws ParserValidationException
*/
- LogicalExpression buildProjectExpr(SourceLocation loc,
+ LogicalExpression buildProjectExpr(SourceLocation loc,
LogicalExpressionPlan plan, LogicalRelationalOperator relOp,
int input, String colAlias, int col)
throws ParserValidationException {
@@ -1288,28 +1311,28 @@ public class LogicalPlanBuilder {
/**
* Build a project expression that projects a range of columns
- * @param loc
+ * @param loc
* @param plan
* @param relOp
* @param input
- * @param startExpr the first expression to be projected, null
+ * @param startExpr the first expression to be projected, null
* if everything from first is to be projected
- * @param endExpr the last expression to be projected, null
+ * @param endExpr the last expression to be projected, null
* if everything to the end is to be projected
* @return project expression
- * @throws ParserValidationException
+ * @throws ParserValidationException
*/
LogicalExpression buildRangeProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator relOp,
int input, LogicalExpression startExpr, LogicalExpression endExpr)
throws ParserValidationException {
-
+
if(startExpr == null && endExpr == null){
// should not reach here as the parser is enforcing this condition
String msg = "in range project (..) at least one of start or end " +
"has to be specified. Use project-star (*) instead.";
throw new ParserValidationException(intStream, loc, msg);
}
-
+
ProjectExpression proj = new ProjectExpression(plan, input, relOp);
//set first column to be projected
@@ -1328,7 +1351,7 @@ public class LogicalPlanBuilder {
}else{
proj.setStartCol(0);//project from first column
}
-
+
//set last column to be projected
if(endExpr != null){
checkRangeProjectExpr(loc, endExpr);
@@ -1345,7 +1368,7 @@ public class LogicalPlanBuilder {
}else{
proj.setEndCol(-1); //project to last column
}
-
+
try {
if(startExpr != null)
plan.removeAndReconnect(startExpr);
@@ -1354,8 +1377,8 @@ public class LogicalPlanBuilder {
} catch (FrontendException e) {
throw new ParserValidationException(intStream, loc, e);
}
-
-
+
+
return proj;
}
@@ -1367,7 +1390,7 @@ public class LogicalPlanBuilder {
" Found :" + startExpr;
throw new ParserValidationException(intStream, loc, msg);
}
-
+
}
LogicalExpression buildUDF(SourceLocation loc, LogicalExpressionPlan plan,
@@ -1381,7 +1404,7 @@ public class LogicalPlanBuilder {
} catch (Exception e) {
throw new PlanGenerationFailureException(intStream, loc, e);
}
-
+
FuncSpec funcSpec = pigContext.getFuncSpecFromAlias(funcName);
LogicalExpression le;
if( funcSpec == null ) {
@@ -1393,11 +1416,11 @@ public class LogicalPlanBuilder {
} else {
le = new UserFuncExpression(plan, funcSpec, args, true);
}
-
+
le.setLocation(loc);
return le;
}
-
+
private long getNextId() {
return getNextId(scope);
}
@@ -1405,13 +1428,13 @@ public class LogicalPlanBuilder {
static LOFilter createNestedFilterOp(LogicalPlan plan) {
return new LOFilter( plan );
}
-
+
static LOLimit createNestedLimitOp(LogicalPlan plan) {
return new LOLimit ( plan );
}
-
+
// Build operator for foreach inner plan.
- Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias,
+ Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias,
Operator inputOp, LogicalExpressionPlan expr) {
op.setFilterPlan( expr );
buildNestedOp( loc, plan, op, alias, inputOp );
@@ -1429,21 +1452,21 @@ public class LogicalPlanBuilder {
buildNestedOp( loc, plan, op, alias, inputOp );
return op;
}
-
- Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias,
+
+ Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias,
Operator inputOp, LogicalExpressionPlan expr) {
op.setLimitPlan( expr );
buildNestedOp( loc, plan, op, alias, inputOp );
return op;
}
-
+
Operator buildNestedCrossOp(SourceLocation loc, LogicalPlan plan, String alias, List<Operator> inputOpList) {
LOCross op = new LOCross( plan );
op.setNested(true);
buildNestedOp( loc, plan, op, alias, inputOpList );
return op;
}
-
+
private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
String alias, Operator inputOp) {
op.setLocation( loc );
@@ -1451,7 +1474,7 @@ public class LogicalPlanBuilder {
plan.add( op );
plan.connect( inputOp, op );
}
-
+
private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
String alias, List<Operator> inputOpList) {
op.setLocation( loc );
@@ -1465,7 +1488,7 @@ public class LogicalPlanBuilder {
static LOSort createNestedSortOp(LogicalPlan plan) {
return new LOSort( plan );
}
-
+
/**
* For any UNKNOWN type in the schema fields, set the type to BYTEARRAY
* @param sch
@@ -1480,13 +1503,13 @@ public class LogicalPlanBuilder {
}
}
}
-
+
static LOForEach createNestedForeachOp(LogicalPlan plan) {
return new LOForEach(plan);
}
-
+
Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
- List<LogicalExpressionPlan> plans,
+ List<LogicalExpressionPlan> plans,
List<Boolean> ascFlags, FuncSpec fs) {
op.setSortColPlans( plans );
if (ascFlags.isEmpty()) {
@@ -1498,8 +1521,8 @@ public class LogicalPlanBuilder {
buildNestedOp( loc, plan, op, alias, inputOp );
return op;
}
-
- Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
+
+ Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
Operator inputOp, LogicalPlan innerPlan)
throws ParserValidationException
{
@@ -1507,8 +1530,8 @@ public class LogicalPlanBuilder {
buildNestedOp(loc, plan, op, alias, inputOp);
return op;
}
-
- Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
+
+ Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
Map<String, Operator> operators,
String alias,
ProjectExpression projExpr,
@@ -1532,19 +1555,19 @@ public class LogicalPlanBuilder {
input = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() );
input.setLocation( projExpr.getLocation() );
}
-
+
LogicalPlan lp = new LogicalPlan(); // f's inner plan
LOForEach f = new LOForEach( innerPlan );
f.setInnerPlan( lp );
f.setLocation( loc );
LOGenerate gen = new LOGenerate( lp );
boolean[] flatten = new boolean[exprPlans.size()];
-
+
List<Operator> innerLoads = new ArrayList<Operator>( exprPlans.size() );
for( LogicalExpressionPlan plan : exprPlans ) {
ProjectExpression pe = (ProjectExpression)plan.getSinks().get( 0 );
String al = pe.getColAlias();
- LOInnerLoad iload = ( al == null ) ?
+ LOInnerLoad iload = ( al == null ) ?
new LOInnerLoad( lp, f, pe.getColNum() ) : createInnerLoad(loc, lp, f, al );
iload.setLocation( pe.getLocation() );
pe.setColNum( -1 );
@@ -1552,7 +1575,7 @@ public class LogicalPlanBuilder {
pe.setAttachedRelationalOp( gen );
innerLoads.add( iload );
}
-
+
gen.setOutputPlans( exprPlans );
gen.setFlattenFlags( flatten );
lp.add( gen );
@@ -1561,7 +1584,7 @@ public class LogicalPlanBuilder {
lp.add( il );
lp.connect( il, gen );
}
-
+
// Connect the inner load operators to gen
setAlias( f, alias );
innerPlan.add( input );
@@ -1569,10 +1592,10 @@ public class LogicalPlanBuilder {
innerPlan.connect( input, f );
return f;
}
-
+
GROUPTYPE parseGroupType(String hint, SourceLocation loc) throws ParserValidationException {
String modifier = unquote( hint );
-
+
if( modifier.equalsIgnoreCase( "collected" ) ) {
return GROUPTYPE.COLLECTED;
} else if( modifier.equalsIgnoreCase( "regular" ) ){
@@ -1584,12 +1607,12 @@ public class LogicalPlanBuilder {
"Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers." );
}
}
-
+
JOINTYPE parseJoinType(String hint, SourceLocation loc) throws ParserValidationException {
String modifier = unquote( hint );
if( modifier.equalsIgnoreCase( "repl" ) || modifier.equalsIgnoreCase( "replicated" ) ) {
- return JOINTYPE.REPLICATED;
+ return JOINTYPE.REPLICATED;
} else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
return LOJoin.JOINTYPE.HASH;
} else if( modifier.equalsIgnoreCase( "skewed" ) ) {
@@ -1603,7 +1626,7 @@ public class LogicalPlanBuilder {
"Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
}
}
-
+
void putOperator(String alias, Operator op) {
operators.put(alias, op);
}
Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Wed Nov 14 23:38:53 2012
@@ -18,24 +18,44 @@
package org.apache.pig.parser;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.IOException;
-
-import junit.framework.Assert;
+import java.util.Iterator;
+import java.util.List;
import org.antlr.runtime.MismatchedTokenException;
-import org.antlr.runtime.NoViableAltException;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.Util;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestLogicalPlanGenerator {
static File command;
-
+
+ private PigServer pigServer;
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
@BeforeClass
public static void oneTimeSetup() throws IOException, Exception {
- // Perl script
- String[] script =
+ // Perl script
+ String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -47,10 +67,10 @@ public class TestLogicalPlanGenerator {
};
command = Util.createInputFile("script", "pl", script);
}
-
+
@Test
- public void test1() {
- String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ public void test1() throws Exception {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = limit A 100; " +
"C = filter B by 2 > 1; " +
"D = load 'y' as (d1, d2); " +
@@ -61,8 +81,8 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test2() {
- String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ public void test2() throws Exception {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = distinct A partition by org.apache.pig.Identity; " +
"C = sample B 0.49; " +
"D = order C by $0, $1; " +
@@ -76,25 +96,21 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test3() {
- String query = "a = load '1.txt' as (name, age, gpa);" +
+ public void test3() throws Exception {
+ String query = "a = load '1.txt' as (name, age, gpa);" +
"b = group a by name PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2;" +
"c = foreach b generate group, COUNT(a.age);" +
"store c into 'y';";
generateLogicalPlan( query );
}
-
- private void generateLogicalPlan(String query) {
- try {
- ParserTestingUtils.generateLogicalPlan( query );
- } catch(Exception ex) {
- Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
- }
+
+ private void generateLogicalPlan(String query) throws Exception {
+ ParserTestingUtils.generateLogicalPlan( query );
}
@Test
- public void test4() {
- String query = "A = load 'x'; " +
+ public void test4() throws Exception {
+ String query = "A = load 'x'; " +
"B = mapreduce '" + "myjar.jar" + "' " +
"Store A into 'table_testNativeMRJobSimple_input' "+
"Load 'table_testNativeMRJobSimple_output' "+
@@ -107,7 +123,7 @@ public class TestLogicalPlanGenerator {
// Test define function.
@Test
- public void test5() {
+ public void test5() throws Exception {
String query = "define myudf org.apache.pig.builtin.PigStorage( ',' );" +
"A = load 'x' using myudf;" +
"store A into 'y';";
@@ -115,7 +131,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test6() {
+ public void test6() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = group A by ( a, $2 );" +
"store B into 'y';";
@@ -123,7 +139,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test7() {
+ public void test7() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = foreach A generate a, $2;" +
"store B into 'y';";
@@ -131,16 +147,16 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test8() {
+ public void test8() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = group A by a;" +
"C = foreach B { S = A.b; generate S; };" +
"store C into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test9() {
+ public void test9() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { R = a; S = R.u; T = limit S 100; generate S, T, c + d/5; };" +
"store B into 'y';";
@@ -148,7 +164,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test10() {
+ public void test10() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { S = a; T = limit S 100; generate T; };" +
"store B into 'y';";
@@ -156,7 +172,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test11() {
+ public void test11() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { T = limit a 100; generate T; };" +
"store B into 'y';";
@@ -164,7 +180,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test12() {
+ public void test12() throws Exception {
String query = "define CMD `perl GroupBy.pl '\t' 0 1` ship('"+Util.encodeEscape(command.toString())+"');" +
"A = load 'x';" +
"B = group A by $0;" +
@@ -176,18 +192,18 @@ public class TestLogicalPlanGenerator {
"store E into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test13() {
+ public void test13() throws Exception {
String query = "define CMD `perl PigStreaming.pl` ship('"+Util.encodeEscape(command.toString())+"') stderr('CMD');" +
"A = load 'x';" +
"C = stream A through CMD;" +
"store C into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test14() {
+ public void test14() throws Exception {
String query = "a = load 'x1' using PigStorage() as (name, age:int, gpa);" +
"b = load 'x2' as (name, age, registration, contributions);" +
"e = cogroup a by name, b by name parallel 8;" +
@@ -196,9 +212,9 @@ public class TestLogicalPlanGenerator {
"store g into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test15() {
+ public void test15() throws Exception {
String query = "a = load 'x1' using PigStorage() as (name, age, gpa);" +
"b = group a all;" +
"c = foreach b generate AVG(a.age) as avg; " +
@@ -209,9 +225,9 @@ public class TestLogicalPlanGenerator {
"store y into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test16() {
+ public void test16() throws Exception {
String query = "AA = load 'x';" +
"A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);" +
"store A into 'y';";
@@ -219,48 +235,48 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void test17() {
+ public void test17() throws Exception {
String query = "store ( load 'x' ) into 'y';";
generateLogicalPlan( query );
}
@Test
- public void test18() {
+ public void test18() throws Exception {
String query = "A = load 'x';\n" +
"C = group (foreach A generate $0 parallel 5) all;";
generateLogicalPlan( query );
}
@Test
- public void test19() {
+ public void test19() throws Exception {
String query = "A = load 'x' as (u:map[], v);\n" +
"B = foreach A { T = (chararray)u#'hello'#'world'; generate T; };";
generateLogicalPlan( query );
}
-
+
@Test
- public void test20() {
+ public void test20() throws Exception {
String query = "A = load 'x' using PigStorage() as (a:int,b:chararray);\n" +
"B = foreach A { C = TOMAP()#'key1'; generate C as C; };";
generateLogicalPlan( query );
}
-
+
@Test
- public void test21() {
+ public void test21() throws Exception {
String query = "A = load 'x' as (u, v);\n" +
"B = foreach A { S = u; T = org.apache.pig.builtin.TOMAP(); generate S, T;};";
generateLogicalPlan( query );
}
-
+
@Test
- public void test22() {
+ public void test22() throws Exception {
String query = "A = (load 'x' as (u, v));\n" +
"B = (group (foreach A generate $0 parallel 5) all);";
generateLogicalPlan( query );
}
-
+
@Test
- public void test23() {
+ public void test23() throws Exception {
String query = "a = (load 'x1' using PigStorage() as (name, age, gpa));" +
"b = (group a all);" +
"c = (foreach b generate AVG(a.age) as avg); " +
@@ -271,9 +287,9 @@ public class TestLogicalPlanGenerator {
"store y into 'y';";
generateLogicalPlan( query );
}
-
+
@Test
- public void test24() {
+ public void test24() throws Exception {
String query = "a = (load 'x1' using PigStorage() as (name, age:int, gpa));" +
"b = (load 'x2' as (name, age, registration, contributions));" +
"e = (cogroup a by name, b by name parallel 8);" +
@@ -282,10 +298,10 @@ public class TestLogicalPlanGenerator {
"(store g into 'y');";
generateLogicalPlan( query );
}
-
+
@Test
- public void test25() {
- String query = "A = (load 'x' as ( u:int, v:long, w:bytearray)); " +
+ public void test25() throws Exception {
+ String query = "A = (load 'x' as ( u:int, v:long, w:bytearray)); " +
"B = (distinct A partition by org.apache.pig.Identity); " +
"C = (sample B 0.49); " +
"D = (order C by $0, $1); " +
@@ -297,48 +313,48 @@ public class TestLogicalPlanGenerator {
"L = (store J into 'output');";
generateLogicalPlan( query );
}
-
+
+ @Test
+ public void testCubeBasic() throws Exception {
+ String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ + "b = cube a by cube(x,y);"
+ + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+ + "store c into 'output';";
+ generateLogicalPlan(query);
+ }
+
+ @Test
+ public void testCubeMultipleIAlias() throws Exception {
+ String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ + "a = load 'input' as (x,y:chararray,z:long);"
+ + "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ + "b = cube a by rollup(x,y);"
+ + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+ + "store c into 'c';";
+ generateLogicalPlan(query);
+ }
+
@Test
- public void testCubeBasic() {
- String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
- + "b = cube a by cube(x,y);"
- + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
- + "store c into 'output';";
- generateLogicalPlan(query);
- }
-
- @Test
- public void testCubeMultipleIAlias() {
- String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
- + "a = load 'input' as (x,y:chararray,z:long);"
- + "a = load 'input' as (x:chararray,y:chararray,z:long);"
- + "b = cube a by rollup(x,y);"
- + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
- + "store c into 'c';";
- generateLogicalPlan(query);
- }
-
- @Test
- public void testCubeAfterForeach() {
- String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
- + "b = foreach a generate x as type,y as location,z as number;"
- + "c = cube b by cube(type,location);"
- + "d = foreach c generate flatten(group) as (type,location), COUNT(cube) as count, SUM(cube.number) as total;"
- + "store d into 'd';";
- generateLogicalPlan(query);
+ public void testCubeAfterForeach() throws Exception {
+ String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ + "b = foreach a generate x as type,y as location,z as number;"
+ + "c = cube b by cube(type,location);"
+ + "d = foreach c generate flatten(group) as (type,location), COUNT(cube) as count, SUM(cube.number) as total;"
+ + "store d into 'd';";
+ generateLogicalPlan(query);
}
-
+
@Test
- public void testFilter() {
- String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ public void testFilter() throws Exception {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = filter A by 2 > 1;\n" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
- public void testScopedAlias() {
- String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
+ public void testScopedAlias() throws Exception {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
"B = load 'y' as ( u:int, x:int, y:chararray);" +
"C = join A by u, B by u;" +
"D = foreach C generate A::u, B::u, v, x;" +
@@ -347,93 +363,83 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void testConstantWithNegativeValue() {
- String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
+ public void testConstantWithNegativeValue() throws Exception {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
"B = foreach A generate u, { ( -1, -15L, -3.5, -4.03F, -2.3e3 ) };" +
"store B into 'y';";
generateLogicalPlan ( query );
}
- @Test
- public void testNegative1() {
+ @Test(expected = NonProjectExpressionException.class)
+ public void testNegative1() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { S = c * 2; T = limit S 100; generate T; };" +
"store B into 'y';";
- try {
- ParserTestingUtils.generateLogicalPlan( query );
- } catch(Exception ex) {
- Assert.assertTrue( ex instanceof NonProjectExpressionException );
- return;
- }
- Assert.fail( "Query is supposed to be failing." );
+ ParserTestingUtils.generateLogicalPlan( query );
}
-
- @Test
- public void testNegative2() {
- String query = "ship = load 'x';";
- try {
+
+ @Test(expected = MismatchedTokenException.class)
+ public void testNegative2() throws Exception {
+ String query = "ship = load 'x';";
+ try {
ParserTestingUtils.generateLogicalPlan( query );
} catch(Exception ex) {
- Assert.assertTrue( ex instanceof MismatchedTokenException );
MismatchedTokenException mex = (MismatchedTokenException)ex;
- Assert.assertTrue( mex.token.getText().equals("ship") );
- return;
+ assertTrue( mex.token.getText().equals("ship") );
+ throw ex;
}
- Assert.fail( "Query is supposed to be failing." );
}
- @Test
- public void testNegative3() {
+ @Test(expected = MismatchedTokenException.class)
+ public void testNegative3() throws Exception {
String query = "A = load 'y'; all = load 'x';";
try {
ParserTestingUtils.generateLogicalPlan( query );
} catch(Exception ex) {
- Assert.assertTrue( ex instanceof MismatchedTokenException );
MismatchedTokenException mex = (MismatchedTokenException)ex;
- Assert.assertTrue( mex.token.getText().equals("all") );
- return;
+ assertTrue( mex.token.getText().equals("all") );
+ throw ex;
}
- Assert.fail( "Query is supposed to be failing." );
}
@Test
- public void testMultilineFunctionArgument() {
+ public void testMultilineFunctionArgument() throws Exception {
String query = "LOAD 'testIn' \n" +
"USING PigStorage ('\n');";
generateLogicalPlan( query );
}
-
+
@Test
// See PIG-2320
- public void testInlineOpInGroup() {
+ public void testInlineOpInGroup() throws Exception {
String query = "a = load 'data1' as (x:int); \n" +
"a_1 = filter (group a by x) by COUNT(a) > 0;";
generateLogicalPlan( query );
}
@Test
- public void testRank01() {
+ public void testRank01() throws Exception {
String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ "B = rank A by sales;" + "store B into 'rank01_test';";
generateLogicalPlan(query);
}
@Test
- public void testRank02() {
+ public void testRank02() throws Exception {
String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ "C = rank A by sales DENSE;" + "store C into 'rank02_test';";
generateLogicalPlan(query);
}
@Test
- public void testRank03() {
+ public void testRank03() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "B = rank A;" + "store B into 'rank03_test';";
generateLogicalPlan(query);
}
@Test
- public void testRank04() {
+ public void testRank04() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "C = rank A by postalcode DESC;"
+ "store C into 'rank04_test';";
@@ -441,7 +447,7 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void testRank05() {
+ public void testRank05() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "D = rank A by postalcode DENSE;"
+ "store D into 'rank05_test';";
@@ -449,23 +455,61 @@ public class TestLogicalPlanGenerator {
}
@Test
- public void testRank06() {
+ public void testRank06() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A by x..rz;";
generateLogicalPlan(query);
}
@Test
- public void testRank07() {
+ public void testRank07() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A by x ASC, y DESC;";
generateLogicalPlan(query);
}
@Test
- public void testRank08() {
+ public void testRank08() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A;";
generateLogicalPlan(query);
}
+
+ // See: PIG-2937
+ @Test
+ public void testRelationAliasInNestedForeachWhereUnspecified() throws Exception {
+ Data data = resetData(pigServer);
+ List<Tuple> values = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ values.add(tuple(i % 3 == 0 ? null : "a", "b"));
+ }
+ data.set("foo", values);
+ pigServer.registerQuery("raw_data = load 'foo' using mock.Storage() as (field_a:chararray, field_b:chararray);");
+ pigServer.registerQuery("records = foreach raw_data {" +
+ " generated_field = (field_a is null ? '-' : field_b);"+
+ " GENERATE" +
+ " field_a," +
+ " field_b," +
+ " generated_field; }");
+ pigServer.registerQuery("use_records = foreach records generate generated_field, CONCAT(generated_field,generated_field);");
+ Schema expectedSchema = Utils.getSchemaFromString("field_a:chararray, field_b:chararray, generated_field:chararray");
+ assertEquals(expectedSchema, pigServer.dumpSchema("records"));
+ for (Iterator<Tuple> it = pigServer.openIterator("records"); it.hasNext();) {
+ Tuple t = it.next();
+ String a = (String)t.get(0);
+ String b = (String)t.get(1);
+ assertEquals("b", b);
+ if (a == null) {
+ assertEquals("-", t.get(2));
+ } else {
+ assertEquals("a", a);
+ assertEquals(b, t.get(2));
+ }
+ }
+ for (Iterator<Tuple> it = pigServer.openIterator("use_records"); it.hasNext();) {
+ Tuple t = it.next();
+ String x = (String)t.get(0);
+ assertEquals(x+x, t.get(1));
+ }
+ }
}