You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/08/07 09:47:51 UTC

svn commit: r1154666 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/parser/ test/org/apache/pig/parser/ test/org/apache/pig/test/

Author: daijy
Date: Sun Aug  7 07:47:50 2011
New Revision: 1154666

URL: http://svn.apache.org/viewvc?rev=1154666&view=rev
Log:
PIG-1631: Support to 2 level nested foreach

Added:
    pig/trunk/test/org/apache/pig/test/TestNestedForeach.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Aug  7 07:47:50 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1631: Support to 2 level nested foreach (aniket486 via daijy)
+
 PIG-2191: Reduce amount of log spam generated by UDFs (dvryaboy)
 
 PIG-2200: Piggybank cannot be built from the Git mirror (dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Sun Aug  7 07:47:50 2011
@@ -628,9 +628,7 @@ public class SecondaryKeyOptimizer exten
     static private boolean collectColumnChain(PhysicalPlan plan,
             ColumnChainInfo columnChainInfo) throws PlanException {
         if (plan.getRoots().size() != 1) {
-            int errorCode = 2207;
-            throw new PlanException(
-                    "POForEach inner plan has more than 1 root", errorCode);
+        	return true;
         }
 
         PhysicalOperator currentNode = plan.getRoots().get(0);

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Sun Aug  7 07:47:50 2011
@@ -439,6 +439,7 @@ nested_op : nested_proj
           | nested_distinct
           | nested_limit
           | nested_cross
+          | nested_foreach
 ;
 
 nested_proj 
@@ -464,6 +465,9 @@ nested_limit 
 nested_cross : ^( CROSS nested_op_input_list )
 ;
 
+nested_foreach : ^( FOREACH nested_op_input generate_clause )
+;
+
 nested_op_input_list : nested_op_input+
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Sun Aug  7 07:47:50 2011
@@ -416,6 +416,7 @@ nested_op : nested_proj
           | nested_distinct
           | nested_limit
           | nested_cross
+          | nested_foreach
 ;
 
 nested_proj 
@@ -443,6 +444,10 @@ nested_cross
     : ^( CROSS { sb.append($CROSS.text).append(" "); }  nested_op_input_list )
 ;
 
+nested_foreach
+	: ^( FOREACH { sb.append($FOREACH.text).append(" "); }  nested_op_input generate_clause )
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Sun Aug  7 07:47:50 2011
@@ -437,6 +437,7 @@ nested_op : nested_proj
           | nested_distinct
           | nested_limit
           | nested_cross
+          | nested_foreach
 ;
 
 nested_proj : ^( NESTED_PROJ col_ref col_ref+ )
@@ -458,6 +459,9 @@ nested_limit : ^( LIMIT nested_op_input 
 nested_cross : ^( CROSS nested_op_input_list )
 ;
 
+nested_foreach : ^( FOREACH nested_op_input generate_clause )
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sun Aug  7 07:47:50 2011
@@ -1048,6 +1048,10 @@ public class LogicalPlanBuilder {
         }
     }
     
+    static LOForEach createNestedForeachOp(LogicalPlan plan) {
+    	return new LOForEach(plan);
+    }
+    
     Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
             List<LogicalExpressionPlan> plans, 
             List<Boolean> ascFlags, FuncSpec fs) {
@@ -1062,6 +1066,15 @@ public class LogicalPlanBuilder {
         return op;
     }
     
+    Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias, 
+    		Operator inputOp, LogicalPlan innerPlan)
+    throws ParserValidationException
+    {
+    	op.setInnerPlan(innerPlan);
+    	buildNestedOp(loc, plan, op, alias, inputOp);
+    	return op;
+    }
+    
     Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, 
             Map<String, Operator> operators,
             String alias,

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Sun Aug  7 07:47:50 2011
@@ -104,6 +104,8 @@ private LogicalPlanBuilder builder = nul
 
 private boolean inForeachPlan = false;
 
+private boolean inNestedCommand = false;
+
 public LogicalPlan getLogicalPlan() {
     return builder.getPlan();
 }
@@ -1119,32 +1121,13 @@ scope {
 nested_blk : nested_command* generate_clause
 ;
 
-generate_clause
-scope GScope;
-@init {
-    $GScope::currentOp = builder.createGenerateOp( $foreach_plan::innerPlan );
-    List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
-    List<Boolean> flattenFlags = new ArrayList<Boolean>();
-    List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
-}
- : ^( GENERATE ( flatten_generated_item
-                 {
-                     plans.add( $flatten_generated_item.plan );
-                     flattenFlags.add( $flatten_generated_item.flattenFlag );
-                     schemas.add( $flatten_generated_item.schema );
-                 }
-               )+
-    )
-   {   
-       builder.buildGenerateOp( new SourceLocation( (PigParserNode)$GENERATE ), $foreach_clause::foreachOp, 
-           (LOGenerate)$GScope::currentOp, $foreach_plan::operators,
-           plans, flattenFlags, schemas );
-   }
-;
-
 nested_command
 @init {
     LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+    inNestedCommand = true;
+}
+@after {
+	inNestedCommand = false;
 }
  : ^( NESTED_CMD IDENTIFIER nested_op[$IDENTIFIER.text] )
    {
@@ -1164,6 +1147,7 @@ nested_op[String alias] returns[Operator
  | nested_distinct[$alias] { $op = $nested_distinct.op; }
  | nested_limit[$alias] { $op = $nested_limit.op; }
  | nested_cross[$alias] { $op = $nested_cross.op; }
+ | nested_foreach[$alias] { $op = $nested_foreach.op; }
 ;
 
 nested_proj[String alias] returns[Operator op]
@@ -1259,6 +1243,48 @@ nested_cross[String alias] returns[Opera
    }
 ;
 
+nested_foreach[String alias] returns[Operator op]
+scope {
+    LogicalPlan innerPlan;
+    LOForEach foreachOp;
+}
+@init {
+	Operator inputOp = null;
+	$nested_foreach::innerPlan = new LogicalPlan();
+	$nested_foreach::foreachOp = builder.createNestedForeachOp( $foreach_plan::innerPlan );
+}
+ : ^( FOREACH nested_op_input generate_clause )
+   {
+   		SourceLocation loc = new SourceLocation( (PigParserNode)$FOREACH );
+   		$op = builder.buildNestedForeachOp( loc, (LOForEach)$nested_foreach::foreachOp, $foreach_plan::innerPlan,
+   							$alias, $nested_op_input.op, $nested_foreach::innerPlan);
+   }
+;
+
+generate_clause
+scope GScope;
+@init {
+	$GScope::currentOp = builder.createGenerateOp(inNestedCommand ? $nested_foreach::innerPlan : $foreach_plan::innerPlan );
+    List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
+    List<Boolean> flattenFlags = new ArrayList<Boolean>();
+    List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+}
+ : ^( GENERATE ( flatten_generated_item
+                 {
+                     plans.add( $flatten_generated_item.plan );
+                     flattenFlags.add( $flatten_generated_item.flattenFlag );
+                     schemas.add( $flatten_generated_item.schema );
+                 }
+               )+
+    )
+   {   
+       builder.buildGenerateOp( new SourceLocation( (PigParserNode)$GENERATE ), 
+       	   inNestedCommand ? $nested_foreach::foreachOp : $foreach_clause::foreachOp, 
+           (LOGenerate)$GScope::currentOp, $foreach_plan::operators,
+           plans, flattenFlags, schemas );
+   }
+;
+
 nested_op_input returns[Operator op]
 @init {
     LogicalExpressionPlan plan = new LogicalExpressionPlan();

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Sun Aug  7 07:47:50 2011
@@ -570,6 +570,7 @@ nested_op : nested_filter
           | nested_distinct
           | nested_limit
           | nested_cross
+          | nested_foreach
 ;
 
 nested_proj : col_ref PERIOD col_ref_list
@@ -595,6 +596,9 @@ nested_limit : LIMIT^ nested_op_input ( 
 nested_cross : CROSS^ nested_op_input_list
 ;
 
+nested_foreach: FOREACH^ nested_op_input generate_clause
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 

Modified: pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java?rev=1154666&r1=1154665&r2=1154666&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestErrorHandling.java Sun Aug  7 07:47:50 2011
@@ -158,19 +158,22 @@ public class TestErrorHandling {
 
     @Test // PIG-1956, 1957
     public void tesNegative9() throws IOException {
-        String query = "A = load 'x' as (name, age, gpa);\n" +
-                       "B = group A by name;\n" +
-                       "C = foreach B { ba = filter A by age < '25'; bb = foreach ba generate gpa; generate group, flatten(bb);}";
-        try {
-            pig.registerQuery( query );
-        } catch(FrontendException ex) {
-        	String msg = ex.getMessage();
-            System.out.println( msg );
-            Assert.assertEquals( 1200, ex.getErrorCode() );
-            Assert.assertTrue( msg.contains( "line 3, column 58" ) );
-            Assert.assertTrue( msg.contains( "mismatched input 'ba' expecting LEFT_PAREN" ) );
-            return;
-        }
+		pig.registerQuery("a = load 'temp' as (a0:int, a1:int);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		try {
+			pig.registerQuery("c = foreach b { " +
+					" c1 = foreach a { " +
+					" c11 = filter a by a1 > 0; " +
+					" generate c11; " +
+					" } " +
+					" generate c1; " +
+			" }\n");
+		} catch (FrontendException ex) {
+			String msg = ex.getMessage();
+			Assert.assertTrue( msg.contains( "line 5, column 32" ) );
+			Assert.assertTrue( msg.contains( "mismatched input '{' expecting GENERATE"));
+			return;
+		}
         Assert.fail( "Testcase should fail" );
     }
 

Added: pig/trunk/test/org/apache/pig/test/TestNestedForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNestedForeach.java?rev=1154666&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNestedForeach.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestNestedForeach.java Sun Aug  7 07:47:50 2011
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+public class TestNestedForeach {
+	static MiniCluster cluster = MiniCluster.buildCluster();
+
+	private PigServer pig ;
+
+	public TestNestedForeach() throws Throwable {
+		pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()) ;
+	}
+
+	Boolean[] nullFlags = new Boolean[]{ false, true };
+
+	@AfterClass
+	public static void oneTimeTearDown() throws Exception {
+		cluster.shutDown();
+	}
+
+	@Test
+	public void testNestedForeachProj() throws Exception {
+		String[] input = {
+				"1\t2",
+				"2\t7",
+				"1\t3"
+		};
+
+		Util.createInputFile(cluster, "table_nf_proj", input);
+
+		pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(7)})"));
+	}
+
+	@Test
+	public void testNestedForeachExpression() throws Exception {
+		String[] input = {
+				"1\t2",
+				"2\t7",
+				"1\t3"
+		};
+
+		Util.createInputFile(cluster, "table_nf_expr", input);
+
+		pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(4),(6)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(14)})"));
+	}
+
+	@Test
+	public void testNestedForeachUDF() throws Exception {
+		String[] input = {
+				"1\thello",
+				"2\tpig",
+				"1\tworld"
+		};
+
+		Util.createInputFile(cluster, "table_nf_udf", input);
+
+		pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(HELLO),(WORLD)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(PIG)})"));
+	}
+
+	@Test
+	public void testNestedForeachFlatten() throws Exception {
+		String[] input = {
+				"1\thello world pig",
+				"2\thadoop world",
+				"1\thello pig"
+		};
+
+		Util.createInputFile(cluster, "table_nf_flatten", input);
+
+		pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(hello),(world),(pig),(hello),(pig)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(hadoop),(world)})"));
+	}
+
+	@Test
+	public void testNestedForeachInnerFilter() throws Exception {
+		String[] input = {
+				"1\t2",
+				"2\t7",
+				"1\t3"
+		};
+
+		Util.createInputFile(cluster, "table_nf_filter", input);
+
+		pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { " +
+				" c1 = filter a by a1 >= 3; " +
+				" c2 = foreach c1 generate a1; " +
+				" generate c2; " +
+		" }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(3)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(7)})"));
+	}
+
+	@Test
+	public void testNestedForeachInnerOrder() throws Exception {
+		String[] input = {
+				"1\t3",
+				"2\t7",
+				"1\t2"
+		};
+
+		Util.createInputFile(cluster, "table_nf_order", input);
+
+		pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
+		pig.registerQuery("b = group a by a0;\n");
+		pig.registerQuery("c = foreach b { " +
+				" c1 = order a by a1; " +
+				" c2 = foreach c1 generate a1; " +
+				" generate c2; " +
+		" }\n");
+
+		Iterator<Tuple> iter = pig.openIterator("c");
+		Tuple t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+		t = iter.next();
+		Assert.assertTrue(t.toString().equals("({(7)})"));
+	}
+}