You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/04 23:13:29 UTC

svn commit: r1657425 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/

Author: daijy
Date: Wed Feb  4 22:13:28 2015
New Revision: 1657425

URL: http://svn.apache.org/r1657425
Log:
PIG-4392: RANK BY fails when default_parallel is greater than cardinality of field being ranked by

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb  4 22:13:28 2015
@@ -48,6 +48,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4392: RANK BY fails when default_parallel is greater than cardinality of field being ranked by (daijy)
+
 PIG-4403: Combining -Dpig.additional.jars.uris with -useHCatalog breaks due to combination
  with colon instead of comma (ovlaere via daijy)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb  4 22:13:28 2015
@@ -407,17 +407,13 @@ public class JobControlCompiler{
             }
             groupCounters = counters.getGroup(groupName);
 
-            Iterator<Counter> it = groupCounters.iterator();
             HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
 
-            while(it.hasNext()) {
-                try{
-                    Counter c = it.next();
-                    counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
-                } catch (Exception ex) {
-                    ex.printStackTrace();
-                }
+            for (int i=0;i<job.getJob().getNumReduceTasks();i++) {
+                Long value = groupCounters.getCounter(Integer.toString(i));
+                counterList.put(i, value);
             }
+
             counterSize = counterList.size();
             counterPairs = new ArrayList<Pair<String,Long>>();
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Feb  4 22:13:28 2015
@@ -422,6 +422,7 @@ public class LogToPhyTranslationVisitor
                 poSort = new POSort(new OperatorKey(scope, nodeGen
                         .getNextNodeId(scope)), -1, null,
                         newPhysicalPlan, newOrderPlan, null);
+                poSort.setRequestedParallelism(loRank.getRequestedParallelism());
                 poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
 
 

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Feb  4 22:13:28 2015
@@ -1265,4 +1265,27 @@ public class TestEvalPipelineLocal {
         Assert.assertEquals(iter.next().toString(), "(1)");
         Assert.assertFalse(iter.hasNext());
     }
+
+    // see PIG-4392
+    @Test
+    public void testRankWithEmptyReduce() throws Exception {
+        File f1 = createFile(new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"});
+        pigServer.setDefaultParallel(4);
+        
+        pigServer.registerQuery("d = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
+                + "' as (a:int, b:int, c:int);");
+        pigServer.registerQuery("e = rank d by a parallel 4;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("e");
+
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,1,2,3)");
+        results.add("(2,4,5,6)");
+        results.add("(3,7,8,9)");
+        
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertFalse(iter.hasNext());
+    }
 }