You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/19 18:55:36 UTC

svn commit: r882221 - in /hadoop/pig/trunk: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/

Author: pradeepkth
Date: Thu Nov 19 17:55:36 2009
New Revision: 882221

URL: http://svn.apache.org/viewvc?rev=882221&view=rev
Log:
PIG-1064: Behaviour of COGROUP with and without schema when using * operator (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov 19 17:55:36 2009
@@ -33,6 +33,9 @@
 
 BUG FIXES
 
+PIG-1064: Behaviour of COGROUP with and without schema when using "*" operator
+(pradeepkth)
+
 Release 0.6.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Thu Nov 19 17:55:36 2009
@@ -57,7 +57,6 @@
         //get the attributes of cogroup that are modified during the translation
         
         MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
-
         for(LogicalOperator op: cg.getInputs()) {
             ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
             for(LogicalPlan lp: mapGByPlans.get(op)) {
@@ -70,9 +69,41 @@
                     newGByPlans.add(lp);
                 }
             }
+            
+            
             mapGByPlans.removeKey(op);
             mapGByPlans.put(op, newGByPlans);
         }
+        
+        // check if after translation none of group by plans in a cogroup
+        // have a project(*) - if they still do it's because the input
+        // for the project(*) did not have a schema - in this case, we should
+        // error out since we could have different number/types of 
+        // cogroup keys
+        if(cg.getInputs().size() > 1) { // only for cogroups
+            for(LogicalOperator op: cg.getInputs()) {
+                for(LogicalPlan lp: mapGByPlans.get(op)) {
+                    if(checkPlanForProjectStar(lp)) {
+                        // not following Error handling guidelines to give error code
+                        // and error source since this will get swallowed by the parser
+                        // which will just return a ParseException
+                        throw new VisitorException("Cogroup/Group by * is only allowed if " +
+                        		"the input has a schema");
+                    }
+                }
+            }
+            // check if after translation all group by plans have same arity
+            int arity = mapGByPlans.get(cg.getInputs().get(0)).size();
+            for(LogicalOperator op: cg.getInputs()) {
+                if(arity != mapGByPlans.get(op).size()) {
+                    // not following Error handling guidelines to give error code
+                    // and error source since this will get swallowed by the parser
+                    // which will just return a ParseException
+                    throw new VisitorException("The arity of cogroup/group by columns " +
+                    		"do not match");
+                }
+            }
+        }
     }
     
     /* (non-Javadoc)

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Nov 19 17:55:36 2009
@@ -1029,19 +1029,19 @@
 	)
 	{ 
 		if(null != root) {
-            log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-
-            //Translate all the project(*) leaves in the plan to a sequence of projections
-            ProjectStarTranslator translate = new ProjectStarTranslator(lp);
-            translate.visit();
-
-            addLogicalPlan(root, lp);
-
             try {
-			    log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+                log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+    
+                //Translate all the project(*) leaves in the plan to a sequence of projections
+                ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+                translate.visit();
+    
+                addLogicalPlan(root, lp);
+            
+                log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
             } catch(FrontendException fee) {
-            	ParseException pe = new ParseException(fee.getMessage());
-            	pe.initCause(fee);  
+                ParseException pe = new ParseException(fee.getMessage());
+                pe.initCause(fee);  
                 throw pe;
             }
 		}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Nov 19 17:55:36 2009
@@ -291,7 +291,7 @@
     
     @Test
     public void testQuery22Fail() {
-        buildPlan("A = load 'a';");
+        buildPlan("A = load 'a' as (a:int, b: double);");
         try {
             buildPlan("B = group A by (*, $0);");
         } catch (AssertionFailedError e) {
@@ -323,15 +323,50 @@
 
     @Test
     public void testQuery23Fail() {
+        buildPlan("A = load 'a' as (a: int, b:double);");
+        buildPlan("B = load 'b';");
+        boolean exceptionThrown = false;
+        try {
+            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+                        "do not match"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
+    }
+
+    @Test
+    public void testQuery23Fail2() {
         buildPlan("A = load 'a';");
         buildPlan("B = load 'b';");
+        boolean exceptionThrown = false;
         try {
-            buildPlan("C = group A by (*, $0), B by ($0, $1);");
+            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
         }
+        assertTrue(exceptionThrown);
+    }
+    
+    @Test
+    public void testQuery23Fail3() {
+        buildPlan("A = load 'a' as (a: int, b:double);");
+        buildPlan("B = load 'b' as (a:int);");
+        boolean exceptionThrown = false;
+        try {
+            buildPlan("C = cogroup A by *, B by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+                        "do not match"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
     }
 
+    
     @Test
     public void testQuery24() {
         buildPlan("a = load 'a';");
@@ -1642,7 +1677,7 @@
     }
 
     @Test
-    public void testQuery110()  throws FrontendException, ParseException {
+    public void testQuery110Fail()  throws FrontendException, ParseException {
         LogicalPlan lp;
         LOLoad load;
         LOCogroup cogroup;
@@ -1651,13 +1686,16 @@
         lp = buildPlan("b = load 'two';");
 
         load = (LOLoad) lp.getLeaves().get(0);
-
+        boolean exceptionThrown = false;
+        try{
         lp = buildPlan("c = cogroup a by $0, b by *;");
-        cogroup = (LOCogroup) lp.getLeaves().get(0);
-
-        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
-        LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0];
-        assertTrue(checkPlanForProjectStar(cogroupPlan) == true);
+        } catch(AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+                    "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
+        
 
     }
 
@@ -2102,6 +2140,37 @@
         fail();
     }
 
+    @Test
+    public void testCogroupByStarFailure1() {
+        boolean exceptionThrown = false;
+        try {
+            buildPlan(" a = load '1.txt' as (a0:int, a1:int);");
+            buildPlan(" b = load '2.txt'; ");
+            buildPlan("c = cogroup a by *, b by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+                    "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
+    }
+
+    @Test
+    public void testCogroupByStarFailure2() {
+        boolean exceptionThrown = false;
+        try {
+            buildPlan(" a = load '1.txt' ;");
+            buildPlan(" b = load '2.txt' as (b0:int, b1:int); ");
+            buildPlan("c = cogroup a by *, b by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
+    }
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Nov 19 17:55:36 2009
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.ArrayList;
 
+import junit.framework.AssertionFailedError;
 import junit.framework.TestCase;
 
 import org.apache.pig.EvalFunc;
@@ -49,7 +50,18 @@
 
 public class TestTypeCheckingValidator extends TestCase {
 
-    LogicalPlanTester planTester = new LogicalPlanTester() ;
+    LogicalPlanTester planTester;
+    
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // create a new instance of the plan tester
+        // for each test so that different tests do not
+        // interact with each other's plans
+        planTester = new LogicalPlanTester() ;
+    }
     
 	private static final String simpleEchoStreamingCommand;
         static {
@@ -3287,77 +3299,19 @@
     }
 
     @Test
-    public void testCogroupStarLineageNoSchema() throws Throwable {
-        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
-        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
-        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
-        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
-        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
-
-        // validate
-        CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
-        try {
-            typeValidator.validate(plan, collector) ;
-        }
-        catch (PlanValidationException pve) {
-            //not good
-        }
-
-        printMessageCollector(collector) ;
-        printTypeGraph(plan) ;
-        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
-        if (collector.hasError()) {
-            throw new AssertionError("Expect no  error") ;
-        }
-
-
-        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
-        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
-
-        LogicalOperator exOp = foreachPlan.getRoots().get(0);
-
-        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-
-        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("BinStorage"));
-
-        foreachPlan = foreach.getForEachPlans().get(2);
-        exOp = foreachPlan.getRoots().get(0);
-        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
-
-    }
-
-    @Test
     public void testCogroupStarLineageNoSchemaFail() throws Throwable {
         planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
         planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
-        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
-        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
-        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
-
-        // validate
-        CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        boolean exceptionThrown = false;
         try {
-            typeValidator.validate(plan, collector) ;
-            fail("Exception expected") ;
-        }
-        catch (PlanValidationException pve) {
-            //not good
-        }
-
-        printMessageCollector(collector) ;
-        printTypeGraph(plan) ;
-        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
-        if (!collector.hasError()) {
-            throw new AssertionError("Expect error") ;
+            LogicalPlan lp = planTester.buildPlan("c = cogroup a by *, b by *;");
+        } catch(AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
         }
-
+        assertTrue(exceptionThrown);
+        
     }
 
     @Test