You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2017/09/20 16:44:49 UTC

svn commit: r1809051 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/backend/hadoop/executionengine/tez/util/ test/org/apache/pig/tez/

Author: knoguchi
Date: Wed Sep 20 16:44:48 2017
New Revision: 1809051

URL: http://svn.apache.org/viewvc?rev=1809051&view=rev
Log:
PIG-5271: StackOverflowError when compiling in Tez mode (with union and replicated join) (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1809051&r1=1809050&r2=1809051&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 20 16:44:48 2017
@@ -48,6 +48,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5271: StackOverflowError when compiling in Tez mode (with union and replicated join) (knoguchi)
+
 PIG-5299: PartitionFilterOptimizer failing at compile time (knoguchi)
 
 PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1809051&r1=1809050&r2=1809051&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Wed Sep 20 16:44:48 2017
@@ -54,15 +54,6 @@ public class MultiQueryOptimizerTez exte
         this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
     }
 
-    private void addAllPredecessors(TezOperator tezOp, List<TezOperator> predsList) {
-        if (getPlan().getPredecessors(tezOp) != null) {
-            for (TezOperator pred : getPlan().getPredecessors(tezOp)) {
-                predsList.add(pred);
-                addAllPredecessors(pred, predsList);
-            }
-        }
-    }
-
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
         try {
@@ -88,7 +79,7 @@ public class MultiQueryOptimizerTez exte
             }
 
             for (TezOperator successor : successors) {
-                List<TezOperator> predecessors = new ArrayList<TezOperator>(getPlan().getPredecessors(successor));
+                HashSet<TezOperator> predecessors = new HashSet<TezOperator>(getPlan().getPredecessors(successor));
                 predecessors.remove(tezOp);
                 if (!predecessors.isEmpty()) {
                     // If has other dependency that conflicts with other splittees, don't merge into split
@@ -103,16 +94,16 @@ public class MultiQueryOptimizerTez exte
                     for (TezOperator predecessor : getPlan().getPredecessors(successor)) {
                         if (predecessor != tezOp) {
                             predecessors.add(predecessor);
-                            addAllPredecessors(predecessor, predecessors);
+                            TezCompilerUtil.addAllPredecessors(getPlan(), predecessor, predecessors);
                         }
                     }
-                    List<TezOperator> toMergeSuccPredecessors = new ArrayList<TezOperator>(successors);
+                    Set<TezOperator> toMergeSuccPredecessors = new HashSet<TezOperator>(successors);
                     toMergeSuccPredecessors.remove(successor);
                     for (TezOperator splittee : splittees) {
                         for (TezOperator spliteePred : getPlan().getPredecessors(splittee)) {
                             if (spliteePred != tezOp) {
                                 toMergeSuccPredecessors.add(spliteePred);
-                                addAllPredecessors(spliteePred, toMergeSuccPredecessors);
+                                TezCompilerUtil.addAllPredecessors(getPlan(), spliteePred, toMergeSuccPredecessors);
                             }
                         }
                     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1809051&r1=1809050&r2=1809051&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Sep 20 16:44:48 2017
@@ -224,6 +224,19 @@ public class UnionOptimizer extends TezO
             PhysicalPlan splitPredPlan = splitPredOp.plan;
             if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It has to be. But check anyways
 
+                for( TezOperator op : predecessors ) {
+                    if( !op.getOperatorKey().equals(splitPredKey)) {
+                        Set<TezOperator> allNonMemberPredecessorsAncestors = new HashSet<TezOperator>();
+                        TezCompilerUtil.addAllPredecessors(tezPlan, op, allNonMemberPredecessorsAncestors);
+                        // If any ancestor (transitive predecessor) of a non-member predecessor
+                        // is the single union member, then we stop the merge effort to avoid
+                        // creating an illegal loop.
+                        if( allNonMemberPredecessorsAncestors.contains(splitPredOp) ) {
+                            return;
+                        }
+                    }
+                }
+
                 try {
                     connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, predecessors);
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1809051&r1=1809050&r2=1809051&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Wed Sep 20 16:44:48 2017
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.pig.PigException;
@@ -300,4 +301,15 @@ public class TezCompilerUtil {
         return false;
     }
 
+    public static void addAllPredecessors(TezOperPlan tezPlan, TezOperator tezOp, Set<TezOperator> predSet) {
+        if (tezPlan.getPredecessors(tezOp) != null) {
+            for (TezOperator pred : tezPlan.getPredecessors(tezOp)) {
+                if( ! predSet.contains(pred) ) {
+                    predSet.add(pred);
+                    addAllPredecessors(tezPlan, pred, predSet);
+                }
+            }
+        }
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1809051&r1=1809050&r2=1809051&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Wed Sep 20 16:44:48 2017
@@ -1358,6 +1358,29 @@ public class TestTezCompiler {
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
     }
 
+    @Test
+    public void testJoinUnionSingleMemberOverlappingPredecessor() throws Exception {
+        String query =
+                "A = load 'file:///tmp/input1.txt' as (a1:int, a2:int);" +
+                "A1 = FILTER A by a1 > 10;" +
+                "A2 = FILTER A by a2 > 10;" +
+                "B = UNION A1, A2;" +
+                "C = join A1 by a1, A2 by a1;" +
+                "D = DISTINCT C;" +
+                "Z = join B by a1, D by A1::a1 using 'replicated'; " +
+                "store Z into 'file:///tmp/pigoutput';";
+        /*
+        [A,A1,A2] -> [C], [B,Z]
+        [C] -> [D]
+        [D] -> [B,Z]
+
+        With bug PIG-5271, UnionOptimizer tries to combine [A,A1,A2] and [B,Z], and creates an incorrect loop:
+        [A,A1,A2,B,Z] -> [C] -> [D] -> [A,A1,A2,B,Z]
+        */
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-22.gld");
+    }
+
     private String getProperty(String property) {
         return pigServer.getPigContext().getProperties().getProperty(property);
     }