You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/30 23:49:23 UTC
svn commit: r831481 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
test/org/apache/pig/test/TestSkewedJoin.java
Author: gates
Date: Fri Oct 30 22:49:22 2009
New Revision: 831481
URL: http://svn.apache.org/viewvc?rev=831481&view=rev
Log:
PIG-1048: inner join using 'skewed' produces multiple rows for keys with single row in both input relations.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 22:49:22 2009
@@ -109,6 +109,9 @@
BUG FIXES
+PIG-1048: inner join using 'skewed' produces multiple rows for keys with
+ single row in both input relations (sriranjan via gates)
+
PIG-1063: Pig does not call checkOutSpecs() on OutputFormat provided by
StoreFunc in the multistore case (pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Oct 30 22:49:22 2009
@@ -147,13 +147,7 @@
}
}
// number of reducers
- Integer cnt = 0;
- if (minIndex < maxIndex) {
- cnt = maxIndex - minIndex;
- } else {
- cnt = totalReducers[0] + maxIndex - minIndex;
- }
-
+ Integer cnt = maxIndex - minIndex;
reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
}
return reducerMap;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri Oct 30 22:49:22 2009
@@ -236,7 +236,7 @@
// get the number of total tuples for this key
long tupleCount = (long) (((double) count) / totalSampleCount_
* inputFileSize_ / avgD);
-
+ tupleCount = Math.max(tupleCount, 1);
int redCount = (int) Math.round(Math.ceil((double) tupleCount
/ tupleMCount));
@@ -252,7 +252,7 @@
}
// this is not a skewed key
- if (redCount == 1) {
+ if (redCount <= 1) {
return new Pair<Tuple, Integer>(null, 1);
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Oct 30 22:49:22 2009
@@ -138,7 +138,6 @@
}
-
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
@@ -290,17 +289,18 @@
lineCount[key][i] ++;
}
}
+
+ int fc = 0;
for(int i=0; i<3; i++) {
- int fc = 0;
for(int j=0; j<7; j++) {
- if (lineCount[i][j] > 0) {
+ if (lineCount[i][j] > 0) {
fc ++;
}
}
- // all three keys are skewed keys,
- // check each key should appear in more than 1 part- file
- assertTrue(fc > 1);
}
+ // at least one key should be a skewed key
+ // check that at least one key appears in more than 1 part- file
+ assertTrue(fc > 3);
}
public void testSkewedJoinNullKeys() throws IOException {
@@ -324,4 +324,35 @@
return;
}
+ // pig 1048
+ public void testSkewedJoinOneValue() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+ // Filter key with a single value
+
+ pigServer.registerQuery("C = FILTER A by id == 400;");
+ pigServer.registerQuery("D = FILTER B by id == 400;");
+
+
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("E = join C by id, D by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("E = join C by id, D by id;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ dbrj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbfrj.size(), dbrj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+
+ }
}