You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/02 19:14:11 UTC

svn commit: r1629020 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: rohini
Date: Thu Oct  2 17:14:10 2014
New Revision: 1629020

URL: http://svn.apache.org/r1629020
Log:
PIG-3985: Multiquery execution of RANK with RANK BY causes NPE (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
    pig/trunk/test/org/apache/pig/test/TestRank3.java
    pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct  2 17:14:10 2014
@@ -92,6 +92,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3985: Multiquery execution of RANK with RANK BY causes NPE (rohini)
+
 PIG-4218: Pig OrcStorage fail to load a map with null key (daijy)
 
 PIG-4164: After Pig job finish, Pig client spend too much time retry to connect to AM (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Oct  2 17:14:10 2014
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -28,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -523,23 +525,32 @@ public class MapReduceOper extends Opera
     }
 
     private POCounter getCounterOperation() {
-        PhysicalOperator operator;
-        Iterator<PhysicalOperator> it =  this.mapPlan.getLeaves().iterator();
-
-        while(it.hasNext()) {
-            operator = it.next();
-            if(operator instanceof POCounter)
-                return (POCounter) operator;
+        POCounter counter = getCounterOperation(this.mapPlan);
+        if (counter == null) {
+            counter = getCounterOperation(this.reducePlan);
         }
+        return counter;
+    }
 
-        it =  this.reducePlan.getLeaves().iterator();
+    private POCounter getCounterOperation(PhysicalPlan plan) {
+        PhysicalOperator operator;
+        Iterator<PhysicalOperator> it = plan.getLeaves().iterator();
 
-        while(it.hasNext()) {
+        while (it.hasNext()) {
             operator = it.next();
-            if(operator instanceof POCounter)
+            if (operator instanceof POCounter) {
                 return (POCounter) operator;
+            } else if (operator instanceof POStore) {
+                List<PhysicalOperator> preds = plan.getPredecessors(operator);
+                if (preds != null) {
+                    for (PhysicalOperator pred : preds) {
+                        if (pred instanceof POCounter) {
+                            return (POCounter) pred;
+                        }
+                    }
+                }
+            }
         }
-
         return null;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Oct  2 17:14:10 2014
@@ -121,6 +121,11 @@ class MultiQueryOptimizer extends MROpPl
                         + " uses customPartitioner, do not merge it");
                 continue;
             }
+            if (successor.isCounterOperation()) {
+                log.debug("Splittee " + successor.getOperatorKey().getId()
+                        + " has POCounter, do not merge it");
+                continue;
+            }
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)
                         && isSinglePredecessor(successor)) {

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java Thu Oct  2 17:14:10 2014
@@ -17,7 +17,8 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -324,7 +325,7 @@ public class TestPOPartialAgg {
 
             res = partAggOp.getNextTuple();
             assertEquals(POStatus.STATUS_EOP, res.returnStatus);
-            Util.compareActualAndExpectedResults(outputs, expectedOuts);
+            Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
         } else {
             while (true) {
                 Result res = partAggOp.getNextTuple();
@@ -332,7 +333,7 @@ public class TestPOPartialAgg {
                     break;
                 }
             }
-            Util.compareActualAndExpectedResults(outputs, expectedOuts);
+            Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
         }
 
     }

Modified: pig/trunk/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank3.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRank3.java Thu Oct  2 17:14:10 2014
@@ -53,6 +53,12 @@ public class TestRank3 {
 
             data = resetData(pigServer);
             data.set("empty");
+            data.set("testsplit",
+                    tuple(1, 2),
+                    tuple(1, 2),
+                    tuple(3, 1),
+                    tuple(2, 4),
+                    tuple(2, 3));
             data.set(
                     "testcascade",
                     tuple(3,2,3),
@@ -156,6 +162,39 @@ public class TestRank3 {
       verifyExpected(data.get("empty_result"), expected);
     }
 
+    @Test
+    public void testRankWithSplitInMap() throws Exception {
+        String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+            + "R2 = rank R1 by a ;"
+            + "R3 = rank R1 ;"
+            + "R4 = union R2, R3;"
+            + "store R4 into 'R4' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "(1L,1,2)",
+                        "(2L,1,2)", "(3L,3,1)", "(4L,2,4)", "(5L,2,3)", "(1L,1,2)",
+                        "(1L,1,2)", "(3L,2,3)", "(3L,2,4)", "(5L,3,1)" });
+        Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+    }
+
+    @Test
+    public void testRankWithSplitInReduce() throws Exception {
+        String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);"
+            + "R1 = ORDER R1 by b;"
+            + "R2 = rank R1 by a ;"
+            + "R3 = rank R1;"
+            + "R4 = union R2, R3;"
+            + "store R4 into 'R4' using mock.Storage();";
+
+        Util.registerMultiLineQuery(pigServer, query);
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "(1L,3,1)",
+                        "(2L,1,2)", "(3L,1,2)", "(4L,2,3)", "(5L,2,4)", "(1L,1,2)",
+                        "(1L,1,2)", "(3L,2,4)", "(3L,2,3)", "(5L,3,1)" });
+        Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults);
+    }
+
     public void verifyExpected(List<Tuple> out, Set<Tuple> expected) {
         for (Tuple tup : out) {
             assertTrue(expected + " contains " + tup, expected.contains(tup));

Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Thu Oct  2 17:14:10 2014
@@ -492,7 +492,7 @@ public class TestUnionOnSchema  {
                                 "(4,5,6,null)",
                         });
         
-        Util.compareActualAndExpectedResults(list1, expectedRes);
+        Util.checkQueryOutputsAfterSort(list1, expectedRes);
         
         assertEquals(0, list2.size());
     }
@@ -852,6 +852,7 @@ public class TestUnionOnSchema  {
      * Udf that has schema of tuple column with no inner schema 
      */
     public static class UDFTupleNullSchema extends EvalFunc <Tuple> {
+        @Override
         public Tuple exec(Tuple input) {
             return input;
         }

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1629020&r1=1629019&r2=1629020&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Oct  2 17:14:10 2014
@@ -540,14 +540,10 @@ public class Util {
          while(actualResultsIt.hasNext()){
              actualResList.add(actualResultsIt.next());
          }
-
-         compareActualAndExpectedResults(actualResList, expectedResList);
-
+         checkQueryOutputsAfterSort(actualResList, expectedResList);
      }
 
-
-
-     static public void compareActualAndExpectedResults(
+     static public void checkQueryOutputsAfterSort(
             List<Tuple> actualResList, List<Tuple> expectedResList) {
          Collections.sort(actualResList);
          Collections.sort(expectedResList);