You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/01/15 01:55:17 UTC
svn commit: r1059219 - in /pig/trunk: src/org/apache/pig/newplan/logical/
src/org/apache/pig/newplan/logical/expression/
src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/parser/
test/org/apache/pig/parser/
Author: thejas
Date: Sat Jan 15 00:55:16 2011
New Revision: 1059219
URL: http://svn.apache.org/viewvc?rev=1059219&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-11.patch - (xuefuz via thejas)
Added:
pig/trunk/src/org/apache/pig/parser/InvalidCommandException.java
pig/trunk/src/org/apache/pig/parser/InvalidPathException.java
pig/trunk/src/org/apache/pig/parser/NonProjectExpressionException.java
pig/trunk/src/org/apache/pig/parser/PlanGenerationFailureException.java
Modified:
pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Sat Jan 15 00:55:16 2011
@@ -163,7 +163,7 @@ public class LogicalPlanMigrationVistor
switch(type) {
case REPLICATED:
newType = org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE.REPLICATED;
- break;
+ break;
case SKEWED:
newType = org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE.SKEWED;
break;
@@ -367,7 +367,7 @@ public class LogicalPlanMigrationVistor
public void visit(LOStore store) throws VisitorException{
org.apache.pig.newplan.logical.relational.LOStore newStore =
- new org.apache.pig.newplan.logical.relational.LOStore(logicalPlan, store.getOutputFile());
+ new org.apache.pig.newplan.logical.relational.LOStore(logicalPlan, store.getOutputFile());
newStore.setAlias(store.getAlias());
newStore.setRequestedParallelism(store.getRequestedParallelism());
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java Sat Jan 15 00:55:16 2011
@@ -47,22 +47,28 @@ import org.apache.pig.newplan.logical.re
*
*/
public class DereferenceExpression extends ColumnExpression {
+ private List<Object> rawColumns = new ArrayList<Object>();
- private List<Integer> columns;// The column in the input bag which the project references.
+ private List<Integer> columns = new ArrayList<Integer>();// The column in the input bag which the project references.
// Count is zero based.
+
+ public DereferenceExpression(OperatorPlan plan) {
+ super( "Dereference", plan );
+ plan.add( this );
+ }
public DereferenceExpression(OperatorPlan plan, int colNum) {
- super( "Dereference", plan );
- columns = new ArrayList<Integer>();
+ this( plan );
columns.add(colNum);
- plan.add(this);
}
public DereferenceExpression(OperatorPlan plan, List<Integer> columnNums) {
- super( "Dereference", plan );
- columns = new ArrayList<Integer>();
+ this( plan );
columns.addAll(columnNums);
- plan.add(this);
+ }
+
+ public void setRawColumns(List<Object> cols) {
+ rawColumns.addAll( cols );
}
/**
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Sat Jan 15 00:55:16 2011
@@ -45,6 +45,8 @@ public class ProjectExpression extends C
// greater than 0.
private int col; // The column in the input which the project references.
// Count is zero based.
+ private String alias; // The alias of the projected field.
+
private LogicalRelationalOperator attachedRelationalOp;
@@ -64,6 +66,15 @@ public class ProjectExpression extends C
this.attachedRelationalOp = attachedRelationalOp;
}
+ public ProjectExpression(OperatorPlan plan, int inputNum, String alias,
+ LogicalRelationalOperator attachedRelationalOp) {
+ super("Project", plan);
+ input = inputNum;
+ this.alias = alias;
+ plan.add(this);
+ this.attachedRelationalOp = attachedRelationalOp;
+ }
+
/**
* @link org.apache.pig.newplan.Operator#accept(org.apache.pig.newplan.PlanVisitor)
*/
@@ -101,6 +112,10 @@ public class ProjectExpression extends C
return col;
}
+ public String getColAlias() {
+ return alias;
+ }
+
/**
* Set the column number for this project. This should only be called by
* ProjectionPatcher. Stupid Java needs friends.
@@ -236,7 +251,9 @@ public class ProjectExpression extends C
else
msg.append("null");
msg.append(" Input: " + input + " Column: ");
- if (isProjectStar())
+ if( alias != null )
+ msg.append( alias );
+ else if (isProjectStar())
msg.append("(*)");
else
msg.append(col);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Sat Jan 15 00:55:16 2011
@@ -283,6 +283,20 @@ public class LOCogroup extends LogicalRe
return mExpressionPlans;
}
+ public void setExpressionPlans(MultiMap<Integer,LogicalExpressionPlan> plans) {
+ this.mExpressionPlans = plans;
+ }
+
+ public void setGroupType(GROUPTYPE gt) {
+ mGroupType = gt;
+ }
+
+ public void setInnerFlags(boolean[] flags) {
+ if( flags != null ) {
+ mIsInner = Arrays.copyOf( flags, flags.length );
+ }
+ }
+
public boolean[] getInner() {
return mIsInner;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Sat Jan 15 00:55:16 2011
@@ -41,10 +41,18 @@ public class LOGenerate extends LogicalR
private List<LogicalSchema> uidOnlySchemas = null;
public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, boolean[] flatten) {
- super("LOGenerate", plan);
+ this( plan );
outputPlans = ps;
flattenFlags = flatten;
}
+
+ public void setOutputPlans(List<LogicalExpressionPlan> plans) {
+ this.outputPlans = plans;
+ }
+
+ public LOGenerate(OperatorPlan plan) {
+ super( "LOGenerate", plan );
+ }
@Override
public LogicalSchema getSchema() throws FrontendException {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java Sat Jan 15 00:55:16 2011
@@ -50,6 +50,17 @@ public class LOInnerLoad extends Logical
this.foreach = foreach;
}
+ public LOInnerLoad(OperatorPlan plan, LOForEach foreach, String colAlias) {
+ super("LOInnerLoad", plan);
+
+ // store the column alias as a ProjectExpression in a plan
+ // to be able to dynamically adjust column number during optimization
+ LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+ this.prj = new ProjectExpression( exp, 0, colAlias, foreach );
+ this.foreach = foreach;
+ }
+
@Override
public LogicalSchema getSchema() throws FrontendException {
if (schema!=null)
@@ -130,7 +141,9 @@ public class LOInnerLoad extends Logical
}
msg.append("(Name: " + name);
msg.append("[");
- if (getProjection().getColNum()==-1)
+ if( getProjection().getColAlias() != null )
+ msg.append( getProjection().getColAlias() );
+ else if (getProjection().getColNum()==-1)
msg.append("*");
else
msg.append(getProjection().getColNum());
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Sat Jan 15 00:55:16 2011
@@ -69,6 +69,18 @@ public class LOJoin extends LogicalRelat
mJoinType = jt;
mInnerFlags = isInner;
}
+
+ public void setJoinPlans(MultiMap<Integer, LogicalExpressionPlan> joinPlans) {
+ this.mJoinPlans = joinPlans;
+ }
+
+ public void setInnerFlags(boolean[] isInner) {
+ this.mInnerFlags = isInner;
+ }
+
+ public void setJoinType(JOINTYPE jt) {
+ this.mJoinType = jt;
+ }
public boolean isInner(int inputIndex) {
return mInnerFlags[inputIndex];
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Sat Jan 15 00:55:16 2011
@@ -25,7 +25,6 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.SortColInfo;
import org.apache.pig.SortInfo;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanVisitor;
@@ -36,17 +35,20 @@ public class LOSort extends LogicalRelat
private List<Boolean> mAscCols;
private FuncSpec mSortFunc;
private boolean mIsStar = false;
- private long limit;
+ private long limit = -1;
private List<LogicalExpressionPlan> mSortColPlans;
+
+ public LOSort(OperatorPlan plan) {
+ super("LOSort", plan);
+ }
public LOSort(OperatorPlan plan, List<LogicalExpressionPlan> sortColPlans,
List<Boolean> ascCols,
FuncSpec sortFunc ) {
- super("LOSort", plan);
+ this( plan );
mSortColPlans = sortColPlans;
mAscCols = ascCols;
mSortFunc = sortFunc;
- limit = -1;
}
public List<LogicalExpressionPlan> getSortColPlans() {
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Sat Jan 15 00:55:16 2011
@@ -268,7 +268,16 @@ var_expr : projectable_expr ( dot_proj |
projectable_expr: func_eval | col_ref | bin_expr
;
-dot_proj : ^( PERIOD col_ref+ )
+dot_proj : ^( PERIOD col_alias_or_index+ )
+;
+
+col_alias_or_index : col_alias | col_index
+;
+
+col_alias : GROUP | IDENTIFIER
+;
+
+col_index : DOLLAR^ INTEGER
;
pound_proj : ^( POUND ( QUOTEDSTRING | NULL ) )
@@ -331,7 +340,7 @@ foreach_clause : ^( FOREACH rel foreach_
;
foreach_plan : ^( FOREACH_PLAN nested_blk )
- | ^( FOREACH_PLAN generate_clause parallel_clause? )
+ | ^( FOREACH_PLAN_SIMPLE generate_clause parallel_clause? )
;
nested_blk
@@ -344,7 +353,11 @@ generate_clause : ^( GENERATE flatten_ge
;
nested_command
- : ^( NESTED_CMD IDENTIFIER ( expr | nested_op ) )
+ : ^( NESTED_CMD IDENTIFIER nested_op )
+ {
+ $nested_blk::ids.add( $IDENTIFIER.text );
+ }
+ | ^( NESTED_CMD_ASSI IDENTIFIER expr )
{
$nested_blk::ids.add( $IDENTIFIER.text );
}
@@ -357,30 +370,23 @@ nested_op : nested_proj
| nested_limit
;
-nested_proj : ^( NESTED_PROJ col_ref col_ref_list )
-;
-
-col_ref_list : col_ref+
+nested_proj : ^( NESTED_PROJ col_ref col_ref+ )
;
-nested_alias_ref
- : IDENTIFIER
- {
- validateAliasRef( $nested_blk::ids, $IDENTIFIER.text );
- }
+nested_filter
+ : ^( FILTER nested_op_input cond )
;
-nested_filter
- : ^( FILTER ( nested_alias_ref | nested_proj | expr ) cond )
+nested_sort : ^( ORDER nested_op_input order_by_clause func_clause? )
;
-nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr ) order_by_clause func_clause? )
+nested_distinct : ^( DISTINCT nested_op_input )
;
-nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr ) )
+nested_limit : ^( LIMIT nested_op_input INTEGER )
;
-nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr ) INTEGER )
+nested_op_input : col_ref | nested_proj
;
stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )
Added: pig/trunk/src/org/apache/pig/parser/InvalidCommandException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/InvalidCommandException.java?rev=1059219&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/InvalidCommandException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/InvalidCommandException.java Sat Jan 15 00:55:16 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+
+public class InvalidCommandException extends RecognitionException {
+ private static final long serialVersionUID = 1L;
+
+ private String cmd;
+
+ public InvalidCommandException(IntStream input, String cmd) {
+ super( input );
+ this.cmd = cmd;
+ }
+
+ public String toString() {
+ return "Ill-formed command line: " + cmd;
+ }
+
+ public String getCmd() {
+ return cmd;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/parser/InvalidPathException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/InvalidPathException.java?rev=1059219&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/InvalidPathException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/InvalidPathException.java Sat Jan 15 00:55:16 2011
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import java.net.MalformedURLException;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+
+public class InvalidPathException extends RecognitionException {
+ private static final long serialVersionUID = 1L;
+
+ private MalformedURLException ex;
+
+ public InvalidPathException(IntStream input, MalformedURLException ex) {
+ super( input );
+ this.ex = ex;
+ }
+
+ public String toString() {
+ return "Malformed URL for JAR path in the command line: " + ex;
+ }
+
+ public MalformedURLException getEx() {
+ return ex;
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sat Jan 15 00:55:16 2011
@@ -22,10 +22,12 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.antlr.runtime.IntStream;
import org.antlr.runtime.RecognitionException;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -58,6 +60,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -75,6 +78,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
public class LogicalPlanBuilder {
private LogicalPlan plan = new LogicalPlan();
@@ -105,12 +109,16 @@ public class LogicalPlanBuilder {
LogicalPlan getPlan() {
return plan;
}
+
+ LOFilter createFilterOp() {
+ return new LOFilter( plan );
+ }
- String buildFilterOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan expr) {
- LOFilter op = new LOFilter( plan, expr );
+ String buildFilterOp(LOFilter op, String alias, Integer parallel, String inputAlias, LogicalExpressionPlan expr) {
+ op.setFilterPlan( expr );
return buildOp( op, alias, parallel, inputAlias, null );
}
-
+
String buildDistinctOp(String alias, Integer parallel, String inputAlias, String partitioner) {
LODistinct op = new LODistinct( plan );
return buildOp( op, alias, parallel, inputAlias, partitioner );
@@ -128,7 +136,7 @@ public class LogicalPlanBuilder {
new LogicalFieldSchema( null , null, DataType.DOUBLE ) );
UserFuncExpression udf = new UserFuncExpression( filterPlan, new FuncSpec( RANDOM.class.getName() ) );
new LessThanEqualExpression( filterPlan, udf, konst );
- return buildFilterOp( alias, parallel, inputAlias, filterPlan );
+ return buildFilterOp( new LOFilter( plan ), alias, parallel, inputAlias, filterPlan );
}
String buildUnionOp(String alias, Integer parallel, List<String> inputAliases) {
@@ -141,8 +149,13 @@ public class LogicalPlanBuilder {
return buildOp( op, null, null, inputAlias, null );
}
- String buildSplitOutputOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan filterPlan) {
- LOSplitOutput op = new LOSplitOutput( plan, filterPlan );
+ LOSplitOutput createSplitOutputOp() {
+ return new LOSplitOutput( plan );
+ }
+
+ String buildSplitOutputOp(LOSplitOutput op, String alias, Integer parallel, String inputAlias,
+ LogicalExpressionPlan filterPlan) {
+ op.setFilterPlan( filterPlan );
return buildOp ( op, alias, parallel, inputAlias, null );
}
@@ -151,29 +164,48 @@ public class LogicalPlanBuilder {
return buildOp ( op, alias, parallel, inputAliases, partitioner );
}
- String buildOrderOp(String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans,
+ LOSort createSortOp() {
+ return new LOSort( plan );
+ }
+
+ String buildSortOp(LOSort sort, String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans,
List<Boolean> ascFlags, FuncSpec fs) {
- LOSort op = new LOSort( plan, plans, ascFlags, fs );
- return buildOp( op, alias, parallel, inputAlias, null );
+ sort.setSortColPlans( plans );
+ sort.setUserFunc( fs );
+ sort.setAscendingCols( ascFlags );
+ return buildOp( sort, alias, parallel, inputAlias, null );
+ }
+
+ LOJoin createJoinOp() {
+ return new LOJoin( plan );
}
- String buildJoinOp(String alias, Integer parallel, List<String> inputAliases, MultiMap<Integer, LogicalExpressionPlan> joinPlans,
+ String buildJoinOp(LOJoin op, String alias, Integer parallel, List<String> inputAliases,
+ MultiMap<Integer, LogicalExpressionPlan> joinPlans,
JOINTYPE jt, List<Boolean> innerFlags, String partitioner) {
boolean[] flags = new boolean[innerFlags.size()];
for( int i = 0; i < innerFlags.size(); i++ ) {
flags[i] = innerFlags.get( i );
}
- LOJoin op = new LOJoin( plan, joinPlans, jt, flags );
+ op.setJoinType( jt );
+ op.setInnerFlags( flags );
+ op.setJoinPlans( joinPlans );
return buildOp( op, alias, parallel, inputAliases, partitioner );
}
- String buildGroupOp(String alias, Integer parallel, List<String> inputAliases,
+ LOCogroup createGroupOp() {
+ return new LOCogroup( plan );
+ }
+
+ String buildGroupOp(LOCogroup op, String alias, Integer parallel, List<String> inputAliases,
MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags) {
boolean[] flags = new boolean[innerFlags.size()];
for( int i = 0; i < innerFlags.size(); i++ ) {
flags[i] = innerFlags.get( i );
}
- LOCogroup op = new LOCogroup( plan, expressionPlans, gt, flags );
+ op.setExpressionPlans( expressionPlans );
+ op.setGroupType( gt );
+ op.setInnerFlags( flags );
return buildOp( op, alias, parallel, inputAliases, null );
}
@@ -211,28 +243,126 @@ public class LogicalPlanBuilder {
return buildOp( op, alias, parallel, inputAlias, null );
}
- String buildForeachOp(String alias, Integer parallel, String inputAlias, LogicalPlan innerPlan) {
- LOForEach op = new LOForEach( plan );
+ LOForEach createForeachOp() {
+ return new LOForEach( plan );
+ }
+
+ String buildForeachOp(LOForEach op, String alias, Integer parallel, String inputAlias, LogicalPlan innerPlan) {
op.setInnerPlan( innerPlan );
return buildOp( op, alias, parallel, inputAlias, null );
}
- void buildGenerateOp(LogicalPlan plan, List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
+ LOGenerate createGenerateOp(LogicalPlan plan) {
+ return new LOGenerate( plan );
+ }
+
+ static void buildGenerateOp(LOForEach foreach, LOGenerate gen, Map<String, Operator> operators,
+ List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
List<LogicalSchema> schemas) {
boolean[] flags = new boolean[ flattenFlags.size() ];
for( int i = 0; i < flattenFlags.size(); i++ )
flags[i] = flattenFlags.get( i );
- LOGenerate op = new LOGenerate( plan, exprPlans, flags );
- op.setUserDefinedSchema( schemas );
- plan.add( op );
+ LogicalPlan innerPlan = (LogicalPlan)gen.getPlan();
+ ArrayList<Operator> inputs = new ArrayList<Operator>();
+ for( LogicalExpressionPlan exprPlan : exprPlans ) {
+ processExpressionPlan( foreach, innerPlan, exprPlan, operators, inputs );
+ }
+
+ gen.setOutputPlans( exprPlans );
+ gen.setFlattenFlags( flags );
+ gen.setUserDefinedSchema( schemas );
+ innerPlan.add( gen );
+ for( Operator input : inputs ) {
+ innerPlan.connect( input, gen );
+ }
+ }
+
+ /**
+ * Process an expression plan of LOGenerate and set the input number
+ * of each ProjectExpression found among the sinks of that plan.
+ * @param foreach The LOForEach passed to any LOInnerLoad created here
+ * @param lp Logical plan that contains the LOGenerate
+ * @param plan One of the output expression plans of the LOGenerate
+ * @param operators Logical operators in lp, looked up by alias
+ * @param inputs Inputs of the LOGenerate; newly found inputs are appended
+ */
+ static void processExpressionPlan(LOForEach foreach,
+ LogicalPlan lp,
+ LogicalExpressionPlan plan,
+ Map<String, Operator> operators,
+ ArrayList<Operator> inputs ) {
+ List<Operator> sinks = plan.getSinks();
+ for( Operator sink : sinks ) {
+ //check all ProjectExpression
+ if( sink instanceof ProjectExpression ) {
+ ProjectExpression projExpr = (ProjectExpression)sink;
+ String colAlias = projExpr.getColAlias();
+ if( colAlias != null ) {
+ // the project is using a column alias
+ Operator op = operators.get( colAlias );
+ if( op != null ) {
+ // this means the project expression refers to a relation
+ // in the nested foreach
+
+ //add the relation to inputs of LOGenerate and set
+ // projection input
+ int index = inputs.indexOf( op );
+ if( index == -1 ) {
+ index = inputs.size();
+ inputs.add( op );
+ }
+ projExpr.setInputNum( index );
+ } else {
+ // this means the project expression refers to a column
+ // in the input of foreach. Add a LOInnerLoad and use that
+ // as input
+ projExpr.setInputNum( inputs.size() );
+ LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach, colAlias );
+ lp.add( innerLoad );
+ inputs.add( innerLoad );
+ }
+ } else {
+ // the project expression is referring to column in ForEach input
+ // using position (eg $1)
+ projExpr.setInputNum( inputs.size() );
+ LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach, projExpr.getColNum() );
+ lp.add( innerLoad );
+ inputs.add( innerLoad );
+ }
+ }
+ }
+ }
+
+ static Operator buildNestedOperatorInput(LogicalPlan innerPlan, LOForEach foreach,
+ Map<String, Operator> operators, LogicalExpression expr, IntStream input)
+ throws NonProjectExpressionException {
+ OperatorPlan plan = expr.getPlan();
+ Iterator<Operator> it = plan.getOperators();
+ if( !( it.next() instanceof ProjectExpression ) || it.hasNext() ) {
+ throw new NonProjectExpressionException( input, expr );
+ }
+ Operator op = null;
+ ProjectExpression projExpr = (ProjectExpression)expr;
+ String colAlias = projExpr.getColAlias();
+ if( colAlias != null ) {
+ op = operators.get( colAlias );
+ if( op == null ) {
+ op = new LOInnerLoad( innerPlan, foreach, colAlias );
+ innerPlan.add( op );
+ }
+ } else {
+ op = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() );
+ innerPlan.add( op );
+ }
+ return op;
}
StreamingCommand buildCommand(String cmd, List<String> shipPaths, List<String> cachePaths,
List<HandleSpec> inputHandleSpecs, List<HandleSpec> outputHandleSpecs,
- String logDir, Integer limit) throws RecognitionException {
+ String logDir, Integer limit, IntStream input) throws RecognitionException {
StreamingCommand command = null;
try {
- command = buildCommand( cmd );
+ command = buildCommand( cmd, input );
// Process ship paths
if( shipPaths.size() == 0 ) {
@@ -259,31 +389,32 @@ public class LogicalPlanBuilder {
if( limit != null )
command.setLogFilesLimit( limit );
} catch(IOException e) {
+ throw new PlanGenerationFailureException( input, e );
}
return command;
}
- StreamingCommand buildCommand(String cmd) throws RecognitionException {
+ StreamingCommand buildCommand(String cmd, IntStream input) throws RecognitionException {
try {
return new StreamingCommand( pigContext, splitArgs( cmd ) );
} catch (ParseException e) {
- throw new RecognitionException(); }
+ throw new InvalidCommandException( input, cmd ); }
}
- String buildStreamOp(String alias, Integer parallel, String inputAlias, StreamingCommand command, LogicalSchema schema)
+ String buildStreamOp(String alias, Integer parallel, String inputAlias, StreamingCommand command,
+ LogicalSchema schema, IntStream input)
throws RecognitionException {
try {
LOStream op = new LOStream( plan, pigContext.createExecutableManager(), command, schema );
return buildOp( op, alias, parallel, inputAlias, null );
} catch (ExecException ex) {
- throw new RecognitionException();
+ throw new PlanGenerationFailureException( input, ex );
}
}
- // TODO: create specific exceptions to throw.
String buildNativeOp(Integer parallel, String inputJar, String cmd,
- List<String> paths, String storeAlias, String loadAlias)
+ List<String> paths, String storeAlias, String loadAlias, IntStream input)
throws RecognitionException {
LONative op;
try {
@@ -297,9 +428,9 @@ public class LogicalPlanBuilder {
plan.connect( op, load );
return load.getAlias();
} catch (ParseException e) {
- throw new RecognitionException();
+ throw new InvalidCommandException( input, cmd );
} catch (MalformedURLException e) {
- throw new RecognitionException();
+ throw new InvalidPathException( input, e);
}
}
@@ -338,9 +469,32 @@ public class LogicalPlanBuilder {
return bagFactory.newDefaultBag();
}
- LogicalExpression buildProjectStar(LogicalExpressionPlan plan, String opAlias) {
- LogicalRelationalOperator op = (LogicalRelationalOperator)operators.get( opAlias );
- return new ProjectExpression( plan, 0, -1, op );
+ /**
+ * Build a project expression in a foreach inner plan.
+ * Unlike a projection in the global plan, the column alias here may name an expression alias;
+ * in that case the root of that alias's expression plan is returned instead of a new projection.
+ */
+ LogicalExpression buildProjectExpr(LogicalExpressionPlan plan, LogicalRelationalOperator op,
+ Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col) {
+ if( colAlias != null ) {
+ LogicalExpressionPlan exprPlan = exprPlans.get( colAlias );
+ if( exprPlan != null ) {
+ return (LogicalExpression)exprPlan.getSources().get( 0 );// get the root of the plan
+ } else {
+ return new ProjectExpression( plan, 0, colAlias, op );
+ }
+ }
+ return new ProjectExpression( plan, 0, col, op );
+ }
+
+ /**
+ * Build a project expression for a projection present in global plan (not in nested foreach plan).
+ */
+ LogicalExpression buildProjectExpr(LogicalExpressionPlan plan, LogicalRelationalOperator relOp,
+ int input, String colAlias, int col) {
+ if( colAlias != null )
+ return new ProjectExpression( plan, input, colAlias, relOp );
+ return new ProjectExpression( plan, input, col, relOp );
}
LogicalExpression buildUDF(LogicalExpressionPlan plan, String funcName, List<LogicalExpression> args) {
@@ -395,4 +549,84 @@ public class LogicalPlanBuilder {
private static long getNextId() {
return nodeIdGen.getNextNodeId( "test" );
}
+
+    /** Instantiates an LOFilter on the given plan; buildNestedFilterOp completes it later. */
+    static LOFilter createNestedFilterOp(LogicalPlan plan) {
+        LOFilter filter = new LOFilter( plan );
+        return filter;
+    }
+
+ // Completes a filter operator for a foreach inner plan: installs expr as the filter plan,
+ // then (via buildNestedOp) sets the alias, adds op to plan and connects inputOp to it.
+ // Returns the same op instance that was passed in.
+ static Operator buildNestedFilterOp(LOFilter op, LogicalPlan plan, String alias,
+ Operator inputOp, LogicalExpressionPlan expr) {
+ op.setFilterPlan( expr );
+ buildNestedOp( plan, op, alias, inputOp );
+ return op;
+ }
+
+    /**
+     * Creates an LODistinct for a foreach inner plan, gives it the alias, adds it to the plan
+     * and connects inputOp to it.
+     */
+    static Operator buildNestedDistinctOp(LogicalPlan plan, String alias, Operator inputOp) {
+        LODistinct distinct = new LODistinct( plan );
+        buildNestedOp( plan, distinct, alias, inputOp );
+        return distinct;
+    }
+
+    /**
+     * Creates an LOLimit with the given row limit for a foreach inner plan, gives it the
+     * alias, adds it to the plan and connects inputOp to it.
+     */
+    static Operator buildNestedLimitOp(LogicalPlan plan, String alias, Operator inputOp, long limit) {
+        LOLimit limitOp = new LOLimit( plan, limit );
+        buildNestedOp( plan, limitOp, alias, inputOp );
+        return limitOp;
+    }
+
+ // Shared tail of the buildNested*Op methods: sets the alias on op, adds op to plan and
+ // connects inputOp -> op.
+ private static void buildNestedOp(LogicalPlan plan, LogicalRelationalOperator op, String alias, Operator inputOp) {
+ setAlias( op, alias );
+ plan.add( op );
+ plan.connect( inputOp, op );
+ }
+
+    /** Instantiates an LOSort on the given plan; buildNestedSortOp completes it later. */
+    static LOSort createNestedSortOp(LogicalPlan plan) {
+        LOSort sort = new LOSort( plan );
+        return sort;
+    }
+
+ // Completes a sort operator for a foreach inner plan: installs the sort column plans, the
+ // per-column ascending flags and the user comparator function spec, then (via buildNestedOp)
+ // sets the alias, adds op to plan and connects inputOp to it. Returns the same op instance.
+ static Operator buildNestedSortOp(LOSort op, LogicalPlan plan, String alias, Operator inputOp,
+ List<LogicalExpressionPlan> plans,
+ List<Boolean> ascFlags, FuncSpec fs) {
+ op.setSortColPlans( plans );
+ op.setAscendingCols( ascFlags );
+ op.setUserFunc( fs );
+ buildNestedOp( plan, op, alias, inputOp );
+ return op;
+ }
+
+ // Builds the operator for a nested projection in a foreach inner plan.
+ // projExpr identifies the input (by alias or by column number); exprPlans holds the
+ // expressions to project from that input. Returns either the operator already registered
+ // under projExpr's alias (only when exprPlans is empty) or a new LOForEach placed in
+ // innerPlan and fed by the input.
+ static Operator buildNestedProjectOp(LogicalPlan innerPlan, LOForEach foreach,
+ Map<String, Operator> operators,
+ String alias, ProjectExpression projExpr, List<LogicalExpressionPlan> exprPlans) {
+ Operator input = null;
+ // A wrapping foreach is only required when there is something to project.
+ boolean foreachNeeded = !exprPlans.isEmpty();
+ String colAlias = projExpr.getColAlias();
+ if( colAlias != null ) {
+ // The name may refer to an operator defined earlier in the nested block.
+ Operator op = operators.get( colAlias );
+ if( op != null ) {
+ input = op ;
+ if( !foreachNeeded )
+ return op;
+ } else {
+ // Not a known nested operator: load the named column from the enclosing foreach.
+ input = new LOInnerLoad( innerPlan, foreach, colAlias );
+ // NOTE(review): after registering the load under alias, control still falls through
+ // and wraps it in an LOForEach built from the empty exprPlans; confirm this is intended.
+ if( !foreachNeeded && alias != null ) {
+ operators.put( alias , input );
+ }
+ }
+ } else {
+ // Positional reference: load the column by number from the enclosing foreach.
+ input = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() );
+ }
+
+ // Inner plan of the new foreach: a single LOGenerate over exprPlans, nothing flattened
+ // (a freshly allocated boolean[] is all false).
+ LogicalPlan lp = new LogicalPlan();
+ boolean[] flatten = new boolean[exprPlans.size()];
+ LOGenerate gen = new LOGenerate( lp, exprPlans, flatten );
+ lp.add( gen );
+ LOForEach f = new LOForEach( innerPlan );
+ f.setInnerPlan( lp );
+ setAlias( f, alias );
+ // NOTE(review): when input is an operator taken from the operators map it has presumably
+ // been added to innerPlan already; verify that add() tolerates a repeated operator.
+ innerPlan.add( input );
+ innerPlan.add( f );
+ innerPlan.connect( input, f );
+ return f;
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Sat Jan 15 00:55:16 2011
@@ -41,11 +41,13 @@ import org.apache.pig.impl.builtin.GFAny
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinCondExpression;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
@@ -55,16 +57,26 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.LessThanExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
import org.apache.pig.newplan.logical.expression.ModExpression;
import org.apache.pig.newplan.logical.expression.MultiplyExpression;
import org.apache.pig.newplan.logical.expression.NegativeExpression;
import org.apache.pig.newplan.logical.expression.NotEqualExpression;
import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.RegexExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
@@ -81,9 +93,27 @@ private static Log log = LogFactory.getL
private LogicalPlanBuilder builder = new LogicalPlanBuilder();
+private boolean inForeachPlan = false;
+private LogicalRelationalOperator currentOp = null; // Current relational operator that's being built.
+
public LogicalPlan getLogicalPlan() {
return builder.getPlan();
}
+
+/**
+ * Produces the message reported for a recognition error raised while walking the tree.
+ * A NonProjectExpressionException gets a dedicated explanation that includes the plan of the
+ * offending expression; every other exception is reported through its toString().
+ * The tokenNames argument is not used.
+ */
+public String getErrorMessage(RecognitionException e, String[] tokenNames) {
+    if ( e instanceof NonProjectExpressionException ) {
+        NonProjectExpressionException pee = (NonProjectExpressionException)e;
+        return "For input to a nested operator, if it's an expression, it can only be " +
+               "a projection expression. The given expression is: " +
+               pee.getExpression().getPlan() + ".";
+    }
+
+    return e.toString();
+}
+
} // End of @members
query : ^( QUERY statement* )
@@ -95,6 +125,10 @@ scope {
String alias; // The alias of the current operator, either given or generated by the parser.
Integer parallel; // Parallelism
String inputAlias; // The alias of the input operator
+ int inputIndex;
+}
+@init {
+ $statement::inputIndex = 0;
}
: general_statement
| foreach_statement
@@ -160,7 +194,7 @@ cmd[String alias] returns[StreamingComma
{
$command = builder.buildCommand( $EXECCOMMAND.text, shipPaths,
cachePaths, $input_clause.inputHandleSpecs, $output_clause.outputHandleSpecs,
- $error_clause.dir == null? $alias : $error_clause.dir, $error_clause.limit );
+ $error_clause.dir == null? $alias : $error_clause.dir, $error_clause.limit, input );
}
;
@@ -349,6 +383,7 @@ scope {
List<Boolean> innerFlags;
}
@init {
+ currentOp = builder.createGroupOp();
$group_clause::groupPlans = new MultiMap<Integer, LogicalExpressionPlan>();
$group_clause::inputAliases = new ArrayList<String>();
$group_clause::innerFlags = new ArrayList<Boolean>();
@@ -356,12 +391,12 @@ scope {
}
: ^( GROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
{
- $alias = builder.buildGroupOp( $statement::alias, $statement::parallel,
+ $alias = builder.buildGroupOp( (LOCogroup)currentOp, $statement::alias, $statement::parallel,
$group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
}
| ^( COGROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
{
- $alias = builder.buildGroupOp( $statement::alias, $statement::parallel,
+ $alias = builder.buildGroupOp( (LOCogroup)currentOp, $statement::alias, $statement::parallel,
$group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
}
;
@@ -399,6 +434,7 @@ group_item
$group_clause::inputAliases.add( $statement::inputAlias );
$group_clause::innerFlags.add( inner );
$group_clause::inputIndex++;
+ $statement::inputIndex++;
}
;
@@ -419,7 +455,12 @@ flatten_generated_item returns[LogicalEx
}
: flatten_clause[$plan] { $flattenFlag = true; }
( as_clause { $schema = $as_clause.logicalSchema; } )?
- | ( expr[$plan] | STAR { builder.buildProjectStar( $plan, $statement::inputAlias ); } )
+ | ( expr[$plan]
+ | STAR
+ {
+ builder.buildProjectExpr( $plan, currentOp, $statement::inputIndex, null, -1 );
+ }
+ )
( field { $schema = new LogicalSchema(); $schema.addField( $field.fieldSchema ); } )?
;
@@ -436,10 +477,13 @@ store_clause returns[String alias]
;
filter_clause returns[String alias]
-@init { LogicalExpressionPlan exprPlan = new LogicalExpressionPlan(); }
+@init {
+ LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+ currentOp = builder.createFilterOp();
+}
: ^( FILTER rel cond[exprPlan] )
{
- $alias = builder.buildFilterOp( $statement::alias,
+ $alias = builder.buildFilterOp( (LOFilter)currentOp, $statement::alias,
$statement::parallel, $statement::inputAlias, exprPlan );
}
;
@@ -511,7 +555,7 @@ real_arg [LogicalExpressionPlan plan] re
: e = expr[$plan] { $expr = $e.expr; }
| STAR
{
- $expr = builder.buildProjectStar( $plan, $statement::inputAlias );
+ $expr = builder.buildProjectExpr( $plan, currentOp, $statement::inputIndex, null, -1 );
}
;
@@ -538,7 +582,7 @@ expr[LogicalExpressionPlan plan] returns
}
| const_expr[$plan]
{
- $expr = new ConstantExpression( $plan, $const_expr.expr, null );
+ $expr = $const_expr.expr;
}
| var_expr[$plan]
{
@@ -560,7 +604,24 @@ expr[LogicalExpressionPlan plan] returns
;
var_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : p = projectable_expr[$plan] ( dot_proj[$plan] | pound_proj[$plan] )*
+@init {
+ List<Object> columns = new ArrayList<Object>();
+}
+ : projectable_expr[$plan] { $expr = $projectable_expr.expr; }
+ ( dot_proj
+ {
+ DereferenceExpression e = new DereferenceExpression( $plan );
+ e.setRawColumns( $dot_proj.cols );
+ $plan.connect( $expr, e );
+ $expr = e;
+ }
+ | pound_proj
+ {
+ MapLookupExpression e = new MapLookupExpression( $plan, $pound_proj.key, null );
+ $plan.connect( $expr, e );
+ $expr = e;
+ }
+ )*
;
projectable_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -570,6 +631,7 @@ projectable_expr[LogicalExpressionPlan p
}
| col_ref[$plan]
{
+ $expr = $col_ref.expr;
}
| bin_expr[$plan]
{
@@ -577,12 +639,27 @@ projectable_expr[LogicalExpressionPlan p
}
;
-dot_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : ^( PERIOD col_ref[$plan]+ )
+// Walks a PERIOD subtree and appends the value of each child (as returned by
+// col_alias_or_index) to $cols, in the order the children appear.
+dot_proj returns[List<Object> cols]
+@init {
+ $cols = new ArrayList<Object>();
+}
+ : ^( PERIOD ( col_alias_or_index { $cols.add( $col_alias_or_index.col ); } )+ )
+;
+
+// A column named either by alias or by position; passes through whatever the matched
+// sub-rule returned.
+col_alias_or_index returns[Object col]
+ : col_alias { $col = $col_alias.col; } | col_index { $col = $col_index.col; }
+;
+
+// A column alias: the text of a GROUP or IDENTIFIER token, returned as a String.
+col_alias returns[Object col]
+ : GROUP { $col = $GROUP.text; } | IDENTIFIER { $col = $IDENTIFIER.text; }
;
-pound_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : ^( POUND ( QUOTEDSTRING | NULL ) )
+// A positional column reference ($N), returned as an Integer. The parser roots the INTEGER
+// under DOLLAR, so the node is matched with a tree pattern, as dollar_col_ref does; the
+// AST construction operator '^' has no meaning in this tree grammar.
+col_index returns[Object col]
+ : ^( DOLLAR INTEGER ) { $col = Integer.valueOf( $INTEGER.text ); }
+;
+
+// Walks a POUND (map lookup) subtree. A QUOTEDSTRING child yields its text passed through
+// builder.unquote; for the NULL alternative no action runs, so $key is not assigned.
+pound_proj returns[String key]
+ : ^( POUND ( QUOTEDSTRING { $key = builder.unquote( $QUOTEDSTRING.text ); } | NULL ) )
;
bin_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -614,9 +691,12 @@ sample_clause returns[String alias]
;
order_clause returns[String alias]
+@init {
+ currentOp = builder.createSortOp();
+}
: ^( ORDER rel order_by_clause func_clause? )
{
- $alias = builder.buildOrderOp( $statement::alias,
+ $alias = builder.buildSortOp( (LOSort)currentOp, $statement::alias,
$statement::parallel, $statement::inputAlias, $order_by_clause.plans,
$order_by_clause.ascFlags, $func_clause.funcSpec );
}
@@ -629,7 +709,7 @@ order_by_clause returns[List<LogicalExpr
}
: STAR {
LogicalExpressionPlan plan = new LogicalExpressionPlan();
- builder.buildProjectStar( plan, $statement::inputAlias );
+ builder.buildProjectExpr( plan, currentOp, $statement::inputIndex, null, -1 );
$plans.add( plan );
}
( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); } )?
@@ -684,13 +764,14 @@ scope {
List<Boolean> innerFlags;
}
@init {
+ currentOp = builder.createJoinOp();
$join_clause::joinPlans = new MultiMap<Integer, LogicalExpressionPlan>();
$join_clause::inputAliases = new ArrayList<String>();
$join_clause::innerFlags = new ArrayList<Boolean>();
}
: ^( JOIN join_sub_clause join_type? partition_clause? )
{
- $alias = builder.buildJoinOp( $statement::alias,
+ $alias = builder.buildJoinOp( (LOJoin)currentOp, $statement::alias,
$statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
$join_type.type, $join_clause::innerFlags, $partition_clause.partitioner );
}
@@ -721,6 +802,7 @@ join_item
$join_clause::inputAliases.add( $statement::inputAlias );
$join_clause::joinPlans.put( $join_clause::inputIndex, $join_group_by_clause.plans );
$join_clause::inputIndex++;
+ $statement::inputIndex++;
}
;
@@ -738,7 +820,7 @@ join_group_by_expr returns[LogicalExpres
: expr[$plan]
| STAR
{
- builder.buildProjectStar( $plan, $statement::inputAlias );
+ builder.buildProjectExpr( $plan, currentOp, $statement::inputIndex, null, -1 );
}
;
@@ -752,22 +834,37 @@ union_clause returns[String alias]
;
foreach_clause returns[String alias]
+scope {
+ LOForEach foreachOp;
+}
+@init {
+ $foreach_clause::foreachOp = builder.createForeachOp();
+}
: ^( FOREACH rel foreach_plan )
{
- $alias = builder.buildForeachOp( $statement::alias,
- $statement::parallel, $statement::inputAlias, $foreach_plan::innerPlan );
+ $alias = builder.buildForeachOp( $foreach_clause::foreachOp, $statement::alias,
+ $statement::parallel, $statement::inputAlias, $foreach_plan.plan );
}
;
-foreach_plan
+foreach_plan returns[LogicalPlan plan]
scope {
LogicalPlan innerPlan;
+ Map<String, LogicalExpressionPlan> exprPlans;
+ Map<String, Operator> operators;
}
@init {
- $foreach_plan::innerPlan = new LogicalPlan();
+ inForeachPlan = true;
+ $foreach_plan::innerPlan = new LogicalPlan();
+ $foreach_plan::exprPlans = new HashMap<String, LogicalExpressionPlan>();
+ $foreach_plan::operators = new HashMap<String, Operator>();
+}
+@after {
+ $plan = $foreach_plan::innerPlan;
+ inForeachPlan = false;
}
: ^( FOREACH_PLAN nested_blk )
- | ^( FOREACH_PLAN generate_clause parallel_clause? )
+ | ^( FOREACH_PLAN_SIMPLE generate_clause parallel_clause? )
;
nested_blk : nested_command* generate_clause
@@ -775,6 +872,7 @@ nested_blk : nested_command* generate_cl
generate_clause
@init {
+ currentOp = builder.createGenerateOp( $foreach_plan::innerPlan );
List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
List<Boolean> flattenFlags = new ArrayList<Boolean>();
List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
@@ -788,50 +886,124 @@ generate_clause
)+
)
{
- builder.buildGenerateOp( $foreach_plan::innerPlan, plans, flattenFlags, schemas );
+ builder.buildGenerateOp( $foreach_clause::foreachOp, (LOGenerate)currentOp,
+ $foreach_plan::operators,
+ plans, flattenFlags, schemas );
}
;
-nested_command : ^( NESTED_CMD IDENTIFIER ( expr[null] | nested_op ) )
-;
-
-nested_op : nested_proj
- | nested_filter
- | nested_sort
- | nested_distinct
- | nested_limit
+// One assignment inside a nested foreach block.
+// NESTED_CMD: the right-hand side is a nested operator; the operator returned by nested_op
+// is stored in $foreach_plan::operators under the identifier.
+// NESTED_CMD_ASSI: the right-hand side is an expression; its plan is stored in
+// $foreach_plan::exprPlans under the identifier.
+// exprPlan is allocated in @init for both alternatives but only used by the second one.
+nested_command
+@init {
+ LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+}
+ : ^( NESTED_CMD IDENTIFIER nested_op[$IDENTIFIER.text] )
+ {
+ $foreach_plan::operators.put( $IDENTIFIER.text, $nested_op.op );
+ }
+ |
+ ^( NESTED_CMD_ASSI IDENTIFIER expr[exprPlan] )
+ {
+ $foreach_plan::exprPlans.put( $IDENTIFIER.text, exprPlan );
+ }
;
-nested_proj : ^( NESTED_PROJ col_ref[null] col_ref_list )
+// Dispatches to the rule for the concrete nested operator, forwarding the alias being
+// defined and returning the operator that rule built.
+nested_op[String alias] returns[Operator op]
+ : nested_proj[$alias] { $op = $nested_proj.op; }
+ | nested_filter[$alias] { $op = $nested_filter.op; }
+ | nested_sort [$alias] { $op = $nested_sort.op; }
+ | nested_distinct[$alias] { $op = $nested_distinct.op; }
+ | nested_limit[$alias] { $op = $nested_limit.op; }
;
-col_ref_list : col_ref[null]+
+// Walks a NESTED_PROJ subtree. The first col_ref (cr0) names the input and is built into
+// `plan`; each following col_ref is built into its own fresh LogicalExpressionPlan, which
+// is recovered through $cr.expr.getPlan() and collected in `plans`. Both are passed to
+// builder.buildNestedProjectOp.
+// NOTE(review): $cr0.expr is cast to ProjectExpression without a check; inside a foreach
+// plan an IDENTIFIER that names an expression alias resolves to that alias's expression,
+// which may not be a ProjectExpression. Confirm this cannot reach here.
+nested_proj[String alias] returns[Operator op]
+@init {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
+}
+ : ^( NESTED_PROJ
+ cr0 = col_ref[plan]
+ ( cr = col_ref[new LogicalExpressionPlan()]
+ {
+ plans.add( (LogicalExpressionPlan)( $cr.expr.getPlan() ) );
+ }
+ )+ )
+ {
+ $op = builder.buildNestedProjectOp( $foreach_plan::innerPlan, $foreach_clause::foreachOp,
+ $foreach_plan::operators, $alias, (ProjectExpression)$cr0.expr, plans );
+ }
;
-nested_alias_ref : IDENTIFIER
+// Walks a FILTER inside a nested foreach block. The LOFilter is created up front and
+// published as currentOp so that column references met while walking the subtree are built
+// against it; the condition is collected into `plan` and the operator is completed by
+// builder.buildNestedFilterOp.
+nested_filter[String alias] returns[Operator op]
+@init {
+    LogicalExpressionPlan plan = new LogicalExpressionPlan();
+    currentOp = builder.createNestedFilterOp( $foreach_plan::innerPlan );
+}
+ : ^( FILTER nested_op_input cond[plan] )
+   {
+       $op = builder.buildNestedFilterOp( (LOFilter)currentOp, $foreach_plan::innerPlan, $alias,
+           $nested_op_input.op, plan );
+   }
;
-nested_filter : ^( FILTER ( nested_alias_ref | nested_proj | expr[null] ) cond[null] )
+// Walks an ORDER inside a nested foreach block. The LOSort is created up front and
+// published as currentOp so that column references met while walking the subtree are built
+// against it; the operator is completed by builder.buildNestedSortOp with the sort plans,
+// ascending flags and optional comparator function.
+nested_sort[String alias] returns[Operator op]
+@init {
+    currentOp = builder.createNestedSortOp( $foreach_plan::innerPlan );
+}
+ : ^( ORDER nested_op_input order_by_clause func_clause? )
+   {
+       $op = builder.buildNestedSortOp( (LOSort)currentOp, $foreach_plan::innerPlan, $alias,
+           $nested_op_input.op,
+           $order_by_clause.plans, $order_by_clause.ascFlags, $func_clause.funcSpec );
+   }
;
-nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] ) order_by_clause func_clause? )
+// Walks a DISTINCT inside a nested foreach block and builds the operator on top of the
+// input operator returned by nested_op_input.
+nested_distinct[String alias] returns[Operator op]
+ : ^( DISTINCT nested_op_input )
+   {
+       $op = builder.buildNestedDistinctOp( $foreach_plan::innerPlan, $alias, $nested_op_input.op );
+   }
;
-nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr[null] ) )
+// Walks a LIMIT inside a nested foreach block and builds the operator on top of the input
+// operator returned by nested_op_input. The limit is parsed as a long because that is the
+// type builder.buildNestedLimitOp takes; parsing it as an int would reject values above
+// Integer.MAX_VALUE for no reason.
+nested_limit[String alias] returns[Operator op]
+ : ^( LIMIT nested_op_input INTEGER )
+   {
+       $op = builder.buildNestedLimitOp( $foreach_plan::innerPlan, $alias, $nested_op_input.op,
+           Long.parseLong( $INTEGER.text ) );
+   }
;
-nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr[null] ) INTEGER )
+// The input of a nested operator: either a plain column reference or a nested projection.
+// A column reference is built into a throw-away expression plan and turned into an operator
+// by builder.buildNestedOperatorInput; `input` is handed over as well, presumably so the
+// builder can raise a RecognitionException tied to the current position (TODO confirm).
+// A nested projection is built with a null alias and its operator is returned as is.
+nested_op_input returns[Operator op]
+@init {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+}
+ : col_ref[plan]
+ {
+ $op = builder.buildNestedOperatorInput( $foreach_plan::innerPlan,
+ $foreach_clause::foreachOp, $foreach_plan::operators, $col_ref.expr, input );
+ }
+ | nested_proj[null]
+ {
+ $op = $nested_proj.op;
+ }
;
stream_clause returns[String alias]
@init {
StreamingCommand cmd = null;
}
- : ^( STREAM rel ( EXECCOMMAND { cmd = builder.buildCommand( $EXECCOMMAND.text ); }
+ : ^( STREAM rel ( EXECCOMMAND { cmd = builder.buildCommand( $EXECCOMMAND.text, input ); }
| IDENTIFIER { cmd = builder.lookupCommand( $IDENTIFIER.text ); } ) as_clause? )
{
$alias = builder.buildStreamOp( $statement::alias, $statement::parallel,
- $statement::inputAlias, cmd, $as_clause.logicalSchema );
+ $statement::inputAlias, cmd, $as_clause.logicalSchema, input );
}
;
@@ -847,7 +1019,7 @@ mr_clause returns[String alias]
{
$alias = builder.buildNativeOp( $statement::parallel,
builder.unquote( $QUOTEDSTRING.text ), builder.unquote( $EXECCOMMAND.text ),
- paths, $store_clause.alias, $load_clause.alias );
+ paths, $store_clause.alias, $load_clause.alias, input );
}
;
@@ -857,10 +1029,13 @@ split_clause
;
split_branch
-@init { LogicalExpressionPlan splitPlan = new LogicalExpressionPlan(); }
+@init {
+ LogicalExpressionPlan splitPlan = new LogicalExpressionPlan();
+ currentOp = builder.createSplitOutputOp();
+}
: ^( SPLIT_BRANCH IDENTIFIER cond[splitPlan] )
{
- builder.buildSplitOutputOp( $IDENTIFIER.text,
+ builder.buildSplitOutputOp( (LOSplitOutput)currentOp, $IDENTIFIER.text,
$statement::parallel, $statement::inputAlias, splitPlan );
}
;
@@ -872,11 +1047,29 @@ col_ref[LogicalExpressionPlan plan] retu
+// A column reference by name, built against currentOp.
+// GROUP: always uses the builder overload that takes $statement::inputIndex, with the GROUP
+// token text as the alias.
+// IDENTIFIER: inside a foreach plan the overload that takes $foreach_plan::exprPlans is used,
+// so the name can resolve to an expression alias; elsewhere the $statement::inputIndex
+// overload is used.
+// The trailing 0 is the column number argument; both overloads ignore it when the alias is
+// non-null.
alias_col_ref[LogicalExpressionPlan plan] returns[LogicalExpression expr]
: GROUP
+ {
+ $expr = builder.buildProjectExpr( $plan, currentOp,
+ $statement::inputIndex, $GROUP.text, 0 );
+ }
| IDENTIFIER
+ {
+ if( inForeachPlan ) {
+ $expr = builder.buildProjectExpr( $plan, currentOp,
+ $foreach_plan::exprPlans, $IDENTIFIER.text, 0 );
+ } else {
+ $expr = builder.buildProjectExpr( $plan, currentOp,
+ $statement::inputIndex, $IDENTIFIER.text, 0 );
+ }
+ }
;
+// A column reference by position ($N): builds a projection of column N on
+// $statement::inputIndex against currentOp.
dollar_col_ref[LogicalExpressionPlan plan] returns[LogicalExpression expr]
: ^( DOLLAR INTEGER )
+ {
+     int colNum = Integer.parseInt( $INTEGER.text );
+     $expr = builder.buildProjectExpr( $plan, currentOp, $statement::inputIndex, null, colNum );
+ }
;
const_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
Added: pig/trunk/src/org/apache/pig/parser/NonProjectExpressionException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/NonProjectExpressionException.java?rev=1059219&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/NonProjectExpressionException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/NonProjectExpressionException.java Sat Jan 15 00:55:16 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+
+/**
+ * Recognition error that carries a logical expression which, per its message, is not a
+ * project expression. The expression is kept so that error reporting can describe it.
+ */
+public class NonProjectExpressionException extends RecognitionException {
+    private static final long serialVersionUID = 1L;
+
+    /** The offending expression; set once at construction. */
+    private final LogicalExpression expr;
+
+    /**
+     * @param input the stream being recognized when the error was detected
+     * @param expr the expression that is not a projection
+     */
+    public NonProjectExpressionException(IntStream input, LogicalExpression expr) {
+        super( input );
+        this.expr = expr;
+    }
+
+    @Override
+    public String toString() {
+        return "Expression is not a project expression: " + expr;
+    }
+
+    /** @return the expression this error was raised for */
+    public LogicalExpression getExpression() {
+        return expr;
+    }
+
+}
Added: pig/trunk/src/org/apache/pig/parser/PlanGenerationFailureException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/PlanGenerationFailureException.java?rev=1059219&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/PlanGenerationFailureException.java (added)
+++ pig/trunk/src/org/apache/pig/parser/PlanGenerationFailureException.java Sat Jan 15 00:55:16 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import org.antlr.runtime.IntStream;
+import org.antlr.runtime.RecognitionException;
+
+/**
+ * Recognition error that wraps an exception raised while the logical plan was being
+ * generated. The wrapped exception is available through getEx() and is also registered as
+ * the cause, so getCause() and printed stack traces include it.
+ */
+public class PlanGenerationFailureException extends RecognitionException {
+    private static final long serialVersionUID = 1L;
+
+    /** The exception that made plan generation fail; set once at construction. */
+    private final Exception ex;
+
+    /**
+     * @param input the stream being recognized when the failure happened
+     * @param ex the underlying exception
+     */
+    public PlanGenerationFailureException(IntStream input, Exception ex) {
+        super( input );
+        this.ex = ex;
+        // RecognitionException has no cause-taking constructor, so chain the cause explicitly.
+        initCause( ex );
+    }
+
+    @Override
+    public String toString() {
+        return "Failed to generate logical plan. Nested exception: " + ex;
+    }
+
+    /** @return the underlying exception */
+    public Exception getEx() {
+        return ex;
+    }
+
+}
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Sat Jan 15 00:55:16 2011
@@ -46,10 +46,12 @@ tokens {
KEY_VAL_PAIR;
TUPLE_DEF;
FIELD;
+ NESTED_CMD_ASSI;
NESTED_CMD;
NESTED_PROJ;
SPLIT_BRANCH;
FOREACH_PLAN;
+ FOREACH_PLAN_SIMPLE;
MAP_TYPE;
TUPLE_TYPE;
BAG_TYPE;
@@ -334,8 +336,18 @@ var_expr : projectable_expr ( dot_proj |
projectable_expr: func_eval | col_ref | bin_expr
;
-dot_proj : PERIOD ( col_ref | ( LEFT_PAREN col_ref ( COMMA col_ref )* RIGHT_PAREN ) )
- -> ^( PERIOD col_ref+ )
+// Dot projection: PERIOD followed by a single column or by a parenthesised, comma-separated
+// list of columns. The rewrite keeps only the columns, as children of PERIOD.
+dot_proj : PERIOD ( col_alias_or_index
+ | ( LEFT_PAREN col_alias_or_index ( COMMA col_alias_or_index )* RIGHT_PAREN ) )
+ -> ^( PERIOD col_alias_or_index+ )
;
+
+// A column named either by alias or by position.
+col_alias_or_index : col_alias | col_index
+;
+
+// A column alias: the GROUP keyword or an identifier.
+col_alias : GROUP | IDENTIFIER
+;
+
+// A positional column reference; '^' makes DOLLAR the root with INTEGER as its child.
+col_index : DOLLAR^ INTEGER
;
pound_proj : POUND^ ( QUOTEDSTRING | NULL )
@@ -420,7 +432,7 @@ foreach_clause : FOREACH^ rel foreach_pl
foreach_plan : nested_blk SEMI_COLON?
-> ^( FOREACH_PLAN nested_blk )
| ( generate_clause parallel_clause? SEMI_COLON )
- -> ^( FOREACH_PLAN generate_clause parallel_clause? )
+ -> ^( FOREACH_PLAN_SIMPLE generate_clause parallel_clause? )
;
nested_blk : LEFT_CURLY! nested_command_list ( generate_clause SEMI_COLON! ) RIGHT_CURLY!
@@ -435,10 +447,10 @@ nested_command_list : ( nested_command S
|
;
-nested_command : IDENTIFIER EQUAL expr
- -> ^( NESTED_CMD IDENTIFIER expr )
- | IDENTIFIER EQUAL nested_op
+// One assignment inside a nested foreach block. The two alternatives are rewritten under
+// different root tokens (NESTED_CMD for a nested operator, NESTED_CMD_ASSI for an
+// expression), which lets a tree grammar tell them apart by the root alone.
+nested_command : IDENTIFIER EQUAL nested_op
-> ^( NESTED_CMD IDENTIFIER nested_op )
+ | IDENTIFIER EQUAL expr
+ -> ^( NESTED_CMD_ASSI IDENTIFIER expr )
;
nested_op : nested_proj
@@ -456,19 +468,19 @@ col_ref_list : ( col_ref | ( LEFT_PAREN
-> col_ref+
;
-nested_alias_ref : IDENTIFIER
+// Nested filter: FILTER becomes the root; BY is matched but left out of the tree.
+nested_filter : FILTER^ nested_op_input BY! cond
;
-nested_filter : FILTER^ ( nested_alias_ref | nested_proj | expr_eval ) BY! cond
+// Nested sort: ORDER becomes the root; BY and USING are matched but left out of the tree.
+nested_sort : ORDER^ nested_op_input BY! order_by_clause ( USING! func_clause )?
;
-nested_sort : ORDER^ ( nested_alias_ref | nested_proj | expr_eval ) BY! order_by_clause ( USING! func_clause )?
+// Nested distinct: DISTINCT becomes the root with the input as its only child.
+nested_distinct : DISTINCT^ nested_op_input
;
-nested_distinct : DISTINCT^ ( nested_alias_ref | nested_proj | expr_eval )
+// Nested limit: LIMIT becomes the root with the input and the INTEGER row count as children.
+nested_limit : LIMIT^ nested_op_input INTEGER
;
-nested_limit : LIMIT^ ( nested_alias_ref | nested_proj | expr_eval ) INTEGER
+// The input of a nested operator: a column reference or a nested projection.
+nested_op_input : col_ref | nested_proj
;
stream_clause : STREAM^ rel THROUGH! ( EXECCOMMAND | IDENTIFIER ) as_clause?
Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1059219&r1=1059218&r2=1059219&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Sat Jan 15 00:55:16 2011
@@ -19,8 +19,11 @@
package org.apache.pig.parser;
+import java.io.IOException;
+
import junit.framework.Assert;
+import org.antlr.runtime.RecognitionException;
import org.junit.Test;
public class TestLogicalPlanGenerator {
@@ -83,4 +86,80 @@ public class TestLogicalPlanGenerator {
"store A into 'y';";
generateLogicalPlan( query );
}
+
+    /** Group keyed by a tuple that mixes a named column with a positional one. */
+    @Test
+    public void test6() {
+        final String script =
+            "A = load 'x' as ( a : int, b, c : chararray );" +
+            "B = group A by ( a, $2 );" +
+            "store B into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /** Simple foreach whose generate mixes a named column with a positional one. */
+    @Test
+    public void test7() {
+        final String script =
+            "A = load 'x' as ( a : int, b, c : chararray );" +
+            "B = foreach A generate a, $2;" +
+            "store B into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /** Nested foreach block that assigns A.b to an alias and generates that alias. */
+    @Test
+    public void test8() {
+        final String script =
+            "A = load 'x' as ( a : int, b, c : chararray );" +
+            "B = group A by a;" +
+            "C = foreach B { S = A.b; generate S; };" +
+            "store C into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /**
+     * Nested foreach block with a chain of aliases (R = a, S = R.u, T = limit S) and a
+     * generate that also contains an arithmetic expression.
+     */
+    @Test
+    public void test9() {
+        final String script =
+            "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
+            "B = foreach A { R = a; S = R.u; T = limit S 100; generate S, T, c + d/5; };" +
+            "store B into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /** Nested limit whose input is an alias (S) that stands for a bag column. */
+    @Test
+    public void test10() {
+        final String script =
+            "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
+            "B = foreach A { S = a; T = limit S 100; generate T; };" +
+            "store B into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /** Nested limit applied directly to a bag column, with no intermediate alias. */
+    @Test
+    public void test11() {
+        final String script =
+            "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
+            "B = foreach A { T = limit a 100; generate T; };" +
+            "store B into 'y';";
+        generateLogicalPlan( script );
+    }
+
+    /** Load with an explicit loader and schema, then a filter on a constant comparison. */
+    @Test
+    public void testFilter() {
+        final String script =
+            "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " +
+            "B = filter A by 2 > 1; ";
+        generateLogicalPlan( script );
+    }
+
+    /**
+     * The input of a nested operator must be a projection. Here S is an arithmetic
+     * expression, so using it as the input of limit has to be rejected with a
+     * ParsingFailureException attributed to LogicalPlanGenerator.
+     */
+    @Test
+    public void testNegative1() {
+        String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
+                       "B = foreach A { S = c * 2; T = limit S 100; generate T; };" +
+                       "store B into 'y';";
+        try {
+            ParserTestingUtils.generateLogicalPlan( query );
+        } catch (RecognitionException e) {
+            // Wrong kind of failure: report it instead of the generic message below.
+            Assert.fail( "Unexpected exception: " + e );
+        } catch (ParsingFailureException e) {
+            // Expected exception. assertEquals takes (expected, actual).
+            Assert.assertEquals( LogicalPlanGenerator.class, e.getParsingClass() );
+            return;
+        } catch (IOException e) {
+            Assert.fail( "Unexpected exception: " + e );
+        }
+        Assert.fail( "Query is supposed to be failing." );
+    }
}