You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/04 19:11:21 UTC

svn commit: r1574168 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: MultiQueryOptimizerTez.java POFRJoinTez.java

Author: cheolsoo
Date: Tue Mar  4 18:11:20 2014
New Revision: 1574168

URL: http://svn.apache.org/r1574168
Log:
PIG-3787: Multiquery with FRJoin fails (daijy)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1574168&r1=1574167&r2=1574168&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Mar  4 18:11:20 2014
@@ -18,14 +18,17 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -100,14 +103,14 @@ public class MultiQueryOptimizerTez exte
 
                 tezOp.plan.remove(firstNodeLeaf);
                 singleSplitee.plan.remove(secondNodeRoot);
-                
+
                 //TODO remove filter all
 
                 tezOp.plan.merge(singleSplitee.plan);
                 tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc);
 
                 addSubPlanPropertiesToParent(tezOp, singleSplitee);
-                
+
                 removeSplittee(getPlan(), tezOp, singleSplitee);
             } else {
                 POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0);
@@ -116,9 +119,9 @@ public class MultiQueryOptimizerTez exte
                     PhysicalOperator spliteeRoot =  splitee.plan.getRoots().get(0);
                     splitee.plan.remove(spliteeRoot);
                     split.addPlan(splitee.plan);
-                    
+
                     addSubPlanPropertiesToParent(tezOp, splitee);
-                    
+
                     removeSplittee(getPlan(), tezOp, splitee);
                     valueOutput.outputKeys.remove(splitee.getOperatorKey().toString());
                 }
@@ -138,7 +141,7 @@ public class MultiQueryOptimizerTez exte
             throw new VisitorException(e);
         }
     }
-    
+
     static public void removeSplittee(TezOperPlan plan, TezOperator splitter, TezOperator splittee) throws PlanException {
         if (plan.getSuccessors(splittee)!=null) {
             List<TezOperator> succs = new ArrayList<TezOperator>();
@@ -151,6 +154,20 @@ public class MultiQueryOptimizerTez exte
                 succTezOperator.inEdges.remove(splittee.getOperatorKey());
                 plan.disconnect(splittee, succTezOperator);
                 TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
+
+                for (TezOperator succ : succs) {
+                    try {
+                        List<POFRJoinTez> frJoins = PlanHelper.getPhysicalOperators(succ.plan, POFRJoinTez.class);
+                        for (POFRJoinTez frJoin : frJoins) {
+                            if (frJoin.getInputKeys().contains(splittee.getOperatorKey().toString())) {
+                                frJoin.getInputKeys().set(frJoin.getInputKeys().indexOf(splittee.getOperatorKey().toString()),
+                                        splitter.getOperatorKey().toString());
+                            }
+                        }
+                    } catch (VisitorException e) {
+                        throw new PlanException(e);
+                    }
+                }
             }
         }
         plan.remove(splittee);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java?rev=1574168&r1=1574167&r2=1574168&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POFRJoinTez.java Tue Mar  4 18:11:20 2014
@@ -195,4 +195,8 @@ public class POFRJoinTez extends POFRJoi
         }
         return super.name() + "\t<-\t " + inputs.toString();
     }
+
+    public List<String> getInputKeys() {
+        return inputKeys;
+    }
 }