You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/21 23:15:55 UTC
svn commit: r1596690 -
/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Author: daijy
Date: Wed May 21 21:15:54 2014
New Revision: 1596690
URL: http://svn.apache.org/r1596690
Log:
PIG-3946: Fix e2e test failure CastScalar_11
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1596690&r1=1596689&r2=1596690&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed May 21 21:15:54 2014
@@ -80,6 +80,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -433,6 +434,7 @@ public class TezDagBuilder extends TezOp
}
// Configure the classes for incoming shuffles to this TezOp
+ // TODO: Refactor out resetting input keys, PIG-3957
List<PhysicalOperator> roots = tezOp.plan.getRoots();
if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
POPackage pack = (POPackage) roots.get(0);
@@ -498,8 +500,20 @@ public class TezDagBuilder extends TezOp
identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
} else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
POValueInputTez valueInput = (POValueInputTez) roots.get(0);
- TezOperator pred = mPlan.getPredecessors(tezOp).get(0);
- valueInput.setInputKey(pred.getOperatorKey().toString());
+
+ LinkedList<String> scalarInputs = new LinkedList<String>();
+ for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]);
+ }
+ }
+ // Make sure we don't find the scalar
+ for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+ if (!scalarInputs.contains(pred.getOperatorKey().toString())) {
+ valueInput.setInputKey(pred.getOperatorKey().toString());
+ break;
+ }
+ }
}
JobControlCompiler.setOutputFormat(payloadConf);