You are viewing a plain-text version of this content. (The "canonical link" hyperlink that accompanies this notice is not preserved in the plain-text version.)
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/19 18:55:36 UTC
svn commit: r882221 - in /hadoop/pig/trunk: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/
Author: pradeepkth
Date: Thu Nov 19 17:55:36 2009
New Revision: 882221
URL: http://svn.apache.org/viewvc?rev=882221&view=rev
Log:
PIG-1064: Behaviour of COGROUP with and without schema when using * operator (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov 19 17:55:36 2009
@@ -33,6 +33,9 @@
BUG FIXES
+PIG-1064: Behaviour of COGROUP with and without schema when using "*" operator
+(pradeepkth)
+
Release 0.6.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Thu Nov 19 17:55:36 2009
@@ -57,7 +57,6 @@
//get the attributes of cogroup that are modified during the trnalsation
MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
-
for(LogicalOperator op: cg.getInputs()) {
ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
for(LogicalPlan lp: mapGByPlans.get(op)) {
@@ -70,9 +69,41 @@
newGByPlans.add(lp);
}
}
+
+
mapGByPlans.removeKey(op);
mapGByPlans.put(op, newGByPlans);
}
+
+ // check if after translation none of group by plans in a cogroup
+ // have a project(*) - if they still do it's because the input
+ // for the project(*) did not have a schema - in this case, we should
+ // error out since we could have different number/types of
+ // cogroup keys
+ if(cg.getInputs().size() > 1) { // only for cogroups
+ for(LogicalOperator op: cg.getInputs()) {
+ for(LogicalPlan lp: mapGByPlans.get(op)) {
+ if(checkPlanForProjectStar(lp)) {
+ // not following Error handling guidelines to give error code
+ // and error source since this will get swallowed by the parser
+ // which will just return a ParseException
+ throw new VisitorException("Cogroup/Group by * is only allowed if " +
+ "the input has a schema");
+ }
+ }
+ }
+ // check if after translation all group by plans have same arity
+ int arity = mapGByPlans.get(cg.getInputs().get(0)).size();
+ for(LogicalOperator op: cg.getInputs()) {
+ if(arity != mapGByPlans.get(op).size()) {
+ // not following Error handling guidelines to give error code
+ // and error source since this will get swallowed by the parser
+ // which will just return a ParseException
+ throw new VisitorException("The arity of cogroup/group by columns " +
+ "do not match");
+ }
+ }
+ }
}
/* (non-Javadoc)
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Nov 19 17:55:36 2009
@@ -1029,19 +1029,19 @@
)
{
if(null != root) {
- log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-
- //Translate all the project(*) leaves in the plan to a sequence of projections
- ProjectStarTranslator translate = new ProjectStarTranslator(lp);
- translate.visit();
-
- addLogicalPlan(root, lp);
-
try {
- log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+ log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+
+ //Translate all the project(*) leaves in the plan to a sequence of projections
+ ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+ translate.visit();
+
+ addLogicalPlan(root, lp);
+
+ log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
} catch(FrontendException fee) {
- ParseException pe = new ParseException(fee.getMessage());
- pe.initCause(fee);
+ ParseException pe = new ParseException(fee.getMessage());
+ pe.initCause(fee);
throw pe;
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Nov 19 17:55:36 2009
@@ -291,7 +291,7 @@
@Test
public void testQuery22Fail() {
- buildPlan("A = load 'a';");
+ buildPlan("A = load 'a' as (a:int, b: double);");
try {
buildPlan("B = group A by (*, $0);");
} catch (AssertionFailedError e) {
@@ -323,15 +323,50 @@
@Test
public void testQuery23Fail() {
+ buildPlan("A = load 'a' as (a: int, b:double);");
+ buildPlan("B = load 'b';");
+ boolean exceptionThrown = false;
+ try {
+ buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+ "do not match"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void testQuery23Fail2() {
buildPlan("A = load 'a';");
buildPlan("B = load 'b';");
+ boolean exceptionThrown = false;
try {
- buildPlan("C = group A by (*, $0), B by ($0, $1);");
+ buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
} catch (AssertionFailedError e) {
- assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
}
+ assertTrue(exceptionThrown);
+ }
+
+ @Test
+ public void testQuery23Fail3() {
+ buildPlan("A = load 'a' as (a: int, b:double);");
+ buildPlan("B = load 'b' as (a:int);");
+ boolean exceptionThrown = false;
+ try {
+ buildPlan("C = cogroup A by *, B by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+ "do not match"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
}
+
@Test
public void testQuery24() {
buildPlan("a = load 'a';");
@@ -1642,7 +1677,7 @@
}
@Test
- public void testQuery110() throws FrontendException, ParseException {
+ public void testQuery110Fail() throws FrontendException, ParseException {
LogicalPlan lp;
LOLoad load;
LOCogroup cogroup;
@@ -1651,13 +1686,16 @@
lp = buildPlan("b = load 'two';");
load = (LOLoad) lp.getLeaves().get(0);
-
+ boolean exceptionThrown = false;
+ try{
lp = buildPlan("c = cogroup a by $0, b by *;");
- cogroup = (LOCogroup) lp.getLeaves().get(0);
-
- MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
- LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0];
- assertTrue(checkPlanForProjectStar(cogroupPlan) == true);
+ } catch(AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertTrue(exceptionThrown);
+
}
@@ -2102,6 +2140,37 @@
fail();
}
+ @Test
+ public void testCogroupByStarFailure1() {
+ boolean exceptionThrown = false;
+ try {
+ buildPlan(" a = load '1.txt' as (a0:int, a1:int);");
+ buildPlan(" b = load '2.txt'; ");
+ buildPlan("c = cogroup a by *, b by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertEquals("An exception was expected but did " +
+ "not occur", true, exceptionThrown);
+ }
+
+ @Test
+ public void testCogroupByStarFailure2() {
+ boolean exceptionThrown = false;
+ try {
+ buildPlan(" a = load '1.txt' ;");
+ buildPlan(" b = load '2.txt' as (b0:int, b1:int); ");
+ buildPlan("c = cogroup a by *, b by *;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
+ }
+ assertEquals("An exception was expected but did " +
+ "not occur", true, exceptionThrown);
+ }
private void printPlan(LogicalPlan lp) {
LOPrinter graphPrinter = new LOPrinter(System.err, lp);
System.err.println("Printing the logical plan");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=882221&r1=882220&r2=882221&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Nov 19 17:55:36 2009
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.ArrayList;
+import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.pig.EvalFunc;
@@ -49,7 +50,18 @@
public class TestTypeCheckingValidator extends TestCase {
- LogicalPlanTester planTester = new LogicalPlanTester() ;
+ LogicalPlanTester planTester;
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ // create a new instance of the plan tester
+ // for each test so that different tests do not
+ // interact with each other's plans
+ planTester = new LogicalPlanTester() ;
+ }
private static final String simpleEchoStreamingCommand;
static {
@@ -3287,77 +3299,19 @@
}
@Test
- public void testCogroupStarLineageNoSchema() throws Throwable {
- planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
- planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
- planTester.buildPlan("c = cogroup a by *, b by * ;") ;
- planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
- LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
-
- // validate
- CompilationMessageCollector collector = new CompilationMessageCollector() ;
- TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
- try {
- typeValidator.validate(plan, collector) ;
- }
- catch (PlanValidationException pve) {
- //not good
- }
-
- printMessageCollector(collector) ;
- printTypeGraph(plan) ;
- planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
- if (collector.hasError()) {
- throw new AssertionError("Expect no error") ;
- }
-
-
- LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
- LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
-
- LogicalOperator exOp = foreachPlan.getRoots().get(0);
-
- if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-
- LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
- assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("BinStorage"));
-
- foreachPlan = foreach.getForEachPlans().get(2);
- exOp = foreachPlan.getRoots().get(0);
- if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
- cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
- assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
-
- }
-
- @Test
public void testCogroupStarLineageNoSchemaFail() throws Throwable {
planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
- planTester.buildPlan("c = cogroup a by *, b by * ;") ;
- planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
- LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
-
- // validate
- CompilationMessageCollector collector = new CompilationMessageCollector() ;
- TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ boolean exceptionThrown = false;
try {
- typeValidator.validate(plan, collector) ;
- fail("Exception expected") ;
- }
- catch (PlanValidationException pve) {
- //not good
- }
-
- printMessageCollector(collector) ;
- printTypeGraph(plan) ;
- planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
- if (!collector.hasError()) {
- throw new AssertionError("Expect error") ;
+ LogicalPlan lp = planTester.buildPlan("c = cogroup a by *, b by *;");
+ } catch(AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+ "the input has a schema"));
+ exceptionThrown = true;
}
-
+ assertTrue(exceptionThrown);
+
}
@Test