You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2011/08/07 09:47:51 UTC
svn commit: r1154666 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/parser/ test/org/apache/pig/parser/
test/org/apache/pig/test/
Author: daijy
Date: Sun Aug 7 07:47:50 2011
New Revision: 1154666
URL: http://svn.apache.org/viewvc?rev=1154666&view=rev
Log:
PIG-1631: Support to 2 level nested foreach
Added:
pig/trunk/test/org/apache/pig/test/TestNestedForeach.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
pig/trunk/src/org/apache/pig/parser/AliasMasker.g
pig/trunk/src/org/apache/pig/parser/AstPrinter.g
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Aug 7 07:47:50 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1631: Support to 2 level nested foreach (aniket486 via daijy)
+
PIG-2191: Reduce amount of log spam generated by UDFs (dvryaboy)
PIG-2200: Piggybank cannot be built from the Git mirror (dvryaboy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Sun Aug 7 07:47:50 2011
@@ -628,9 +628,7 @@ public class SecondaryKeyOptimizer exten
static private boolean collectColumnChain(PhysicalPlan plan,
ColumnChainInfo columnChainInfo) throws PlanException {
if (plan.getRoots().size() != 1) {
- int errorCode = 2207;
- throw new PlanException(
- "POForEach inner plan has more than 1 root", errorCode);
+ return true;
}
PhysicalOperator currentNode = plan.getRoots().get(0);
Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Sun Aug 7 07:47:50 2011
@@ -439,6 +439,7 @@ nested_op : nested_proj
| nested_distinct
| nested_limit
| nested_cross
+ | nested_foreach
;
nested_proj
@@ -464,6 +465,9 @@ nested_limit
nested_cross : ^( CROSS nested_op_input_list )
;
+nested_foreach : ^( FOREACH nested_op_input generate_clause )
+;
+
nested_op_input_list : nested_op_input+
;
Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Sun Aug 7 07:47:50 2011
@@ -416,6 +416,7 @@ nested_op : nested_proj
| nested_distinct
| nested_limit
| nested_cross
+ | nested_foreach
;
nested_proj
@@ -443,6 +444,10 @@ nested_cross
: ^( CROSS { sb.append($CROSS.text).append(" "); } nested_op_input_list )
;
+nested_foreach
+ : ^( FOREACH { sb.append($FOREACH.text).append(" "); } nested_op_input generate_clause )
+;
+
nested_op_input : col_ref | nested_proj
;
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Sun Aug 7 07:47:50 2011
@@ -437,6 +437,7 @@ nested_op : nested_proj
| nested_distinct
| nested_limit
| nested_cross
+ | nested_foreach
;
nested_proj : ^( NESTED_PROJ col_ref col_ref+ )
@@ -458,6 +459,9 @@ nested_limit : ^( LIMIT nested_op_input
nested_cross : ^( CROSS nested_op_input_list )
;
+nested_foreach : ^( FOREACH nested_op_input generate_clause )
+;
+
nested_op_input : col_ref | nested_proj
;
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sun Aug 7 07:47:50 2011
@@ -1048,6 +1048,10 @@ public class LogicalPlanBuilder {
}
}
+ static LOForEach createNestedForeachOp(LogicalPlan plan) {
+ return new LOForEach(plan);
+ }
+
Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
List<LogicalExpressionPlan> plans,
List<Boolean> ascFlags, FuncSpec fs) {
@@ -1062,6 +1066,15 @@ public class LogicalPlanBuilder {
return op;
}
+ Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
+ Operator inputOp, LogicalPlan innerPlan)
+ throws ParserValidationException
+ {
+ op.setInnerPlan(innerPlan);
+ buildNestedOp(loc, plan, op, alias, inputOp);
+ return op;
+ }
+
Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
Map<String, Operator> operators,
String alias,
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Sun Aug 7 07:47:50 2011
@@ -104,6 +104,8 @@ private LogicalPlanBuilder builder = nul
private boolean inForeachPlan = false;
+private boolean inNestedCommand = false;
+
public LogicalPlan getLogicalPlan() {
return builder.getPlan();
}
@@ -1119,32 +1121,13 @@ scope {
nested_blk : nested_command* generate_clause
;
-generate_clause
-scope GScope;
-@init {
- $GScope::currentOp = builder.createGenerateOp( $foreach_plan::innerPlan );
- List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
- List<Boolean> flattenFlags = new ArrayList<Boolean>();
- List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
-}
- : ^( GENERATE ( flatten_generated_item
- {
- plans.add( $flatten_generated_item.plan );
- flattenFlags.add( $flatten_generated_item.flattenFlag );
- schemas.add( $flatten_generated_item.schema );
- }
- )+
- )
- {
- builder.buildGenerateOp( new SourceLocation( (PigParserNode)$GENERATE ), $foreach_clause::foreachOp,
- (LOGenerate)$GScope::currentOp, $foreach_plan::operators,
- plans, flattenFlags, schemas );
- }
-;
-
nested_command
@init {
LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+ inNestedCommand = true;
+}
+@after {
+ inNestedCommand = false;
}
: ^( NESTED_CMD IDENTIFIER nested_op[$IDENTIFIER.text] )
{
@@ -1164,6 +1147,7 @@ nested_op[String alias] returns[Operator
| nested_distinct[$alias] { $op = $nested_distinct.op; }
| nested_limit[$alias] { $op = $nested_limit.op; }
| nested_cross[$alias] { $op = $nested_cross.op; }
+ | nested_foreach[$alias] { $op = $nested_foreach.op; }
;
nested_proj[String alias] returns[Operator op]
@@ -1259,6 +1243,48 @@ nested_cross[String alias] returns[Opera
}
;
+nested_foreach[String alias] returns[Operator op]
+scope {
+ LogicalPlan innerPlan;
+ LOForEach foreachOp;
+}
+@init {
+ Operator inputOp = null;
+ $nested_foreach::innerPlan = new LogicalPlan();
+ $nested_foreach::foreachOp = builder.createNestedForeachOp( $foreach_plan::innerPlan );
+}
+ : ^( FOREACH nested_op_input generate_clause )
+ {
+ SourceLocation loc = new SourceLocation( (PigParserNode)$FOREACH );
+ $op = builder.buildNestedForeachOp( loc, (LOForEach)$nested_foreach::foreachOp, $foreach_plan::innerPlan,
+ $alias, $nested_op_input.op, $nested_foreach::innerPlan);
+ }
+;
+
+generate_clause
+scope GScope;
+@init {
+ $GScope::currentOp = builder.createGenerateOp(inNestedCommand ? $nested_foreach::innerPlan : $foreach_plan::innerPlan );
+ List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
+ List<Boolean> flattenFlags = new ArrayList<Boolean>();
+ List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+}
+ : ^( GENERATE ( flatten_generated_item
+ {
+ plans.add( $flatten_generated_item.plan );
+ flattenFlags.add( $flatten_generated_item.flattenFlag );
+ schemas.add( $flatten_generated_item.schema );
+ }
+ )+
+ )
+ {
+ builder.buildGenerateOp( new SourceLocation( (PigParserNode)$GENERATE ),
+ inNestedCommand ? $nested_foreach::foreachOp : $foreach_clause::foreachOp,
+ (LOGenerate)$GScope::currentOp, $foreach_plan::operators,
+ plans, flattenFlags, schemas );
+ }
+;
+
nested_op_input returns[Operator op]
@init {
LogicalExpressionPlan plan = new LogicalExpressionPlan();
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Sun Aug 7 07:47:50 2011
@@ -570,6 +570,7 @@ nested_op : nested_filter
| nested_distinct
| nested_limit
| nested_cross
+ | nested_foreach
;
nested_proj : col_ref PERIOD col_ref_list
@@ -595,6 +596,9 @@ nested_limit : LIMIT^ nested_op_input (
nested_cross : CROSS^ nested_op_input_list
;
+nested_foreach: FOREACH^ nested_op_input generate_clause
+;
+
nested_op_input : col_ref | nested_proj
;
Modified: pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java Sun Aug 7 07:47:50 2011
@@ -158,19 +158,22 @@ public class TestErrorHandling {
@Test // PIG-1956, 1957
public void tesNegative9() throws IOException {
- String query = "A = load 'x' as (name, age, gpa);\n" +
- "B = group A by name;\n" +
- "C = foreach B { ba = filter A by age < '25'; bb = foreach ba generate gpa; generate group, flatten(bb);}";
- try {
- pig.registerQuery( query );
- } catch(FrontendException ex) {
- String msg = ex.getMessage();
- System.out.println( msg );
- Assert.assertEquals( 1200, ex.getErrorCode() );
- Assert.assertTrue( msg.contains( "line 3, column 58" ) );
- Assert.assertTrue( msg.contains( "mismatched input 'ba' expecting LEFT_PAREN" ) );
- return;
- }
+ pig.registerQuery("a = load 'temp' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ try {
+ pig.registerQuery("c = foreach b { " +
+ " c1 = foreach a { " +
+ " c11 = filter a by a1 > 0; " +
+ " generate c11; " +
+ " } " +
+ " generate c1; " +
+ " }\n");
+ } catch (FrontendException ex) {
+ String msg = ex.getMessage();
+ Assert.assertTrue( msg.contains( "line 5, column 32" ) );
+ Assert.assertTrue( msg.contains( "mismatched input '{' expecting GENERATE"));
+ return;
+ }
Assert.fail( "Testcase should fail" );
}
Added: pig/trunk/test/org/apache/pig/test/TestNestedForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNestedForeach.java?rev=1154666&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNestedForeach.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestNestedForeach.java Sun Aug 7 07:47:50 2011
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+public class TestNestedForeach {
+ static MiniCluster cluster = MiniCluster.buildCluster();
+
+ private PigServer pig ;
+
+ public TestNestedForeach() throws Throwable {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()) ;
+ }
+
+ Boolean[] nullFlags = new Boolean[]{ false, true };
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testNestedForeachProj() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_proj", input);
+
+ pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(7)})"));
+ }
+
+ @Test
+ public void testNestedForeachExpression() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_expr", input);
+
+ pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(4),(6)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(14)})"));
+ }
+
+ @Test
+ public void testNestedForeachUDF() throws Exception {
+ String[] input = {
+ "1\thello",
+ "2\tpig",
+ "1\tworld"
+ };
+
+ Util.createInputFile(cluster, "table_nf_udf", input);
+
+ pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(HELLO),(WORLD)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(PIG)})"));
+ }
+
+ @Test
+ public void testNestedForeachFlatten() throws Exception {
+ String[] input = {
+ "1\thello world pig",
+ "2\thadoop world",
+ "1\thello pig"
+ };
+
+ Util.createInputFile(cluster, "table_nf_flatten", input);
+
+ pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(hello),(world),(pig),(hello),(pig)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(hadoop),(world)})"));
+ }
+
+ @Test
+ public void testNestedForeachInnerFilter() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_filter", input);
+
+ pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { " +
+ " c1 = filter a by a1 >= 3; " +
+ " c2 = foreach c1 generate a1; " +
+ " generate c2; " +
+ " }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(3)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(7)})"));
+ }
+
+ @Test
+ public void testNestedForeachInnerOrder() throws Exception {
+ String[] input = {
+ "1\t3",
+ "2\t7",
+ "1\t2"
+ };
+
+ Util.createInputFile(cluster, "table_nf_order", input);
+
+ pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { " +
+ " c1 = order a by a1; " +
+ " c2 = foreach c1 generate a1; " +
+ " generate c2; " +
+ " }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(7)})"));
+ }
+}