You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/04/08 21:59:25 UTC
svn commit: r1465757 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Author: daijy
Date: Mon Apr 8 19:59:24 2013
New Revision: 1465757
URL: http://svn.apache.org/r1465757
Log:
PIG-2265: Test case TestSecondarySort failure
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1465757&r1=1465756&r2=1465757&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 8 19:59:24 2013
@@ -154,6 +154,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-2265: Test case TestSecondarySort failure (daijy)
+
PIG-3060: FLATTEN in nested foreach fails when the input contains an empty bag (daijy)
PIG-2247: Pig parser does not detect multiple arguments with the same name passed to macro (dreambird via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1465757&r1=1465756&r2=1465757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Apr 8 19:59:24 2013
@@ -2396,7 +2396,7 @@ public class MRCompiler extends PhyPlanV
prj.setResultType(DataType.TUPLE);
ep.add(prj);
eps1.add(ep);
- flat1.add(true);
+ flat1.add(false);
} else {
for (Pair<POProject, Byte> sortProj : sortProjs) {
// Check for proj being null, null is used by getSortCols for a non POProject
@@ -2419,7 +2419,7 @@ public class MRCompiler extends PhyPlanV
}
ep.add(prj);
eps1.add(ep);
- flat1.add(true);
+ flat1.add(false);
}
}
}else{
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1465757&r1=1465756&r2=1465757&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Mon Apr 8 19:59:24 2013
@@ -481,10 +481,10 @@ public class SecondaryKeyOptimizer exten
sawInvalidPhysicalOper = processSort((POSort)currentNode);
else if (currentNode instanceof POProject)
sawInvalidPhysicalOper = processProject((POProject)currentNode);
- else if (currentNode instanceof POForEach)
- sawInvalidPhysicalOper = processForEach((POForEach)currentNode);
else if (currentNode instanceof POUserFunc ||
- currentNode instanceof POUnion)
+ currentNode instanceof POUnion ||
+ // We don't process foreach, since foreach is too complex to get right
+ currentNode instanceof POForEach)
break;
if (sawInvalidPhysicalOper)
@@ -543,25 +543,6 @@ public class SecondaryKeyOptimizer exten
return false;
}
- // Accumulate column info from nested project
- public boolean processForEach(POForEach fe) throws FrontendException {
- if (fe.getInputPlans().size() > 1) {
- // We don't optimize the case when POForEach has more than 1 input plan
- return true;
- }
- boolean r = false;
- try {
- r = collectColumnChain(fe.getInputPlans().get(0),
- columnChainInfo);
- } catch (PlanException e) {
- int errorCode = 2205;
- throw new FrontendException("Error visiting POForEach inner plan",
- errorCode, e);
- }
- // See something other than POProject in POForEach, set the flag to stop further processing
- return r;
- }
-
// We see POSort, check which key it is using
public boolean processSort(POSort sort) throws FrontendException{
SortKeyInfo keyInfo = new SortKeyInfo();
Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1465757&r1=1465756&r2=1465757&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Mon Apr 8 19:59:24 2013
@@ -68,27 +68,8 @@ public class TestSecondarySort {
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
}
-// @Test // Currently failing due to PIG-2009
-// public void testDistinctOptimization1() throws Exception {
-// // Limit in the foreach plan
-// String query = ("A=LOAD 'input1' AS (a0, a1, a2);"+
-// "B = LOAD 'input2' AS (b0, b1, b2);" +
-// "C = cogroup A by a0, B by b0;" +
-// "D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};" +
-// "store D into 'output';");
-// PhysicalPlan pp = Util.buildPp(pigServer, query);
-// MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-//
-// SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-// so.visit();
-//
-// assertEquals( 1, so.getNumMRUseSecondaryKey() );
-// assertTrue(so.getNumSortRemoved() == 0);
-// assertTrue(so.getDistinctChanged() == 1);
-// }
-
@Test
- public void testDistinctOptimization2() throws Exception {
+ public void testDistinctOptimization1() throws Exception {
// Distinct on one entire input
String query = ("A=LOAD 'input1' AS (a0, a1, a2);"+
"B = group A by $0;"+
@@ -107,139 +88,6 @@ public class TestSecondarySort {
}
@Test
- public void testDistinctOptimization3() throws Exception {
- // Distinct on the prefix of main sort key
- String query = ("A=LOAD 'input1' AS (a0, a1, a2);"+
- "B = group A by $0;"+
- "C = foreach B { D = A.a0; E = distinct D; generate group, E;};"+
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(0, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
- }
-
- @Test
- public void testDistinctOptimization4() throws Exception {
- // Distinct on secondary key again, should remove
- String query = ("A=LOAD 'input1' AS (a0, a1, a2);"+
- "B = group A by $0;"+
- "C = foreach B { D = A.a1; E = distinct D; F = distinct E; generate group, F;};"+
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(2, so.getDistinctChanged());
- }
-
- @Test
- public void testDistinctOptimization5() throws Exception {
- // Filter in foreach plan
- String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
- "B = group A by $0;" +
- "C = foreach B { D = A.a1; E = distinct D; F = filter E by $0=='1'; generate group, F;};" +
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertTrue(so.getNumMRUseSecondaryKey() == 1);
- assertTrue(so.getNumSortRemoved() == 0);
- assertTrue(so.getDistinctChanged() == 1);
- }
-
- @Test
- public void testDistinctOptimization6() throws Exception {
- // group by * with no schema, and distinct key is not part of main key
- String query = ("A=LOAD 'input1';" +
- "B = group A by *;" +
- "C = foreach B { D = limit A 10; E = D.$1; F = DISTINCT E; generate group, COUNT(F);};" +
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
- }
-
- @Test
- public void testDistinctOptimization7() throws Exception {
- // group by * with no schema, distinct key is more specific than the main key
- String query = ("A=LOAD 'input1';" +
- "B = group A by *;" +
- "C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};" +
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
- }
-
- @Test
- public void testDistinctOptimization8() throws Exception {
- // local arrange plan is an expression
- String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
- "B = group A by $0+$1;" +
- "C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};" +
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
- }
-
- @Test
- public void testDistinctOptimization9() throws Exception {
- // local arrange plan is nested project
- String query = ("A=LOAD 'input1' as (a:tuple(a0:int, a1:chararray));" +
- "B = group A by a.a1;" +
- "C = foreach B { D = A.a; E = DISTINCT D; generate group, COUNT(E);};" +
-
- "store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
- assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
- }
-
- @Test
public void testSortOptimization1() throws Exception {
// Sort on something other than the main key
String query = ("A=LOAD 'input1' AS (a0, a1, a2);" +
@@ -516,7 +364,7 @@ public class TestSecondarySort {
Util.deleteFile(cluster, clusterPath);
}
-// @Test
+ @Test
public void testNestedSortEndToEnd3() throws Exception {
File tmpFile1 = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));