You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/02/10 19:11:46 UTC

svn commit: r1729682 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/e2e/pig/tests/ test/org/apache/pig/test/data/GoldenFiles/tez/ test/org/apache/pig/tez/

Author: rohini
Date: Wed Feb 10 18:11:46 2016
New Revision: 1729682

URL: http://svn.apache.org/viewvc?rev=1729682&view=rev
Log:
PIG-4790: Join after union fails due to UnionOptimizer (rohini)

Added:
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 10 18:11:46 2016
@@ -89,6 +89,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4790: Join after union fail due to UnionOptimizer (rohini)
+
 PIG-4686: Backend code should not call AvroStorageUtils.getPaths (mitdesai via rohini)
 
 PIG-4795: Flushing ObjectOutputStream before calling toByteArray on the underlying ByteArrayOutputStream (emopers via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Feb 10 18:11:46 2016
@@ -167,6 +167,19 @@ public class UnionOptimizer extends TezO
         List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null ? null
                 : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
 
+
+        if (successors != null && uniqueUnionMembers.size() > 1) {
+            for (TezOperator succ : successors) {
+                for (TezOperator pred : predecessors) {
+                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+                        // Stop here, we cannot convert the node into vertex group
+                        // Otherwise, we will end up with a parallel edge between pred
+                        // and succ
+                        return;
+                    }
+                }
+            }
+        }
         if (predecessors.size() > unionOp.getUnionMembers().size()
                 && uniqueUnionMembers.size() != 1) {
             return; // TODO: PIG-3856
@@ -356,6 +369,7 @@ public class UnionOptimizer extends TezO
             throws PlanException, VisitorException {
         String unionOpKey = unionOp.getOperatorKey().toString();
         String splitPredOpKey = splitPredOp.getOperatorKey().toString();
+        List<TezOperator> splitSuccessors = tezPlan.getSuccessors(splitPredOp);
         if (successors != null) {
             for (TezOperator succ : successors) {
                 TezOperator successorVertexGroup = null;
@@ -389,7 +403,10 @@ public class UnionOptimizer extends TezO
                         //Only splitPredOp is member of the vertex group. Get rid of the vertex group
                         removeSuccessorVertexGroup = true;
                     } else {
-                        tezPlan.connect(splitPredOp, successorVertexGroup);
+                        // Avoid connecting multiple times in case of union + self join
+                        if (splitSuccessors == null || !splitSuccessors.contains(successorVertexGroup)) {
+                            tezPlan.connect(splitPredOp, successorVertexGroup);
+                        }
                     }
                 } else {
                     actualSuccs.add(succ);
@@ -423,7 +440,10 @@ public class UnionOptimizer extends TezO
                             // to SplitOp -> Successor
                             tezPlan.disconnect(unionOp, actualSucc);
                         }
-                        tezPlan.connect(splitPredOp, actualSucc);
+                        // Avoid connecting multiple times in case of union + self join
+                        if (splitSuccessors == null || !splitSuccessors.contains(actualSucc)) {
+                            tezPlan.connect(splitPredOp, actualSucc);
+                        }
                     }
                 }
             }

Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Wed Feb 10 18:11:46 2016
@@ -821,6 +821,45 @@ a2 = filter a by gpa == 4.00;
 b = union a1, a2;
 c = JOIN a by name, b by name using 'replicated';
 store c into ':OUTPATH:';\,
+            },
+            {
+            # Self join union
+            'num' => 10,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name left, b by name;
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Complex self join
+            'num' => 11,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
+SPLIT a INTO b IF age > 40,
+             c IF age <= 40;
+
+d = FOREACH c GENERATE name, age, gpa;
+
+e = FILTER d BY gpa > 3;
+f = FILTER d BY gpa <= 3;
+
+g = JOIN e BY name LEFT, f BY name;
+h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa;
+
+i = DISTINCT h;
+
+j = FILTER b BY gpa > 3;
+k = FILTER b by gpa <= 3;
+
+l = JOIN j BY name LEFT, k BY name;
+m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa;
+n = DISTINCT m;
+
+m = UNION e, i, j, n;
+
+n = JOIN a BY name, m BY name;
+store n into ':OUTPATH:';\,
             }
             ] # end of tests
         },

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld Wed Feb 10 18:11:46 2016
@@ -4,106 +4,98 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-61	->	Tez vertex scope-65,Tez vertex scope-69,
-Tez vertex scope-65	->	Tez vertex scope-69,
-Tez vertex scope-57	->	Tez vertex scope-67,Tez vertex scope-69,
-Tez vertex scope-67	->	Tez vertex scope-69,
-Tez vertex scope-69
+Tez vertex scope-57	->	Tez vertex scope-60,
+Tez vertex scope-53	->	Tez vertex scope-58,Tez vertex scope-60,
+Tez vertex scope-58	->	Tez vertex scope-60,
+Tez vertex scope-60
 
-Tez vertex scope-61
+Tez vertex scope-57
 # Plan on vertex
-b: Split - scope-72
+c: Local Rearrange[tuple]{tuple}(false) - scope-26	->	 scope-60
 |   |
-|   POValueOutputTez - scope-71	->	 [scope-69]
+|   Project[int][0] - scope-27
 |   |
-|   POValueOutputTez - scope-62	->	 [scope-65]
+|   Project[int][1] - scope-28
 |
-|---b: New For Each(false,false)[bag] - scope-7
+|---c: New For Each(true,true)[tuple] - scope-25
     |   |
-    |   Cast[int] - scope-2
+    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-23
     |   |
-    |   |---Project[bytearray][0] - scope-1
+    |   |---Constant(2) - scope-21
     |   |
-    |   Cast[int] - scope-5
+    |   |---Constant(0) - scope-22
     |   |
-    |   |---Project[bytearray][1] - scope-4
+    |   Project[tuple][*] - scope-24
     |
-    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-65
+    |---b: New For Each(false)[bag] - scope-4
+        |   |
+        |   Cast[int] - scope-2
+        |   |
+        |   |---Project[bytearray][0] - scope-1
+        |
+        |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-53
 # Plan on vertex
-c: Local Rearrange[tuple]{tuple}(false) - scope-29	->	 scope-69
+a: Split - scope-62
 |   |
-|   Project[int][0] - scope-30
+|   POValueOutputTez - scope-61	->	 [scope-60]
 |   |
-|   Project[int][1] - scope-31
+|   POValueOutputTez - scope-54	->	 [scope-58]
 |
-|---c: New For Each(true,true)[tuple] - scope-28
+|---a: New For Each(false,false)[bag] - scope-12
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
+    |   Cast[int] - scope-7
     |   |
-    |   |---Constant(2) - scope-24
+    |   |---Project[bytearray][0] - scope-6
     |   |
-    |   |---Constant(0) - scope-25
+    |   Cast[int] - scope-10
     |   |
-    |   Project[tuple][*] - scope-27
+    |   |---Project[bytearray][1] - scope-9
     |
-    |---POValueInputTez - scope-66	<-	 scope-61
-Tez vertex scope-57
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-5
+Tez vertex scope-58
 # Plan on vertex
-a: Split - scope-73
+c: Local Rearrange[tuple]{tuple}(false) - scope-34	->	 scope-60
 |   |
-|   POValueOutputTez - scope-70	->	 [scope-69]
+|   Project[int][0] - scope-35
 |   |
-|   POValueOutputTez - scope-58	->	 [scope-67]
+|   Project[int][1] - scope-36
 |
-|---a: New For Each(false,false)[bag] - scope-18
+|---c: New For Each(true,true)[tuple] - scope-33
     |   |
-    |   Cast[int] - scope-13
+    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-31
     |   |
-    |   |---Project[bytearray][0] - scope-12
+    |   |---Constant(2) - scope-29
     |   |
-    |   Cast[int] - scope-16
+    |   |---Constant(1) - scope-30
     |   |
-    |   |---Project[bytearray][1] - scope-15
+    |   Project[tuple][*] - scope-32
     |
-    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
-Tez vertex scope-67
+    |---a: New For Each(false)[bag] - scope-16
+        |   |
+        |   Project[int][0] - scope-14
+        |
+        |---POValueInputTez - scope-59	<-	 scope-53
+Tez vertex scope-60
 # Plan on vertex
-c: Local Rearrange[tuple]{tuple}(false) - scope-37	->	 scope-69
-|   |
-|   Project[int][0] - scope-38
-|   |
-|   Project[int][1] - scope-39
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-52
 |
-|---c: New For Each(true,true)[tuple] - scope-36
-    |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-34
-    |   |
-    |   |---Constant(2) - scope-32
-    |   |
-    |   |---Constant(1) - scope-33
+|---d: New For Each(false,false,false)[bag] - scope-51
     |   |
-    |   Project[tuple][*] - scope-35
-    |
-    |---POValueInputTez - scope-68	<-	 scope-57
-Tez vertex scope-69
-# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-56
-|
-|---d: New For Each(false,false)[bag] - scope-55
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
     |   |
-    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-51
+    |   |---Constant(0) - scope-43
     |   |
-    |   |---Constant(0) - scope-49
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
     |   |
-    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-54
+    |   |---Constant(1) - scope-46
     |   |
-    |   |---Constant(1) - scope-52
+    |   Project[int][0] - scope-49
     |
-    |---c: New For Each(true,true)[tuple] - scope-42
+    |---c: New For Each(true,true)[tuple] - scope-39
         |   |
-        |   Project[bag][1] - scope-40
+        |   Project[bag][1] - scope-37
         |   |
-        |   Project[bag][2] - scope-41
+        |   Project[bag][2] - scope-38
         |
-        |---Package(Packager)[tuple]{tuple} - scope-23
+        |---Package(Packager)[tuple]{tuple} - scope-20

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld?rev=1729682&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld Wed Feb 10 18:11:46 2016
@@ -0,0 +1,63 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-36	->	Tez vertex scope-48,
+Tez vertex scope-48
+
+Tez vertex scope-36
+# Plan on vertex
+a: Split - scope-49
+|   |
+|   c: Local Rearrange[tuple]{int}(false) - scope-50	->	 scope-48
+|   |   |
+|   |   Project[int][0] - scope-51
+|   |
+|   |---a1: Filter[bag] - scope-11
+|       |   |
+|       |   Greater Than[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(5) - scope-13
+|   |
+|   c: Local Rearrange[tuple]{int}(false) - scope-52	->	 scope-48
+|   |   |
+|   |   Project[int][0] - scope-53
+|   |
+|   |---a2: Filter[bag] - scope-17
+|       |   |
+|       |   Less Than[boolean] - scope-20
+|       |   |
+|       |   |---Project[int][0] - scope-18
+|       |   |
+|       |   |---Constant(2) - scope-19
+|   |
+|   c: Local Rearrange[tuple]{int}(false) - scope-30	->	 scope-48
+|   |   |
+|   |   Project[int][0] - scope-31
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-48
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-35
+|
+|---c: New For Each(true,true)[tuple] - scope-34
+    |   |
+    |   Project[bag][1] - scope-32
+    |   |
+    |   Project[bag][2] - scope-33
+    |
+    |---c: Package(Packager)[tuple]{int} - scope-27

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld?rev=1729682&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld Wed Feb 10 18:11:46 2016
@@ -0,0 +1,99 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-58	->	Tez vertex scope-66,Tez vertex scope-67,Tez vertex scope-73,
+Tez vertex scope-66	->	Tez vertex scope-67,
+Tez vertex scope-67	->	Tez vertex scope-73,
+Tez vertex scope-73
+
+Tez vertex scope-58
+# Plan on vertex
+a: Split - scope-74
+|   |
+|   POValueOutputTez - scope-69	->	 [scope-67]
+|   |
+|   |---a1: Filter[bag] - scope-11
+|       |   |
+|       |   Greater Than[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(5) - scope-13
+|   |
+|   a4: Local Rearrange[tuple]{int}(false) - scope-31	->	 scope-66
+|   |   |
+|   |   Project[int][0] - scope-32
+|   |
+|   |---a2: Filter[bag] - scope-17
+|       |   |
+|       |   Less Than[boolean] - scope-20
+|       |   |
+|       |   |---Project[int][0] - scope-18
+|       |   |
+|       |   |---Constant(2) - scope-19
+|   |
+|   a4: Local Rearrange[tuple]{int}(false) - scope-33	->	 scope-66
+|   |   |
+|   |   Project[int][0] - scope-34
+|   |
+|   |---a3: Filter[bag] - scope-23
+|       |   |
+|       |   Equal To[boolean] - scope-26
+|       |   |
+|       |   |---Project[int][1] - scope-24
+|       |   |
+|       |   |---Constant(10) - scope-25
+|   |
+|   c: Local Rearrange[tuple]{int}(false) - scope-52	->	 scope-73
+|   |   |
+|   |   Project[int][0] - scope-53
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-66
+# Plan on vertex
+POValueOutputTez - scope-70	->	 [scope-67]
+|
+|---a5: New For Each(false,false)[bag] - scope-42
+    |   |
+    |   Project[int][0] - scope-38
+    |   |
+    |   Project[int][3] - scope-40
+    |
+    |---a4: New For Each(true,true)[tuple] - scope-37
+        |   |
+        |   Project[bag][1] - scope-35
+        |   |
+        |   Project[bag][2] - scope-36
+        |
+        |---a4: Package(Packager)[tuple]{int} - scope-30
+Tez vertex scope-67
+# Plan on vertex
+c: Local Rearrange[tuple]{int}(false) - scope-50	->	 scope-73
+|   |
+|   Project[int][0] - scope-51
+|
+|---POShuffledValueInputTez - scope-68	<-	 [scope-58, scope-66]
+Tez vertex scope-73
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-57
+|
+|---c: New For Each(true,true)[tuple] - scope-56
+    |   |
+    |   Project[bag][1] - scope-54
+    |   |
+    |   Project[bag][2] - scope-55
+    |
+    |---c: Package(Packager)[tuple]{int} - scope-49

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1729682&r1=1729681&r2=1729682&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Wed Feb 10 18:11:46 2016
@@ -173,6 +173,35 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSelfJoinUnion() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a1 = filter a by x > 5;" +
+                "a2 = filter a by x < 2;" +
+                "b = union a1, a2;" +
+                "c = join b by x, a by x;" +
+                "store c into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld");
+    }
+
+    @Test
+    public void testSelfJoinUnionDifferentMembers() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a1 = filter a by x > 5;" +
+                "a2 = filter a by x < 2;" +
+                "a3 = filter a by y == 10;" +
+                "a4 = join a2 by x, a3 by x;" +
+                "a5 = foreach a4 generate a2::x as x, a3::y as y;" +
+                "b = union a1, a5;" +
+                "c = join b by x, a by x;" +
+                "store c into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld");
+    }
+
+    @Test
     public void testCross() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -202,7 +231,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cross b, a;" +
-                "d = foreach c generate a.x, b.z;" + //Scalar
+                "d = foreach c generate a.x, a.y, z;" + //Scalar
                 "store d into 'file:///tmp/output';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");