You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/04 23:13:29 UTC
svn commit: r1657425 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/
Author: daijy
Date: Wed Feb 4 22:13:28 2015
New Revision: 1657425
URL: http://svn.apache.org/r1657425
Log:
PIG-4392: RANK BY fails when default_parallel is greater than cardinality of field being ranked by
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 4 22:13:28 2015
@@ -48,6 +48,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4392: RANK BY fails when default_parallel is greater than cardinality of field being ranked by (daijy)
+
PIG-4403: Combining -Dpig.additional.jars.uris with -useHCatalog breaks due to combination
with colon instead of comma (ovlaere via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 4 22:13:28 2015
@@ -407,17 +407,13 @@ public class JobControlCompiler{
}
groupCounters = counters.getGroup(groupName);
- Iterator<Counter> it = groupCounters.iterator();
HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
- while(it.hasNext()) {
- try{
- Counter c = it.next();
- counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
- } catch (Exception ex) {
- ex.printStackTrace();
- }
+ for (int i=0;i<job.getJob().getNumReduceTasks();i++) {
+ Long value = groupCounters.getCounter(Integer.toString(i));
+ counterList.put(i, value);
}
+
counterSize = counterList.size();
counterPairs = new ArrayList<Pair<String,Long>>();
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Feb 4 22:13:28 2015
@@ -422,6 +422,7 @@ public class LogToPhyTranslationVisitor
poSort = new POSort(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), -1, null,
newPhysicalPlan, newOrderPlan, null);
+ poSort.setRequestedParallelism(loRank.getRequestedParallelism());
poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1657425&r1=1657424&r2=1657425&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Feb 4 22:13:28 2015
@@ -1265,4 +1265,27 @@ public class TestEvalPipelineLocal {
Assert.assertEquals(iter.next().toString(), "(1)");
Assert.assertFalse(iter.hasNext());
}
+
+ // see PIG-4392
+ @Test
+ public void testRankWithEmptyReduce() throws Exception {
+ File f1 = createFile(new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"});
+ pigServer.setDefaultParallel(4);
+
+ pigServer.registerQuery("d = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ + "' as (a:int, b:int, c:int);");
+ pigServer.registerQuery("e = rank d by a parallel 4;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("e");
+
+ Collection<String> results = new HashSet<String>();
+ results.add("(1,1,2,3)");
+ results.add("(2,4,5,6)");
+ results.add("(3,7,8,9)");
+
+ Assert.assertTrue(results.contains(iter.next().toString()));
+ Assert.assertTrue(results.contains(iter.next().toString()));
+ Assert.assertTrue(results.contains(iter.next().toString()));
+ Assert.assertFalse(iter.hasNext());
+ }
}