You are viewing a plain text version of this content. The canonical link for it was a hyperlink, which is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/10 19:11:46 UTC
svn commit: r1729682 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
test/e2e/pig/tests/ test/org/apache/pig/test/data/GoldenFiles/tez/
test/org/apache/pig/tez/
Author: rohini
Date: Wed Feb 10 18:11:46 2016
New Revision: 1729682
URL: http://svn.apache.org/viewvc?rev=1729682&view=rev
Log:
PIG-4790: Join after union fail due to UnionOptimizer (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/test/e2e/pig/tests/multiquery.conf
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 18:11:46 2016
@@ -89,6 +89,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4790: Join after union fail due to UnionOptimizer (rohini)
+
PIG-4686: Backend code should not call AvroStorageUtils.getPaths (mitdesai via rohini)
PIG-4795: Flushing ObjectOutputStream before calling toByteArray on the underlying ByteArrayOutputStream (emopers via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Feb 10 18:11:46 2016
@@ -167,6 +167,19 @@ public class UnionOptimizer extends TezO
List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null ? null
: new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+
+ if (successors != null && uniqueUnionMembers.size() > 1) {
+ for (TezOperator succ : successors) {
+ for (TezOperator pred : predecessors) {
+ if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+ // Stop here, we cannot convert the node into vertex group
+ // Otherwise, we will end up with a parallel edge between pred
+ // and succ
+ return;
+ }
+ }
+ }
+ }
if (predecessors.size() > unionOp.getUnionMembers().size()
&& uniqueUnionMembers.size() != 1) {
return; // TODO: PIG-3856
@@ -356,6 +369,7 @@ public class UnionOptimizer extends TezO
throws PlanException, VisitorException {
String unionOpKey = unionOp.getOperatorKey().toString();
String splitPredOpKey = splitPredOp.getOperatorKey().toString();
+ List<TezOperator> splitSuccessors = tezPlan.getSuccessors(splitPredOp);
if (successors != null) {
for (TezOperator succ : successors) {
TezOperator successorVertexGroup = null;
@@ -389,7 +403,10 @@ public class UnionOptimizer extends TezO
//Only splitPredOp is member of the vertex group. Get rid of the vertex group
removeSuccessorVertexGroup = true;
} else {
- tezPlan.connect(splitPredOp, successorVertexGroup);
+ // Avoid connecting multiple times in case of union + self join
+ if (splitSuccessors == null || !splitSuccessors.contains(successorVertexGroup)) {
+ tezPlan.connect(splitPredOp, successorVertexGroup);
+ }
}
} else {
actualSuccs.add(succ);
@@ -423,7 +440,10 @@ public class UnionOptimizer extends TezO
// to SplitOp -> Successor
tezPlan.disconnect(unionOp, actualSucc);
}
- tezPlan.connect(splitPredOp, actualSucc);
+ // Avoid connecting multiple times in case of union + self join
+ if (splitSuccessors == null || !splitSuccessors.contains(actualSucc)) {
+ tezPlan.connect(splitPredOp, actualSucc);
+ }
}
}
}
Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Wed Feb 10 18:11:46 2016
@@ -821,6 +821,45 @@ a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = JOIN a by name, b by name using 'replicated';
store c into ':OUTPATH:';\,
+ },
+ {
+ # Self join union
+ 'num' => 10,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name left, b by name;
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Complex self join
+ 'num' => 11,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
+SPLIT a INTO b IF age > 40,
+ c IF age <= 40;
+
+d = FOREACH c GENERATE name, age, gpa;
+
+e = FILTER d BY gpa > 3;
+f = FILTER d BY gpa <= 3;
+
+g = JOIN e BY name LEFT, f BY name;
+h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa;
+
+i = DISTINCT h;
+
+j = FILTER b BY gpa > 3;
+k = FILTER b by gpa <= 3;
+
+l = JOIN j BY name LEFT, k BY name;
+m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa;
+n = DISTINCT m;
+
+m = UNION e, i, j, n;
+
+n = JOIN a BY name, m BY name;
+store n into ':OUTPATH:';\,
}
] # end of tests
},
Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld Wed Feb 10 18:11:46 2016
@@ -4,106 +4,98 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-61 -> Tez vertex scope-65,Tez vertex scope-69,
-Tez vertex scope-65 -> Tez vertex scope-69,
-Tez vertex scope-57 -> Tez vertex scope-67,Tez vertex scope-69,
-Tez vertex scope-67 -> Tez vertex scope-69,
-Tez vertex scope-69
+Tez vertex scope-57 -> Tez vertex scope-60,
+Tez vertex scope-53 -> Tez vertex scope-58,Tez vertex scope-60,
+Tez vertex scope-58 -> Tez vertex scope-60,
+Tez vertex scope-60
-Tez vertex scope-61
+Tez vertex scope-57
# Plan on vertex
-b: Split - scope-72
+c: Local Rearrange[tuple]{tuple}(false) - scope-26 -> scope-60
| |
-| POValueOutputTez - scope-71 -> [scope-69]
+| Project[int][0] - scope-27
| |
-| POValueOutputTez - scope-62 -> [scope-65]
+| Project[int][1] - scope-28
|
-|---b: New For Each(false,false)[bag] - scope-7
+|---c: New For Each(true,true)[tuple] - scope-25
| |
- | Cast[int] - scope-2
+ | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-23
| |
- | |---Project[bytearray][0] - scope-1
+ | |---Constant(2) - scope-21
| |
- | Cast[int] - scope-5
+ | |---Constant(0) - scope-22
| |
- | |---Project[bytearray][1] - scope-4
+ | Project[tuple][*] - scope-24
|
- |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-65
+ |---b: New For Each(false)[bag] - scope-4
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-53
# Plan on vertex
-c: Local Rearrange[tuple]{tuple}(false) - scope-29 -> scope-69
+a: Split - scope-62
| |
-| Project[int][0] - scope-30
+| POValueOutputTez - scope-61 -> [scope-60]
| |
-| Project[int][1] - scope-31
+| POValueOutputTez - scope-54 -> [scope-58]
|
-|---c: New For Each(true,true)[tuple] - scope-28
+|---a: New For Each(false,false)[bag] - scope-12
| |
- | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
+ | Cast[int] - scope-7
| |
- | |---Constant(2) - scope-24
+ | |---Project[bytearray][0] - scope-6
| |
- | |---Constant(0) - scope-25
+ | Cast[int] - scope-10
| |
- | Project[tuple][*] - scope-27
+ | |---Project[bytearray][1] - scope-9
|
- |---POValueInputTez - scope-66 <- scope-61
-Tez vertex scope-57
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-5
+Tez vertex scope-58
# Plan on vertex
-a: Split - scope-73
+c: Local Rearrange[tuple]{tuple}(false) - scope-34 -> scope-60
| |
-| POValueOutputTez - scope-70 -> [scope-69]
+| Project[int][0] - scope-35
| |
-| POValueOutputTez - scope-58 -> [scope-67]
+| Project[int][1] - scope-36
|
-|---a: New For Each(false,false)[bag] - scope-18
+|---c: New For Each(true,true)[tuple] - scope-33
| |
- | Cast[int] - scope-13
+ | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-31
| |
- | |---Project[bytearray][0] - scope-12
+ | |---Constant(2) - scope-29
| |
- | Cast[int] - scope-16
+ | |---Constant(1) - scope-30
| |
- | |---Project[bytearray][1] - scope-15
+ | Project[tuple][*] - scope-32
|
- |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
-Tez vertex scope-67
+ |---a: New For Each(false)[bag] - scope-16
+ | |
+ | Project[int][0] - scope-14
+ |
+ |---POValueInputTez - scope-59 <- scope-53
+Tez vertex scope-60
# Plan on vertex
-c: Local Rearrange[tuple]{tuple}(false) - scope-37 -> scope-69
-| |
-| Project[int][0] - scope-38
-| |
-| Project[int][1] - scope-39
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-52
|
-|---c: New For Each(true,true)[tuple] - scope-36
- | |
- | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-34
- | |
- | |---Constant(2) - scope-32
- | |
- | |---Constant(1) - scope-33
+|---d: New For Each(false,false,false)[bag] - scope-51
| |
- | Project[tuple][*] - scope-35
- |
- |---POValueInputTez - scope-68 <- scope-57
-Tez vertex scope-69
-# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-56
-|
-|---d: New For Each(false,false)[bag] - scope-55
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
| |
- | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-51
+ | |---Constant(0) - scope-43
| |
- | |---Constant(0) - scope-49
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
| |
- | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-54
+ | |---Constant(1) - scope-46
| |
- | |---Constant(1) - scope-52
+ | Project[int][0] - scope-49
|
- |---c: New For Each(true,true)[tuple] - scope-42
+ |---c: New For Each(true,true)[tuple] - scope-39
| |
- | Project[bag][1] - scope-40
+ | Project[bag][1] - scope-37
| |
- | Project[bag][2] - scope-41
+ | Project[bag][2] - scope-38
|
- |---Package(Packager)[tuple]{tuple} - scope-23
+ |---Package(Packager)[tuple]{tuple} - scope-20
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld?rev=1729682&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld Wed Feb 10 18:11:46 2016
@@ -0,0 +1,63 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-36 -> Tez vertex scope-48,
+Tez vertex scope-48
+
+Tez vertex scope-36
+# Plan on vertex
+a: Split - scope-49
+| |
+| c: Local Rearrange[tuple]{int}(false) - scope-50 -> scope-48
+| | |
+| | Project[int][0] - scope-51
+| |
+| |---a1: Filter[bag] - scope-11
+| | |
+| | Greater Than[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(5) - scope-13
+| |
+| c: Local Rearrange[tuple]{int}(false) - scope-52 -> scope-48
+| | |
+| | Project[int][0] - scope-53
+| |
+| |---a2: Filter[bag] - scope-17
+| | |
+| | Less Than[boolean] - scope-20
+| | |
+| | |---Project[int][0] - scope-18
+| | |
+| | |---Constant(2) - scope-19
+| |
+| c: Local Rearrange[tuple]{int}(false) - scope-30 -> scope-48
+| | |
+| | Project[int][0] - scope-31
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-48
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-35
+|
+|---c: New For Each(true,true)[tuple] - scope-34
+ | |
+ | Project[bag][1] - scope-32
+ | |
+ | Project[bag][2] - scope-33
+ |
+ |---c: Package(Packager)[tuple]{int} - scope-27
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld?rev=1729682&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld Wed Feb 10 18:11:46 2016
@@ -0,0 +1,99 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-58 -> Tez vertex scope-66,Tez vertex scope-67,Tez vertex scope-73,
+Tez vertex scope-66 -> Tez vertex scope-67,
+Tez vertex scope-67 -> Tez vertex scope-73,
+Tez vertex scope-73
+
+Tez vertex scope-58
+# Plan on vertex
+a: Split - scope-74
+| |
+| POValueOutputTez - scope-69 -> [scope-67]
+| |
+| |---a1: Filter[bag] - scope-11
+| | |
+| | Greater Than[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(5) - scope-13
+| |
+| a4: Local Rearrange[tuple]{int}(false) - scope-31 -> scope-66
+| | |
+| | Project[int][0] - scope-32
+| |
+| |---a2: Filter[bag] - scope-17
+| | |
+| | Less Than[boolean] - scope-20
+| | |
+| | |---Project[int][0] - scope-18
+| | |
+| | |---Constant(2) - scope-19
+| |
+| a4: Local Rearrange[tuple]{int}(false) - scope-33 -> scope-66
+| | |
+| | Project[int][0] - scope-34
+| |
+| |---a3: Filter[bag] - scope-23
+| | |
+| | Equal To[boolean] - scope-26
+| | |
+| | |---Project[int][1] - scope-24
+| | |
+| | |---Constant(10) - scope-25
+| |
+| c: Local Rearrange[tuple]{int}(false) - scope-52 -> scope-73
+| | |
+| | Project[int][0] - scope-53
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-66
+# Plan on vertex
+POValueOutputTez - scope-70 -> [scope-67]
+|
+|---a5: New For Each(false,false)[bag] - scope-42
+ | |
+ | Project[int][0] - scope-38
+ | |
+ | Project[int][3] - scope-40
+ |
+ |---a4: New For Each(true,true)[tuple] - scope-37
+ | |
+ | Project[bag][1] - scope-35
+ | |
+ | Project[bag][2] - scope-36
+ |
+ |---a4: Package(Packager)[tuple]{int} - scope-30
+Tez vertex scope-67
+# Plan on vertex
+c: Local Rearrange[tuple]{int}(false) - scope-50 -> scope-73
+| |
+| Project[int][0] - scope-51
+|
+|---POShuffledValueInputTez - scope-68 <- [scope-58, scope-66]
+Tez vertex scope-73
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-57
+|
+|---c: New For Each(true,true)[tuple] - scope-56
+ | |
+ | Project[bag][1] - scope-54
+ | |
+ | Project[bag][2] - scope-55
+ |
+ |---c: Package(Packager)[tuple]{int} - scope-49
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Wed Feb 10 18:11:46 2016
@@ -173,6 +173,35 @@ public class TestTezCompiler {
}
@Test
+ public void testSelfJoinUnion() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "a1 = filter a by x > 5;" +
+ "a2 = filter a by x < 2;" +
+ "b = union a1, a2;" +
+ "c = join b by x, a by x;" +
+ "store c into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld");
+ }
+
+ @Test
+ public void testSelfJoinUnionDifferentMembers() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "a1 = filter a by x > 5;" +
+ "a2 = filter a by x < 2;" +
+ "a3 = filter a by y == 10;" +
+ "a4 = join a2 by x, a3 by x;" +
+ "a5 = foreach a4 generate a2::x as x, a3::y as y;" +
+ "b = union a1, a5;" +
+ "c = join b by x, a by x;" +
+ "store c into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld");
+ }
+
+ @Test
public void testCross() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -202,7 +231,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = cross b, a;" +
- "d = foreach c generate a.x, b.z;" + //Scalar
+ "d = foreach c generate a.x, a.y, z;" + //Scalar
"store d into 'file:///tmp/output';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");