You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/05 22:56:44 UTC
svn commit: r1706922 - in /pig/branches/branch-0.15: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
test/org/apache/pig/test/TestLimitVariable.java
Author: rohini
Date: Mon Oct 5 20:56:44 2015
New Revision: 1706922
URL: http://svn.apache.org/viewvc?rev=1706922&view=rev
Log:
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Oct 5 20:56:44 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
+
PIG-4635: NPE while running pig script in tez mode (daijy)
PIG-4683: Nested order is broken after PIG-3591 in some cases (daijy)
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Mon Oct 5 20:56:44 2015
@@ -34,6 +34,7 @@ import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -195,6 +196,18 @@ public class PigProcessor extends Abstra
runPipeline(leaf);
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))
+ && !execPlan.endOfAllInput) {
+ // If there is a stream in the pipeline, or if this map job belongs to a merge-join, we could
+ // potentially have more to process - so let's
+ // set the flag stating that all map input has been sent
+ // already and then run the pipeline one more time.
+ // This will result in nothing happening in the case
+ // where there is no stream and no merge-join in the pipeline.
+ execPlan.endOfAllInput = true;
+ runPipeline(leaf);
+ }
+
// Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan,
new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
Modified: pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java?rev=1706922&r1=1706921&r2=1706922&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/test/TestLimitVariable.java Mon Oct 5 20:56:44 2015
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
@@ -64,20 +65,24 @@ public class TestLimitVariable {
@Test
public void testLimitVariable1() throws IOException {
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true);
String query =
- "a = load '" + inputFile.getName() + "';" +
+ "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" +
"b = group a all;" +
"c = foreach b generate COUNT(a) as sum;" +
"d = order a by $0 DESC;" +
- "e = limit d c.sum/2;" // return top half of the tuples
+ "e = limit d c.sum/2;" + // return top half of the tuples
+ "f = group e all;" +
+ "g = foreach f generate AVG(e.$0), SUM(e.$1);"
;
Util.registerMultiLineQuery(pigServer, query);
- Iterator<Tuple> it = pigServer.openIterator("e");
+ Iterator<Tuple> it = pigServer.openIterator("g");
List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
- "(6,15)", "(5,10)", "(4,11)" });
+ "(5.0,36)"});
Util.checkQueryOutputs(it, expectedRes);
+ pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
}
@Test