You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/02 19:14:24 UTC
svn commit: r1629021 - in /pig/branches/branch-0.14: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Author: rohini
Date: Thu Oct 2 17:14:24 2014
New Revision: 1629021
URL: http://svn.apache.org/r1629021
Log:
PIG-3985: Multiquery execution of RANK with RANK BY causes NPE (rohini)
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestPOPartialAgg.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestRank3.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestUnionOnSchema.java
pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Thu Oct 2 17:14:24 2014
@@ -84,6 +84,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3985: Multiquery execution of RANK with RANK BY causes NPE (rohini)
+
PIG-4218: Pig OrcStorage fail to load a map with null key (daijy)
PIG-4164: After Pig job finish, Pig client spend too much time retry to connect to AM (daijy)
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Oct 2 17:14:24 2014
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -28,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -523,23 +525,32 @@ public class MapReduceOper extends Opera
}
private POCounter getCounterOperation() {
- PhysicalOperator operator;
- Iterator<PhysicalOperator> it = this.mapPlan.getLeaves().iterator();
-
- while(it.hasNext()) {
- operator = it.next();
- if(operator instanceof POCounter)
- return (POCounter) operator;
+ POCounter counter = getCounterOperation(this.mapPlan);
+ if (counter == null) {
+ counter = getCounterOperation(this.reducePlan);
}
+ return counter;
+ }
- it = this.reducePlan.getLeaves().iterator();
+ private POCounter getCounterOperation(PhysicalPlan plan) {
+ PhysicalOperator operator;
+ Iterator<PhysicalOperator> it = plan.getLeaves().iterator();
- while(it.hasNext()) {
+ while (it.hasNext()) {
operator = it.next();
- if(operator instanceof POCounter)
+ if (operator instanceof POCounter) {
return (POCounter) operator;
+ } else if (operator instanceof POStore) {
+ List<PhysicalOperator> preds = plan.getPredecessors(operator);
+ if (preds != null) {
+ for (PhysicalOperator pred : preds) {
+ if (pred instanceof POCounter) {
+ return (POCounter) pred;
+ }
+ }
+ }
+ }
}
-
return null;
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Oct 2 17:14:24 2014
@@ -121,6 +121,11 @@ class MultiQueryOptimizer extends MROpPl
+ " uses customPartitioner, do not merge it");
continue;
}
+ if (successor.isCounterOperation()) {
+ log.debug("Splittee " + successor.getOperatorKey().getId()
+ + " has POCounter, do not merge it");
+ continue;
+ }
if (isMapOnly(successor)) {
if (isSingleLoadMapperPlan(successor.mapPlan)
&& isSinglePredecessor(successor)) {
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestPOPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestPOPartialAgg.java Thu Oct 2 17:14:24 2014
@@ -17,7 +17,8 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
@@ -324,7 +325,7 @@ public class TestPOPartialAgg {
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
- Util.compareActualAndExpectedResults(outputs, expectedOuts);
+ Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
} else {
while (true) {
Result res = partAggOp.getNextTuple();
@@ -332,7 +333,7 @@ public class TestPOPartialAgg {
break;
}
}
- Util.compareActualAndExpectedResults(outputs, expectedOuts);
+ Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
}
}
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestRank3.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestRank3.java Thu Oct 2 17:14:24 2014
@@ -53,6 +53,12 @@ public class TestRank3 {
data = resetData(pigServer);
data.set("empty");
+ data.set("testsplit",
+ tuple(1, 2),
+ tuple(1, 2),
+ tuple(3, 1),
+ tuple(2, 4),
+ tuple(2, 3));
data.set(
"testcascade",
tuple(3,2,3),
@@ -156,6 +162,39 @@ public class TestRank3 {
verifyExpected(data.get("empty_result"), expected);
}
+ @Test
+ public void testRankWithSplitInMap() throws Exception {
+ String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+ + "R2 = rank R1 by a ;"
+ + "R3 = rank R1 ;"
+ + "R4 = union R2, R3;"
+ + "store R4 into 'R4' using mock.Storage();";
+
+ Util.registerMultiLineQuery(pigServer, query);
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] { "(1L,1,2)",
+ "(2L,1,2)", "(3L,3,1)", "(4L,2,4)", "(5L,2,3)", "(1L,1,2)",
+ "(1L,1,2)", "(3L,2,3)", "(3L,2,4)", "(5L,3,1)" });
+ Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+ }
+
+ @Test
+ public void testRankWithSplitInReduce() throws Exception {
+ String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+ + "R1 = ORDER R1 by b;"
+ + "R2 = rank R1 by a ;"
+ + "R3 = rank R1;"
+ + "R4 = union R2, R3;"
+ + "store R4 into 'R4' using mock.Storage();";
+
+ Util.registerMultiLineQuery(pigServer, query);
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] { "(1L,3,1)",
+ "(2L,1,2)", "(3L,1,2)", "(4L,2,3)", "(5L,2,4)", "(1L,1,2)",
+ "(1L,1,2)", "(3L,2,4)", "(3L,2,3)", "(5L,3,1)" });
+ Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+ }
+
public void verifyExpected(List<Tuple> out, Set<Tuple> expected) {
for (Tuple tup : out) {
assertTrue(expected + " contains " + tup, expected.contains(tup));
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestUnionOnSchema.java Thu Oct 2 17:14:24 2014
@@ -492,7 +492,7 @@ public class TestUnionOnSchema {
"(4,5,6,null)",
});
- Util.compareActualAndExpectedResults(list1, expectedRes);
+ Util.checkQueryOutputsAfterSort(list1, expectedRes);
assertEquals(0, list2.size());
}
@@ -852,6 +852,7 @@ public class TestUnionOnSchema {
* Udf that has schema of tuple column with no inner schema
*/
public static class UDFTupleNullSchema extends EvalFunc <Tuple> {
+ @Override
public Tuple exec(Tuple input) {
return input;
}
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/Util.java?rev=1629021&r1=1629020&r2=1629021&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/Util.java Thu Oct 2 17:14:24 2014
@@ -540,14 +540,10 @@ public class Util {
while(actualResultsIt.hasNext()){
actualResList.add(actualResultsIt.next());
}
-
- compareActualAndExpectedResults(actualResList, expectedResList);
-
+ checkQueryOutputsAfterSort(actualResList, expectedResList);
}
-
-
- static public void compareActualAndExpectedResults(
+ static public void checkQueryOutputsAfterSort(
List<Tuple> actualResList, List<Tuple> expectedResList) {
Collections.sort(actualResList);
Collections.sort(expectedResList);