You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/05 22:56:44 UTC

svn commit: r1706922 - in /pig/branches/branch-0.15: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java test/org/apache/pig/test/TestLimitVariable.java

Author: rohini
Date: Mon Oct  5 20:56:44 2015
New Revision: 1706922

URL: http://svn.apache.org/viewvc?rev=1706922&view=rev
Log:
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Oct  5 20:56:44 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
+
 PIG-4635: NPE while running pig script in tez mode (daijy)
 
 PIG-4683: Nested order is broken after PIG-3591 in some cases (daijy)

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Mon Oct  5 20:56:44 2015
@@ -34,6 +34,7 @@ import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -195,6 +196,18 @@ public class PigProcessor extends Abstra
 
             runPipeline(leaf);
 
+            if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))
+                    && !execPlan.endOfAllInput) {
+                // If there is a stream in the pipeline, if this job belongs to a merge-join, or if
+                // an operator such as POPartialAgg is still holding records (see PIG-4688), we could
+                // potentially have more to process. So let's set the flag stating that all map input
+                // has already been sent, and then run the pipeline one more time.
+                // This will result in nothing happening when the pipeline has no stream,
+                // is not part of a merge-join and holds no buffered records.
+                execPlan.endOfAllInput = true;
+                runPipeline(leaf);
+            }
+
             // Calling EvalFunc.finish()
             UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan,
                     new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(

Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java Mon Oct  5 20:56:44 2015
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -64,20 +65,24 @@ public class TestLimitVariable {
 
     @Test
     public void testLimitVariable1() throws IOException {
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true);
         String query =
-            "a = load '" + inputFile.getName() + "';" +
+            "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" +
             "b = group a all;" +
             "c = foreach b generate COUNT(a) as sum;" +
             "d = order a by $0 DESC;" +
-            "e = limit d c.sum/2;" // return top half of the tuples
+            "e = limit d c.sum/2;" + // return top half of the tuples
+            "f = group e all;" +
+            "g = foreach f generate AVG(e.$0), SUM(e.$1);"
             ;
 
         Util.registerMultiLineQuery(pigServer, query);
-        Iterator<Tuple> it = pigServer.openIterator("e");
+        Iterator<Tuple> it = pigServer.openIterator("g");
 
         List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
-                "(6,15)", "(5,10)", "(4,11)" });
+                "(5.0,36)"});
         Util.checkQueryOutputs(it, expectedRes);
+        pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
     }
 
     @Test