You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/01/04 23:33:16 UTC
svn commit: r1055216 - in /pig/trunk: src/org/apache/pig/newplan/logical/
src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/parser/
test/org/apache/pig/parser/
Author: thejas
Date: Tue Jan 4 22:33:16 2011
New Revision: 1055216
URL: http://svn.apache.org/viewvc?rev=1055216&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-10.patch - (xuefuz via thejas)
Modified:
pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryLexer.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
pig/trunk/test/org/apache/pig/parser/TestParser.pig
Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Tue Jan 4 22:33:16 2011
@@ -130,8 +130,7 @@ public class LogicalPlanMigrationVistor
org.apache.pig.newplan.logical.relational.LOCogroup newCogroup =
new org.apache.pig.newplan.logical.relational.LOCogroup
- (logicalPlan, newExpressionPlans, grouptype, cg.getInner(),
- cg.getRequestedParallelism() );
+ (logicalPlan, newExpressionPlans, grouptype, cg.getInner());
for( int i = 0; i < inputs.size(); i++ ) {
ArrayList<LogicalPlan> plans =
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Tue Jan 4 22:33:16 2011
@@ -76,11 +76,11 @@ public class LOCogroup extends LogicalRe
public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan>
expressionPlans, boolean[] isInner ) {
- this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner, -1 );
+ this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner );
}
public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan>
- expressionPlans, GROUPTYPE groupType, boolean[] isInner, int requestedParrellism) {
+ expressionPlans, GROUPTYPE groupType, boolean[] isInner) {
super("LOCogroup", plan);
this.mExpressionPlans = expressionPlans;
if( isInner != null ) {
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Tue Jan 4 22:33:16 2011
@@ -84,10 +84,18 @@ private Set<String> aliases = new HashSe
query : ^( QUERY statement* )
;
-statement : general_statement | foreach_statement
+statement : general_statement
+ | foreach_statement
+ | split_statement
;
-general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause INTEGER? )
+split_statement : split_clause
+;
+
+general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause parallel_clause? )
+;
+
+parallel_clause : ^( PARALLEL INTEGER )
;
// We need to handle foreach specifically because of the ending ';', which is not required
@@ -130,10 +138,7 @@ path_list : QUOTEDSTRING+
cache_caluse : ^( CACHE path_list )
;
-input_clause : ^( INPUT stream_cmd_list )
-;
-
-stream_cmd_list : stream_cmd+
+input_clause : ^( INPUT stream_cmd+ )
;
stream_cmd : ^( STDIN func_clause? )
@@ -141,13 +146,10 @@ stream_cmd : ^( STDIN func_clause? )
| ^( QUOTEDSTRING func_clause? )
;
-output_clause : ^( OUTPUT stream_cmd_list )
+output_clause : ^( OUTPUT stream_cmd+ )
;
-error_clause : ^( ERROR error_cmd? )
-;
-
-error_cmd : ^( QUOTEDSTRING INTEGER? )
+error_clause : ^( ERROR QUOTEDSTRING INTEGER? )
;
load_clause : ^( LOAD filename func_clause? as_clause? )
@@ -194,7 +196,7 @@ map_type : MAP_TYPE
;
func_clause : ^( FUNC func_name func_args? )
- | ^( FUNC func_alias )
+ | ^( FUNC_REF func_alias )
;
func_name : eid ( ( PERIOD | DOLLAR ) eid )*
@@ -206,11 +208,11 @@ func_alias : IDENTIFIER
func_args : QUOTEDSTRING+
;
-group_clause : ^( GROUP group_item_list QUOTEDSTRING? )
- | ^( COGROUP group_item_list QUOTEDSTRING? )
+group_clause : ^( GROUP group_item+ group_type? )
+ | ^( COGROUP group_item+ group_type? )
;
-group_item_list : group_item+
+group_type : HINT_COLLECTED | HINT_MERGE | HINT_REGULAR
;
group_item : rel ( join_group_by_clause | ALL | ANY ) ( INNER | OUTER )?
@@ -220,7 +222,8 @@ rel : alias { validateAliasRef( aliases
| op_clause
;
-flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
+flatten_generated_item : flatten_clause as_clause?
+ | ( expr | STAR ) field[new HashSet<String>()]?
;
flatten_clause : ^( FLATTEN expr )
@@ -305,7 +308,7 @@ rel_list : rel+
join_clause : ^( JOIN join_sub_clause join_type? partition_clause? )
;
-join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT
+join_type : HINT_REPL | HINT_MERGE | HINT_SKEWED | HINT_DEFAULT
;
join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
@@ -324,29 +327,26 @@ join_group_by_expr : expr | STAR
union_clause : ^( UNION ONSCHEMA? rel_list )
;
-foreach_clause : ^( FOREACH rel nested_plan )
+foreach_clause : ^( FOREACH rel foreach_plan )
;
-nested_plan : ^( NESTED_PLAN foreach_blk )
- | ^( NESTED_PLAN generate_clause )
+foreach_plan : ^( FOREACH_PLAN nested_blk )
+ | ^( FOREACH_PLAN generate_clause parallel_clause? )
;
-foreach_blk : nested_command_list generate_clause
+nested_blk
+scope { Set<String> ids; }
+@init{ $nested_blk::ids = new HashSet<String>(); }
+ : nested_command* generate_clause
;
generate_clause : ^( GENERATE flatten_generated_item+ )
;
-nested_command_list
-scope { Set<String> ids; }
-@init{ $nested_command_list::ids = new HashSet<String>(); }
- : nested_command*
-;
-
nested_command
: ^( NESTED_CMD IDENTIFIER ( expr | nested_op ) )
{
- $nested_command_list::ids.add( $IDENTIFIER.text );
+ $nested_blk::ids.add( $IDENTIFIER.text );
}
;
@@ -366,7 +366,7 @@ col_ref_list : col_ref+
nested_alias_ref
: IDENTIFIER
{
- validateAliasRef( $nested_command_list::ids, $IDENTIFIER.text );
+ validateAliasRef( $nested_blk::ids, $IDENTIFIER.text );
}
;
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Tue Jan 4 22:33:16 2011
@@ -18,19 +18,32 @@
package org.apache.pig.parser;
+import java.io.IOException;
+import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import org.antlr.runtime.RecognitionException;
+import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.RANDOM;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.StringUtils;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
@@ -43,17 +56,22 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LONative;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.newplan.Operator;
@@ -62,14 +80,26 @@ public class LogicalPlanBuilder {
private LogicalPlan plan = new LogicalPlan();
private Map<String, Operator> operators = new HashMap<String, Operator>();
- private Map<String, FuncSpec> functions = new HashMap<String, FuncSpec>();
+ // TODO: hook up with the real PigContext instance instead of creating one here.
+ private PigContext pigContext = new PigContext( ExecType.LOCAL, new Properties() );
+
+ private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+
FuncSpec lookupFunction(String alias) {
- return functions.get( alias );
+ return pigContext.getFuncSpecFromAlias( alias );
+ }
+
+ StreamingCommand lookupCommand(String alias) {
+ return pigContext.getCommandForAlias( alias );
+ }
+
+ void defineCommand(String alias, StreamingCommand command) {
+ pigContext.registerStreamCmd( alias, command );
}
void defineFunction(String alias, FuncSpec fs) {
- functions.put( alias, fs );
+ pigContext.registerFunction( alias, fs );
}
LogicalPlan getPlan() {
@@ -106,9 +136,9 @@ public class LogicalPlanBuilder {
return buildOp( op, alias, parallel, inputAliases, null );
}
- String buildSplitOp(String alias, Integer parallel, String inputAlias) {
+ String buildSplitOp(String inputAlias) {
LOSplit op = new LOSplit( plan );
- return buildOp( op, alias, parallel, inputAlias, null );
+ return buildOp( op, null, null, inputAlias, null );
}
String buildSplitOutputOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan filterPlan) {
@@ -137,9 +167,14 @@ public class LogicalPlanBuilder {
return buildOp( op, alias, parallel, inputAliases, partitioner );
}
- String buildGroupOp(String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans) {
- LOCogroup op = new LOCogroup( plan );
- return buildOp( op, alias, parallel, inputAlias, null );
+ String buildGroupOp(String alias, Integer parallel, List<String> inputAliases,
+ MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags) {
+ boolean[] flags = new boolean[innerFlags.size()];
+ for( int i = 0; i < innerFlags.size(); i++ ) {
+ flags[i] = innerFlags.get( i );
+ }
+ LOCogroup op = new LOCogroup( plan, expressionPlans, gt, flags );
+ return buildOp( op, alias, parallel, inputAliases, null );
}
String buildLoadOp(String alias, Integer parallel, String filename, FuncSpec funcSpec, LogicalSchema schema) {
@@ -176,14 +211,104 @@ public class LogicalPlanBuilder {
return buildOp( op, alias, parallel, inputAlias, null );
}
- static void setAlias(LogicalRelationalOperator op, String alias) {
- if( alias != null ) {
- op.setAlias( alias );
- } else {
- // TODO: generate an alias.
+ String buildForeachOp(String alias, Integer parallel, String inputAlias, LogicalPlan innerPlan) {
+ LOForEach op = new LOForEach( plan );
+ op.setInnerPlan( innerPlan );
+ return buildOp( op, alias, parallel, inputAlias, null );
+ }
+
+ void buildGenerateOp(LogicalPlan plan, List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
+ List<LogicalSchema> schemas) {
+ boolean[] flags = new boolean[ flattenFlags.size() ];
+ for( int i = 0; i < flattenFlags.size(); i++ )
+ flags[i] = flattenFlags.get( i );
+ LOGenerate op = new LOGenerate( plan, exprPlans, flags );
+ op.setUserDefinedSchema( schemas );
+ plan.add( op );
+ }
+
+ StreamingCommand buildCommand(String cmd, List<String> shipPaths, List<String> cachePaths,
+ List<HandleSpec> inputHandleSpecs, List<HandleSpec> outputHandleSpecs,
+ String logDir, Integer limit) throws RecognitionException {
+ StreamingCommand command = null;
+ try {
+ command = buildCommand( cmd );
+
+ // Process ship paths
+ if( shipPaths.size() == 0 ) {
+ command.setShipFiles( false );
+ } else {
+ for( String path : shipPaths )
+ command.addPathToShip( path );
+ }
+
+ // Process cache paths
+ for( String path : cachePaths )
+ command.addPathToCache( path );
+
+ // Process input handle specs
+ for( HandleSpec spec : inputHandleSpecs )
+ command.addHandleSpec( Handle.INPUT, spec );
+
+ // Process output handle specs
+ for( HandleSpec spec : outputHandleSpecs )
+ command.addHandleSpec( Handle.OUTPUT, spec );
+
+ // error handling
+ command.setLogDir( logDir );
+ if( limit != null )
+ command.setLogFilesLimit( limit );
+ } catch(IOException e) {
+ }
+
+ return command;
+ }
+
+ StreamingCommand buildCommand(String cmd) throws RecognitionException {
+ try {
+ return new StreamingCommand( pigContext, splitArgs( cmd ) );
+ } catch (ParseException e) {
+ throw new RecognitionException(); }
+ }
+
+ String buildStreamOp(String alias, Integer parallel, String inputAlias, StreamingCommand command, LogicalSchema schema)
+ throws RecognitionException {
+ try {
+ LOStream op = new LOStream( plan, pigContext.createExecutableManager(), command, schema );
+ return buildOp( op, alias, parallel, inputAlias, null );
+ } catch (ExecException ex) {
+ throw new RecognitionException();
}
}
+ // TODO: create specific exceptions to throw.
+ String buildNativeOp(Integer parallel, String inputJar, String cmd,
+ List<String> paths, String storeAlias, String loadAlias)
+ throws RecognitionException {
+ LONative op;
+ try {
+ op = new LONative( plan, inputJar, splitArgs( cmd ) );
+ pigContext.addJar( inputJar );
+ for( String path : paths )
+ pigContext.addJar( path );
+ buildOp( op, null, parallel, new ArrayList<String>(), null );
+ plan.connect( operators.get( storeAlias ), op );
+ LOLoad load = (LOLoad)operators.get( loadAlias );
+ plan.connect( op, load );
+ return load.getAlias();
+ } catch (ParseException e) {
+ throw new RecognitionException();
+ } catch (MalformedURLException e) {
+ throw new RecognitionException();
+ }
+ }
+
+ static void setAlias(LogicalRelationalOperator op, String alias) {
+ if( alias == null )
+ alias = new OperatorKey( "test", getNextId() ).toString();
+ op.setAlias( alias );
+ }
+
static void setParallel(LogicalRelationalOperator op, Integer parallel) {
if( parallel != null )
op.setRequestedParallelism( parallel );
@@ -213,11 +338,61 @@ public class LogicalPlanBuilder {
return bagFactory.newDefaultBag();
}
- LogicalExpressionPlan buildProjectStar(String opAlias) {
- LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ LogicalExpression buildProjectStar(LogicalExpressionPlan plan, String opAlias) {
LogicalRelationalOperator op = (LogicalRelationalOperator)operators.get( opAlias );
- new ProjectExpression( plan, 0, -1, op );
- return plan;
+ return new ProjectExpression( plan, 0, -1, op );
}
+ LogicalExpression buildUDF(LogicalExpressionPlan plan, String funcName, List<LogicalExpression> args) {
+ FuncSpec funcSpec = null;// TODO: get funcspec from function name.
+ return new UserFuncExpression( plan, funcSpec, args );
+ }
+
+ private static final char SINGLE_QUOTE = '\u005c'';
+ private static final char DOUBLE_QUOTE = '"';
+ private static String[] splitArgs(String command) throws ParseException {
+ List<String> argv = new ArrayList<String>();
+
+ int beginIndex = 0;
+
+ while (beginIndex < command.length()) {
+ // Skip spaces
+ while (Character.isWhitespace(command.charAt(beginIndex))) {
+ ++beginIndex;
+ }
+
+ char delim = ' ';
+ char charAtIndex = command.charAt(beginIndex);
+ if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+ delim = charAtIndex;
+ }
+
+ int endIndex = command.indexOf(delim, beginIndex+1);
+ if (endIndex == -1) {
+ if (Character.isWhitespace(delim)) {
+ // Reached end of command-line
+ argv.add(command.substring(beginIndex));
+ break;
+ } else {
+ // Didn't find the ending quote/double-quote
+ throw new ParseException("Illegal command: " + command);
+ }
+ }
+
+ if (Character.isWhitespace(delim)) {
+ // Do not consume the space
+ argv.add(command.substring(beginIndex, endIndex));
+ } else {
+ argv.add(command.substring(beginIndex, endIndex+1));
+ }
+
+ beginIndex = endIndex + 1;
+ }
+
+ return argv.toArray(new String[argv.size()]);
+ }
+
+ private static long getNextId() {
+ return nodeIdGen.getNextNodeId( "test" );
+ }
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Tue Jan 4 22:33:16 2011
@@ -37,6 +37,9 @@ package org.apache.pig.parser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.builtin.GFAny;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AndExpression;
@@ -60,12 +63,14 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.OrExpression;
import org.apache.pig.newplan.logical.expression.RegexExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.PigStreaming;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -84,23 +89,35 @@ public LogicalPlan getLogicalPlan() {
query : ^( QUERY statement* )
;
-statement : general_statement | foreach_statement
+statement
+scope {
+ // Parsing context
+ String alias; // The alias of the current operator, either given or generated by the parser.
+ Integer parallel; // Parallelism
+ String inputAlias; // The alias of the input operator
+}
+ : general_statement
+ | foreach_statement
+ | split_statement
+;
+
+split_statement : split_clause
;
general_statement
-scope {
- String alias;
- Integer parallel;
-}
-@init {
-}
-: ^( STATEMENT ( alias { $general_statement::alias = $alias.name; } )?
- op_clause ( INTEGER { $general_statement::parallel = Integer.parseInt( $INTEGER.text ); } )? )
+: ^( STATEMENT ( alias { $statement::alias = $alias.name; } )? op_clause parallel_clause? )
+;
+
+parallel_clause
+ : ^( PARALLEL INTEGER )
+ {
+ $statement::parallel = Integer.parseInt( $INTEGER.text );
+ }
;
// We need to handle foreach specifically because of the ending ';', which is not required
// if there is a nested block. This is ugly, but it gets the job done.
-foreach_statement : ^( STATEMENT alias? foreach_clause[$alias.text] )
+foreach_statement : ^( STATEMENT ( alias { $statement::alias = $alias.name; } )? foreach_clause )
;
alias returns[String name]: IDENTIFIER { $name = $IDENTIFIER.text; }
@@ -119,56 +136,95 @@ op_clause returns[String alias] :
| cross_clause { $alias = $cross_clause.alias; }
| join_clause { $alias = $join_clause.alias; }
| union_clause { $alias = $union_clause.alias; }
- | stream_clause
- | mr_clause
- | split_clause { $alias = $split_clause.alias; }
+ | stream_clause { $alias = $stream_clause.alias; }
+ | mr_clause { $alias = $mr_clause.alias; }
;
define_clause
- : ^( DEFINE alias cmd )
+ : ^( DEFINE alias cmd[$alias.name] )
+ {
+ builder.defineCommand( $alias.name, $cmd.command );
+ }
| ^( DEFINE alias func_clause )
{
builder.defineFunction( $alias.name, $func_clause.funcSpec );
}
;
-cmd : ^( EXECCOMMAND ( ship_clause | cache_caluse | input_clause | output_clause | error_clause )* )
-;
-
-ship_clause : ^( SHIP path_list? )
-;
-
-path_list : QUOTEDSTRING+
+cmd[String alias] returns[StreamingCommand command]
+@init {
+ List<String> shipPaths = new ArrayList<String>();
+ List<String> cachePaths = new ArrayList<String>();
+}
+ : ^( EXECCOMMAND ( ship_clause[shipPaths] | cache_caluse[cachePaths] | input_clause | output_clause | error_clause )* )
+ {
+ $command = builder.buildCommand( $EXECCOMMAND.text, shipPaths,
+ cachePaths, $input_clause.inputHandleSpecs, $output_clause.outputHandleSpecs,
+ $error_clause.dir == null? $alias : $error_clause.dir, $error_clause.limit );
+ }
;
-cache_caluse : ^( CACHE path_list )
+ship_clause[List<String> paths]
+ : ^( SHIP path_list[$paths]? )
;
-input_clause : ^( INPUT stream_cmd_list )
+path_list[List<String> paths]
+ : ( QUOTEDSTRING { $paths.add( builder.unquote( $QUOTEDSTRING.text ) ); } )+
;
-stream_cmd_list : stream_cmd+
+cache_caluse[List<String> paths]
+ : ^( CACHE path_list[$paths] )
;
-stream_cmd : ^( STDIN func_clause? )
- | ^( STDOUT func_clause? )
- | ^( QUOTEDSTRING func_clause? )
+input_clause returns[List<HandleSpec> inputHandleSpecs]
+@init {
+ $inputHandleSpecs = new ArrayList<HandleSpec>();
+}
+ : ^( INPUT ( stream_cmd { $inputHandleSpecs.add( $stream_cmd.handleSpec ); } )+ )
;
-output_clause : ^( OUTPUT stream_cmd_list )
+stream_cmd returns[HandleSpec handleSpec]
+@init {
+ String handleName = null;
+ FuncSpec fs = null;
+ String deserializer = PigStreaming.class.getName() + "()";
+ if( fs != null )
+ deserializer = fs.toString();
+}
+@final {
+ $handleSpec = new HandleSpec( handleName, deserializer );
+}
+ : ^( STDIN { handleName = "stdin"; }
+ ( func_clause { fs = $func_clause.funcSpec; } )? )
+ | ^( STDOUT { handleName = "stdout"; }
+ ( func_clause { fs = $func_clause.funcSpec; } )? )
+ | ^( QUOTEDSTRING { handleName = builder.unquote( $QUOTEDSTRING.text ); }
+ ( func_clause { fs = $func_clause.funcSpec; } )? )
;
-error_clause : ^( ERROR error_cmd? )
+output_clause returns[List<HandleSpec> outputHandleSpecs]
+@init {
+ $outputHandleSpecs = new ArrayList<HandleSpec>();
+}
+ : ^( OUTPUT ( stream_cmd { $outputHandleSpecs.add( $stream_cmd.handleSpec ); } )+ )
;
-error_cmd : ^( QUOTEDSTRING INTEGER? )
+error_clause returns[String dir, Integer limit]
+@init {
+ $limit = StreamingCommand.MAX_TASKS;
+}
+ : ^( ERROR QUOTEDSTRING INTEGER? )
+ {
+ $dir = builder.unquote( $QUOTEDSTRING.text );
+ $limit = Integer.parseInt( $INTEGER.text );
+ }
;
load_clause returns[String alias]
: ^( LOAD filename func_clause? as_clause? )
{
- $alias = builder.buildLoadOp( $general_statement::alias,
- $general_statement::parallel, $filename.filename, $func_clause.funcSpec, $as_clause.logicalSchema );
+ $alias = builder.buildLoadOp( $statement::alias,
+ $statement::parallel, $filename.filename, $func_clause.funcSpec, $as_clause.logicalSchema );
}
;
@@ -257,7 +313,7 @@ func_clause returns[FuncSpec funcSpec]
{
$funcSpec = builder.buildFuncSpec( $func_name.funcName, $func_args.args );
}
- | ^( FUNC func_alias )
+ | ^( FUNC_REF func_alias )
{
$funcSpec = builder.lookupFunction( $func_alias.alias );
}
@@ -286,33 +342,96 @@ func_args returns[List<String> args]
;
group_clause returns[String alias]
- : ^( GROUP group_item_list QUOTEDSTRING? )
- | ^( COGROUP group_item_list QUOTEDSTRING? )
+scope {
+ MultiMap<Integer, LogicalExpressionPlan> groupPlans;
+ int inputIndex;
+ List<String> inputAliases;
+ List<Boolean> innerFlags;
+}
+@init {
+ $group_clause::groupPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+ $group_clause::inputAliases = new ArrayList<String>();
+ $group_clause::innerFlags = new ArrayList<Boolean>();
+ GROUPTYPE groupType = GROUPTYPE.REGULAR;
+}
+ : ^( GROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
+ {
+ $alias = builder.buildGroupOp( $statement::alias, $statement::parallel,
+ $group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
+ }
+ | ^( COGROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
+ {
+ $alias = builder.buildGroupOp( $statement::alias, $statement::parallel,
+ $group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
+ }
;
-group_item_list : group_item+
+group_type returns[GROUPTYPE type]
+ : HINT_COLLECTED { $type = GROUPTYPE.COLLECTED; }
+ | HINT_MERGE { $type = GROUPTYPE.MERGE; }
+ | HINT_REGULAR { $type = GROUPTYPE.REGULAR; }
;
-group_item : rel ( join_group_by_clause[$rel.name] | ALL | ANY ) ( INNER | OUTER )?
+group_item
+@init { boolean inner = false; }
+ : rel ( join_group_by_clause
+ {
+ $group_clause::groupPlans.put( $group_clause::inputIndex, $join_group_by_clause.plans );
+ }
+ | ALL
+ {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ new ConstantExpression( plan, "all", new LogicalFieldSchema( null , null, DataType.CHARARRAY ) );
+ List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>( 1 );
+ plans.add( plan );
+ $group_clause::groupPlans.put( $group_clause::inputIndex, plans );
+ }
+ | ANY
+ {
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ new UserFuncExpression( plan, new FuncSpec( GFAny.class.getName() ) );
+ List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>( 1 );
+ plans.add( plan );
+ $group_clause::groupPlans.put( $group_clause::inputIndex, plans );
+ }
+ ) ( INNER { inner = true; } | OUTER )?
+ {
+ $group_clause::inputAliases.add( $statement::inputAlias );
+ $group_clause::innerFlags.add( inner );
+ $group_clause::inputIndex++;
+ }
;
-rel returns[String name] : alias { $name = $alias.name; }
- | op_clause { $name = $op_clause.alias; }
+rel
+ : alias
+ {
+ $statement::inputAlias = $alias.name;
+ }
+ | op_clause
+ {
+ $statement::inputAlias = $op_clause.alias;
+ }
;
-flatten_generated_item
- : ( flatten_clause | expr[null] | STAR ) as_clause?
+flatten_generated_item returns[LogicalExpressionPlan plan, boolean flattenFlag, LogicalSchema schema]
+@init {
+ $plan = new LogicalExpressionPlan();
+}
+ : flatten_clause[$plan] { $flattenFlag = true; }
+ ( as_clause { $schema = $as_clause.logicalSchema; } )?
+ | ( expr[$plan] | STAR { builder.buildProjectStar( $plan, $statement::inputAlias ); } )
+ ( field { $schema = new LogicalSchema(); $schema.addField( $field.fieldSchema ); } )?
;
-flatten_clause
- : ^( FLATTEN expr[null] )
+flatten_clause[LogicalExpressionPlan plan]
+ : ^( FLATTEN expr[$plan] )
;
store_clause returns[String alias]
: ^( STORE alias filename func_clause? )
{
- $alias= builder.buildStoreOp( $general_statement::alias,
- $general_statement::parallel, $alias.name, $filename.filename, $func_clause.funcSpec );
+ $alias= builder.buildStoreOp( $statement::alias,
+ $statement::parallel, $alias.name, $filename.filename, $func_clause.funcSpec );
}
;
@@ -320,8 +439,8 @@ filter_clause returns[String alias]
@init { LogicalExpressionPlan exprPlan = new LogicalExpressionPlan(); }
: ^( FILTER rel cond[exprPlan] )
{
- $alias = builder.buildFilterOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, exprPlan );
+ $alias = builder.buildFilterOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, exprPlan );
}
;
@@ -373,19 +492,27 @@ cond[LogicalExpressionPlan exprPlan] ret
$expr = new RegexExpression( $exprPlan, $e1.expr, $e2.expr );
}
| func_eval[$exprPlan]
+ {
+ $expr = $func_eval.expr;
+ }
;
func_eval[LogicalExpressionPlan plan] returns[LogicalExpression expr]
-@init { List<LogicalExpression> args = new ArrayList<LogicalExpression>(); }
+@init {
+ List<LogicalExpression> args = new ArrayList<LogicalExpression>();
+}
: ^( FUNC_EVAL func_name ( real_arg[$plan] { args.add( $real_arg.expr ); } )* )
{
- //TODO: FuncSpec funcSpec = builder.buildFuncSpec( $func_name.funcName, $func_args.args );
+ $expr = builder.buildUDF( $plan, $func_name.funcName, args );
}
;
real_arg [LogicalExpressionPlan plan] returns[LogicalExpression expr]
: e = expr[$plan] { $expr = $e.expr; }
| STAR
+ {
+ $expr = builder.buildProjectStar( $plan, $statement::inputAlias );
+ }
;
expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -433,19 +560,29 @@ expr[LogicalExpressionPlan plan] returns
;
var_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : projectable_expr[$plan] ( dot_proj | pound_proj )*
+ : p = projectable_expr[$plan] ( dot_proj[$plan] | pound_proj[$plan] )*
;
projectable_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : func_eval[$plan] { $expr = $func_eval.expr; }
- | col_ref[$plan] { }
- | bin_expr[$plan] { $expr = $bin_expr.expr; }
+ : func_eval[$plan]
+ {
+ $expr = $func_eval.expr;
+ }
+ | col_ref[$plan]
+ {
+ }
+ | bin_expr[$plan]
+ {
+ $expr = $bin_expr.expr;
+ }
;
-dot_proj : ^( PERIOD col_ref[null]+ )
+dot_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
+ : ^( PERIOD col_ref[$plan]+ )
;
-pound_proj : ^( POUND ( QUOTEDSTRING | NULL ) )
+pound_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
+ : ^( POUND ( QUOTEDSTRING | NULL ) )
;
bin_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -458,43 +595,44 @@ bin_expr[LogicalExpressionPlan plan] ret
limit_clause returns[String alias]
: ^( LIMIT rel INTEGER )
{
- $alias = builder.buildLimitOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, Long.valueOf( $INTEGER.text ) );
+ $alias = builder.buildLimitOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, Long.valueOf( $INTEGER.text ) );
}
| ^( LIMIT rel LONGINTEGER )
{
- $alias = builder.buildLimitOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, Long.valueOf( $LONGINTEGER.text ) );
+ $alias = builder.buildLimitOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, Long.valueOf( $LONGINTEGER.text ) );
}
;
sample_clause returns[String alias]
: ^( SAMPLE rel DOUBLENUMBER )
{
- $alias = builder.buildSampleOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, Double.valueOf( $DOUBLENUMBER.text ) );
+ $alias = builder.buildSampleOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, Double.valueOf( $DOUBLENUMBER.text ) );
}
;
order_clause returns[String alias]
- : ^( ORDER rel order_by_clause[$rel.name] func_clause? )
+ : ^( ORDER rel order_by_clause func_clause? )
{
- $alias = builder.buildOrderOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, $order_by_clause.plans,
+ $alias = builder.buildOrderOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, $order_by_clause.plans,
$order_by_clause.ascFlags, $func_clause.funcSpec );
}
;
-order_by_clause[String opAlias] returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
+order_by_clause returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
@init {
$plans = new ArrayList<LogicalExpressionPlan>();
$ascFlags = new ArrayList<Boolean>();
}
: STAR {
- LogicalExpressionPlan plan = builder.buildProjectStar( $opAlias );
+ LogicalExpressionPlan plan = new LogicalExpressionPlan();
+ builder.buildProjectStar( plan, $statement::inputAlias );
$plans.add( plan );
}
- ( ASC | DESC { $ascFlags.add( false ); } )?
+ ( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); } )?
| ( order_col
{
$plans.add( $order_col.plan );
@@ -513,8 +651,8 @@ order_col returns[LogicalExpressionPlan
distinct_clause returns[String alias]
: ^( DISTINCT rel partition_clause? )
{
- $alias = builder.buildDistinctOp( $general_statement::alias,
- $general_statement::parallel, $rel.name, $partition_clause.partitioner );
+ $alias = builder.buildDistinctOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, $partition_clause.partitioner );
}
;
@@ -528,14 +666,14 @@ partition_clause returns[String partitio
cross_clause returns[String alias]
: ^( CROSS rel_list partition_clause? )
{
- $alias = builder.buildCrossOp( $general_statement::alias,
- $general_statement::parallel, $rel_list.aliasList, $partition_clause.partitioner );
+ $alias = builder.buildCrossOp( $statement::alias,
+ $statement::parallel, $rel_list.aliasList, $partition_clause.partitioner );
}
;
rel_list returns[List<String> aliasList]
@init { $aliasList = new ArrayList<String>(); }
- : ( rel { $aliasList.add( $rel.name ); } )+
+ : ( rel { $aliasList.add( $statement::inputAlias ); } )+
;
join_clause returns[String alias]
@@ -552,17 +690,17 @@ scope {
}
: ^( JOIN join_sub_clause join_type? partition_clause? )
{
- $alias = builder.buildJoinOp( $general_statement::alias,
- $general_statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
+ $alias = builder.buildJoinOp( $statement::alias,
+ $statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
$join_type.type, $join_clause::innerFlags, $partition_clause.partitioner );
}
;
join_type returns[JOINTYPE type]
- : JOIN_TYPE_REPL { $type = JOINTYPE.REPLICATED; }
- | JOIN_TYPE_MERGE { $type = JOINTYPE.MERGE; }
- | JOIN_TYPE_SKEWED { $type = JOINTYPE.SKEWED; }
- | JOIN_TYPE_DEFAULT { $type = JOINTYPE.HASH; }
+ : HINT_REPL { $type = JOINTYPE.REPLICATED; }
+ | HINT_MERGE { $type = JOINTYPE.MERGE; }
+ | HINT_SKEWED { $type = JOINTYPE.SKEWED; }
+ | HINT_DEFAULT { $type = JOINTYPE.HASH; }
;
join_sub_clause
@@ -578,53 +716,80 @@ join_sub_clause
;
join_item
- : ^( JOIN_ITEM rel join_group_by_clause[$rel.name] )
+ : ^( JOIN_ITEM rel join_group_by_clause )
{
- $join_clause::inputAliases.add( $rel.name );
+ $join_clause::inputAliases.add( $statement::inputAlias );
$join_clause::joinPlans.put( $join_clause::inputIndex, $join_group_by_clause.plans );
$join_clause::inputIndex++;
}
;
-join_group_by_clause[String alias] returns[List<LogicalExpressionPlan> plans]
-scope { String inputAlias; }
+join_group_by_clause returns[List<LogicalExpressionPlan> plans]
@init {
- $join_group_by_clause::inputAlias = $alias;
$plans = new ArrayList<LogicalExpressionPlan>();
}
: ^( BY ( join_group_by_expr { $plans.add( $join_group_by_expr.plan ); } )+ )
;
join_group_by_expr returns[LogicalExpressionPlan plan]
- : { $plan = new LogicalExpressionPlan(); } expr[$plan]
+@init {
+ $plan = new LogicalExpressionPlan();
+}
+ : expr[$plan]
| STAR
{
- $plan = builder.buildProjectStar( $join_group_by_clause::inputAlias );
+ builder.buildProjectStar( $plan, $statement::inputAlias );
}
;
union_clause returns[String alias]
: ^( UNION ONSCHEMA? rel_list )
{
- $alias = builder.buildUnionOp( $general_statement::alias,
- $general_statement::parallel, $rel_list.aliasList );
+ // TODO: onschema support
+ $alias = builder.buildUnionOp( $statement::alias,
+ $statement::parallel, $rel_list.aliasList );
}
;
-foreach_clause[String alias] returns[LogicalRelationalOperator op] : ^( FOREACH rel nested_plan )
-;
-
-nested_plan : ^( NESTED_PLAN foreach_blk )
- | ^( NESTED_PLAN generate_clause )
+foreach_clause returns[String alias]
+ : ^( FOREACH rel foreach_plan )
+ {
+ $alias = builder.buildForeachOp( $statement::alias,
+ $statement::parallel, $statement::inputAlias, $foreach_plan::innerPlan );
+ }
;
-foreach_blk : nested_command_list generate_clause
+foreach_plan
+scope {
+ LogicalPlan innerPlan;
+}
+@init {
+ $foreach_plan::innerPlan = new LogicalPlan();
+}
+ : ^( FOREACH_PLAN nested_blk )
+ | ^( FOREACH_PLAN generate_clause parallel_clause? )
;
-generate_clause : ^( GENERATE flatten_generated_item+ )
+nested_blk : nested_command* generate_clause
;
-nested_command_list : nested_command*
+generate_clause
+@init {
+ List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
+ List<Boolean> flattenFlags = new ArrayList<Boolean>();
+ List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+}
+ : ^( GENERATE ( flatten_generated_item
+ {
+ plans.add( $flatten_generated_item.plan );
+ flattenFlags.add( $flatten_generated_item.flattenFlag );
+ schemas.add( $flatten_generated_item.schema );
+ }
+ )+
+ )
+ {
+ builder.buildGenerateOp( $foreach_plan::innerPlan, plans, flattenFlags, schemas );
+ }
;
nested_command : ^( NESTED_CMD IDENTIFIER ( expr[null] | nested_op ) )
@@ -649,7 +814,7 @@ nested_alias_ref : IDENTIFIER
nested_filter : ^( FILTER ( nested_alias_ref | nested_proj | expr[null] ) cond[null] )
;
-nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] ) order_by_clause[null] func_clause? )
+nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] ) order_by_clause func_clause? )
;
nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr[null] ) )
@@ -658,25 +823,45 @@ nested_distinct : ^( DISTINCT ( nested_a
nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr[null] ) INTEGER )
;
-stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )
+stream_clause returns[String alias]
+@init {
+ StreamingCommand cmd = null;
+}
+ : ^( STREAM rel ( EXECCOMMAND { cmd = builder.buildCommand( $EXECCOMMAND.text ); }
+ | IDENTIFIER { cmd = builder.lookupCommand( $IDENTIFIER.text ); } ) as_clause? )
+ {
+ $alias = builder.buildStreamOp( $statement::alias, $statement::parallel,
+ $statement::inputAlias, cmd, $as_clause.logicalSchema );
+ }
;
-mr_clause : ^( MAPREDUCE QUOTEDSTRING path_list? store_clause load_clause EXECCOMMAND? )
+mr_clause returns[String alias]
+@init {
+ List<String> paths = new ArrayList<String>();
+ String alias = $statement::alias;
+}
+ : ^( MAPREDUCE QUOTEDSTRING path_list[paths]?
+ { $statement::alias = null; } store_clause
+ { $statement::alias = alias; } load_clause
+ EXECCOMMAND? )
+ {
+ $alias = builder.buildNativeOp( $statement::parallel,
+ builder.unquote( $QUOTEDSTRING.text ), builder.unquote( $EXECCOMMAND.text ),
+ paths, $store_clause.alias, $load_clause.alias );
+ }
;
-split_clause returns[String alias]
- : ^( SPLIT rel { $alias = builder.buildSplitOp( $general_statement::alias,
- $general_statement::parallel, $rel.name ); }
- split_branch[$alias]+ )
-
+split_clause
+ : ^( SPLIT rel { $statement::inputAlias = builder.buildSplitOp( $statement::inputAlias ); }
+ split_branch+ )
;
-split_branch[String inputAlias] returns[String alias]
+split_branch
@init { LogicalExpressionPlan splitPlan = new LogicalExpressionPlan(); }
: ^( SPLIT_BRANCH IDENTIFIER cond[splitPlan] )
{
- $alias = builder.buildSplitOutputOp( $IDENTIFIER.text,
- $general_statement::parallel, $inputAlias, splitPlan );
+ builder.buildSplitOutputOp( $IDENTIFIER.text,
+ $statement::parallel, $statement::inputAlias, splitPlan );
}
;
Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Tue Jan 4 22:33:16 2011
@@ -266,16 +266,22 @@ DOUBLENUMBER : FLOATINGPOINT ( 'E' ( MIN
FLOATNUMBER : DOUBLENUMBER ( 'F' )?
;
-JOIN_TYPE_REPL : '\'REPL\'' | '\'REPLICATED\''
+HINT_REPL : '\'REPL\'' | '\'REPLICATED\''
;
-JOIN_TYPE_SKEWED : '\'SKEWED\''
+HINT_SKEWED : '\'SKEWED\''
;
-JOIN_TYPE_MERGE : '\'MERGE\''
+HINT_MERGE : '\'MERGE\''
;
-JOIN_TYPE_DEFAULT : '\'HASH\'' | '\'DEFAULT\''
+HINT_DEFAULT : '\'HASH\'' | '\'DEFAULT\''
+;
+
+HINT_COLLECTED : '\'COLLECTED\''
+;
+
+HINT_REGULAR : '\'REGULAR\''
;
QUOTEDSTRING : '\'' ( ( ~ ( '\'' | '\\' | '\n' | '\r' ) )
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Tue Jan 4 22:33:16 2011
@@ -36,6 +36,7 @@ tokens {
QUERY;
STATEMENT;
FUNC;
+ FUNC_REF;
FUNC_EVAL;
CAST_EXPR;
BIN_EXPR;
@@ -48,7 +49,7 @@ tokens {
NESTED_CMD;
NESTED_PROJ;
SPLIT_BRANCH;
- NESTED_PLAN;
+ FOREACH_PLAN;
MAP_TYPE;
TUPLE_TYPE;
BAG_TYPE;
@@ -112,11 +113,19 @@ query : statement*
-> ^( QUERY statement* )
;
-statement : general_statement | foreach_statement
+statement : general_statement
+ | foreach_statement
+ | split_statement
;
-general_statement : ( alias EQUAL )? op_clause ( PARALLEL INTEGER )? SEMI_COLON
- -> ^( STATEMENT alias? op_clause INTEGER? )
+split_statement : split_clause SEMI_COLON!
+;
+
+general_statement : ( alias EQUAL )? op_clause parallel_clause? SEMI_COLON
+ -> ^( STATEMENT alias? op_clause parallel_clause? )
+;
+
+parallel_clause : PARALLEL^ INTEGER
;
// We need to handle foreach specifically because of the ending ';', which is not required
@@ -142,7 +151,6 @@ op_clause : define_clause
| union_clause
| stream_clause
| mr_clause
- | split_clause
;
define_clause : DEFINE^ alias ( cmd | func_clause )
@@ -174,10 +182,7 @@ stream_cmd : ( STDIN | STDOUT | QUOTEDST
output_clause : OUTPUT^ LEFT_PAREN! stream_cmd_list RIGHT_PAREN!
;
-error_clause : ERROR^ LEFT_PAREN! error_cmd? RIGHT_PAREN!
-;
-
-error_cmd : QUOTEDSTRING^ ( LIMIT! INTEGER )?
+error_clause : ERROR^ LEFT_PAREN! QUOTEDSTRING ( LIMIT! INTEGER )? RIGHT_PAREN!
;
load_clause : LOAD^ filename ( USING! func_clause )? as_clause?
@@ -225,7 +230,7 @@ map_type : MAP LEFT_BRACKET RIGHT_BRACKE
func_clause : func_name LEFT_PAREN func_args? RIGHT_PAREN
-> ^( FUNC func_name func_args? )
| func_alias
- -> ^( FUNC func_alias )
+ -> ^( FUNC_REF func_alias )
;
func_name : eid ( ( PERIOD | DOLLAR ) eid )*
@@ -238,7 +243,10 @@ func_args : QUOTEDSTRING ( COMMA QUOTEDS
-> QUOTEDSTRING+
;
-group_clause : ( GROUP | COGROUP )^ group_item_list ( USING! QUOTEDSTRING )?
+group_clause : ( GROUP | COGROUP )^ group_item_list ( USING! group_type )?
+;
+
+group_type : HINT_COLLECTED | HINT_MERGE | HINT_REGULAR
;
group_item_list : group_item ( COMMA group_item )*
@@ -251,7 +259,8 @@ group_item : rel ( join_group_by_clause
rel : alias | LEFT_PAREN! op_clause RIGHT_PAREN!
;
-flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
+flatten_generated_item : flatten_clause as_clause?
+ | ( expr | STAR ) ( AS! field )?
;
flatten_clause : FLATTEN^ LEFT_PAREN! expr RIGHT_PAREN!
@@ -377,7 +386,7 @@ rel_list : rel ( COMMA rel )*
join_clause : JOIN^ join_sub_clause ( USING! join_type )? partition_clause?
;
-join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT
+join_type : HINT_REPL | HINT_MERGE | HINT_SKEWED | HINT_DEFAULT
;
join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
@@ -405,16 +414,16 @@ join_group_by_expr : expr | STAR
union_clause : UNION^ ONSCHEMA? rel_list
;
-foreach_clause : FOREACH^ rel nested_plan
+foreach_clause : FOREACH^ rel foreach_plan
;
-nested_plan : foreach_blk SEMI_COLON?
- -> ^( NESTED_PLAN foreach_blk )
- | ( generate_clause SEMI_COLON )
- -> ^( NESTED_PLAN generate_clause )
+foreach_plan : nested_blk SEMI_COLON?
+ -> ^( FOREACH_PLAN nested_blk )
+ | ( generate_clause parallel_clause? SEMI_COLON )
+ -> ^( FOREACH_PLAN generate_clause parallel_clause? )
;
-foreach_blk : LEFT_CURLY! nested_command_list ( generate_clause SEMI_COLON! ) RIGHT_CURLY!
+nested_blk : LEFT_CURLY! nested_command_list ( generate_clause SEMI_COLON! ) RIGHT_CURLY!
;
generate_clause : GENERATE flatten_generated_item ( COMMA flatten_generated_item )*
Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Tue Jan 4 22:33:16 2011
@@ -18,45 +18,69 @@
package org.apache.pig.parser;
-import java.io.IOException;
import junit.framework.Assert;
-import org.antlr.runtime.RecognitionException;
import org.junit.Test;
public class TestLogicalPlanGenerator {
@Test
- public void test1() throws RecognitionException, IOException, ParsingFailureException {
- try {
- ParserTestingUtils.generateLogicalPlan(
- "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " +
- "B = limit A 100; " +
- "C = filter B by 2 > 1; " +
- "D = load 'y' as (d1, d2); " +
- "E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
- "F = store E into 'output';" );
- } catch(Exception ex) {
- Assert.assertTrue( false );// should never come here.
- }
+ public void test1() {
+ String query = "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " +
+ "B = limit A 100; " +
+ "C = filter B by 2 > 1; " +
+ "D = load 'y' as (d1, d2); " +
+ "E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
+ "F = store E into 'output';";
+ generateLogicalPlan( query );
}
@Test
- public void test2() throws RecognitionException, IOException, ParsingFailureException {
+ public void test2() {
+ String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
+ "B = distinct A partition by org.apache.pig.Identity; " +
+ "C = sample B 0.49; " +
+ "D = order C by $0, $1; " +
+ "E = load 'y' as (d1, d2); " +
+ "F = union onschema D, E; " +
+ "G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
+ "H = cross F, G; " +
+ "split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
+ "L = store J into 'output';";
+ generateLogicalPlan( query );
+ }
+
+ @Test
+ public void test3() {
+ }
+
+ private void generateLogicalPlan(String query) {
try {
- ParserTestingUtils.generateLogicalPlan(
- "A = load 'x' as ( u:int, v:long, w:bytearray); " +
- "B = distinct A partition by org.apache.pig.Identity; " +
- "C = sample B 0.49; " +
- "D = order C by $0, $1; " +
- "E = load 'y' as (d1, d2); " +
- "F = union onschema D, E; " +
- "G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
- "H = cross F, G; " +
- "I = split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
- "L = store J into 'output';" );
+ ParserTestingUtils.generateLogicalPlan( query );
} catch(Exception ex) {
- Assert.assertTrue( false );// should never come here.
+ Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
}
}
+
+ @Test
+ public void test4() {
+ String query = "A = load 'x'; " +
+ "B = mapreduce '" + "myjar.jar" + "' " +
+ "Store A into 'table_testNativeMRJobSimple_input' "+
+ "Load 'table_testNativeMRJobSimple_output' "+
+ "`org.apache.pig.test.utils.WordCount -files " + "file " +
+ "table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output " +
+ "stopworld.file" + "`;" +
+ "C = Store B into 'output';";
+ generateLogicalPlan( query );
+ }
+
+ // Test define function.
+ @Test
+ public void test5() {
+ String query = "define myudf org.apache.pig.TextLoader( 'test', 'data' );" +
+ "A = load 'x' using myudf;" +
+ "store A into 'y';";
+ generateLogicalPlan( query );
+ }
}
Modified: pig/trunk/test/org/apache/pig/parser/TestParser.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestParser.pig?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestParser.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestParser.pig Tue Jan 4 22:33:16 2011
@@ -77,7 +77,7 @@ store countInactiveAcct into '/user/kale
store inactiveAccounts into '/user/kaleidoscope/pow_stats/20080228/acct/InactiveAcct';
--split
-G = Split A into X if $0 > 0, Y if $0 == 0;
+Split A into X if $0 > 0, Y if $0 == 0;
--union
H = union onschema A, B;