You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/06 16:30:26 UTC

svn commit: r1707064 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ test/org/apache/pig/test/

Author: rohini
Date: Tue Oct  6 14:30:26 2015
New Revision: 1707064

URL: http://svn.apache.org/viewvc?rev=1707064&view=rev
Log:
PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
    pig/trunk/test/org/apache/pig/test/TestFRJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct  6 14:30:26 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)
+
 PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
 
 PIG-4635: NPE while running pig script in tez mode (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Oct  6 14:30:26 2015
@@ -80,7 +80,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -94,12 +93,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
@@ -585,33 +581,6 @@ public class TezDagBuilder extends TezOp
 
             //POShuffleTezLoad accesses the comparator setting
             selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
-        } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
-            POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
-            // TODO Need to fix multiple input key mapping
-            TezOperator identityInOutPred = null;
-            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (!pred.isSampleAggregation()) {
-                    identityInOutPred = pred;
-                    break;
-                }
-            }
-            identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
-        } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
-            POValueInputTez valueInput = (POValueInputTez) roots.get(0);
-
-            LinkedList<String> scalarInputs = new LinkedList<String>();
-            for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) {
-                if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                    scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]);
-                }
-            }
-            // Make sure we don't find the scalar
-            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (!scalarInputs.contains(pred.getOperatorKey().toString())) {
-                    valueInput.setInputKey(pred.getOperatorKey().toString());
-                    break;
-                }
-            }
         }
         setOutputFormat(job);
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Oct  6 14:30:26 2015
@@ -294,6 +294,7 @@ public class TezCompiler extends PhyPlan
                         storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
                     } else {
                         POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                        output.setScalarOutput(true);
                         output.addOutputKey(tezOp.getOperatorKey().toString());
                         from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
                         from.plan.addAsLeaf(output);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Tue Oct  6 14:30:26 2015
@@ -120,12 +120,10 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else {
-                if (reader.next()) {
-                    Tuple origTuple = (Tuple)reader.getCurrentValue();
-                    Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                    return new Result(POStatus.STATUS_OK, copy);
-                }
+            } else if (reader.next()) {
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                return new Result(POStatus.STATUS_OK, copy);
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have some work

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java Tue Oct  6 14:30:26 2015
@@ -53,6 +53,8 @@ public class POValueOutputTez extends Ph
 
     private static final TupleFactory tupleFactory = TupleFactory.getInstance();
 
+    private boolean scalarOutput;
+    private transient Object scalarValue;
     private boolean taskIndexWithRecordIndexAsKey;
     // TODO Change this to outputKey and write only once
     // when a shared edge support is available in Tez
@@ -71,6 +73,14 @@ public class POValueOutputTez extends Ph
         super(k);
     }
 
+    public boolean isScalarOutput() {
+        return scalarOutput;
+    }
+
+    public void setScalarOutput(boolean scalarOutput) {
+        this.scalarOutput = scalarOutput;
+    }
+
     public boolean isTaskIndexWithRecordIndexAsKey() {
         return taskIndexWithRecordIndexAsKey;
     }
@@ -149,14 +159,25 @@ public class POValueOutputTez extends Ph
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
+            if (scalarOutput) {
+                if (scalarValue == null) {
+                    scalarValue = inp.result;
+                } else {
+                    String msg = "Scalar has more than one row in the output. "
+                            + "1st : " + scalarValue + ", 2nd :"
+                            + inp.result
+                            + " (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )";
+                    throw new ExecException(msg);
+                }
+            }
+            if (taskIndexWithRecordIndexAsKey) {
+                Tuple tuple = tupleFactory.newTuple(2);
+                tuple.set(0, taskIndex);
+                tuple.set(1, count++);
+                key = tuple;
+            }
             for (KeyValueWriter writer : writers) {
                 try {
-                    if (taskIndexWithRecordIndexAsKey) {
-                        Tuple tuple = tupleFactory.newTuple(2);
-                        tuple.set(0, taskIndex);
-                        tuple.set(1, count++);
-                        key = tuple;
-                    }
                     writer.write(key, inp.result);
                 } catch (IOException e) {
                     throw new ExecException(e);

Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin.java Tue Oct  6 14:30:26 2015
@@ -437,7 +437,7 @@ public class TestFRJoin {
         Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
         {
             pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';");
-            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'replicated';");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
             pigServer.registerQuery("E = union C,D;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
@@ -470,14 +470,14 @@ public class TestFRJoin {
     public void testFRJoinOut9() throws IOException {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        pigServer.registerQuery("C = UNION A, B;");
+        pigServer.registerQuery("D = FILTER C BY x == 1;");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance()
                 .newDefaultBag();
         Map<String, Tuple> hashFRJoin = new HashMap<String, Tuple>();
         Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
         {
-            pigServer.registerQuery("C = join A by $0 left, B by $0 using 'repl';");
-            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
-            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("E = join C by $0 left, D by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
             while (iter.hasNext()) {
@@ -489,9 +489,7 @@ public class TestFRJoin {
             }
         }
         {
-            pigServer.registerQuery("C = join A by $0 left, B by $0;");
-            pigServer.registerQuery("D = join A by $1 left, B by $1;");
-            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("E = join C by $0 left, D by $0;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
             while (iter.hasNext()) {
                 Tuple tuple = iter.next();