You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/21 23:15:55 UTC

svn commit: r1596690 - /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Author: daijy
Date: Wed May 21 21:15:54 2014
New Revision: 1596690

URL: http://svn.apache.org/r1596690
Log:
PIG-3946: Fix e2e test failure CastScalar_11

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1596690&r1=1596689&r2=1596690&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed May 21 21:15:54 2014
@@ -80,6 +80,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -433,6 +434,7 @@ public class TezDagBuilder extends TezOp
         }
 
         // Configure the classes for incoming shuffles to this TezOp
+        // TODO: Refactor out resetting input keys, PIG-3957
         List<PhysicalOperator> roots = tezOp.plan.getRoots();
         if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
             POPackage pack = (POPackage) roots.get(0);
@@ -498,8 +500,20 @@ public class TezDagBuilder extends TezOp
             identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
         } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
             POValueInputTez valueInput = (POValueInputTez) roots.get(0);
-            TezOperator pred = mPlan.getPredecessors(tezOp).get(0);
-            valueInput.setInputKey(pred.getOperatorKey().toString());
+
+            LinkedList<String> scalarInputs = new LinkedList<String>();
+            for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) {
+                if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                    scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]);
+                }
+            }
+            // Skip predecessors that are scalar inputs; use the first remaining predecessor as the input key
+            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+                if (!scalarInputs.contains(pred.getOperatorKey().toString())) {
+                    valueInput.setInputKey(pred.getOperatorKey().toString());
+                    break;
+                }
+            }
         }
         JobControlCompiler.setOutputFormat(payloadConf);