You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/09 01:14:45 UTC
svn commit: r1043795 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
test/org/apache/pig/test/TestSkewedJoin.java
Author: daijy
Date: Thu Dec 9 00:14:45 2010
New Revision: 1043795
URL: http://svn.apache.org/viewvc?rev=1043795&view=rev
Log:
PIG-1709: Skewed join uses fewer reducers for extremely large keys
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 9 00:14:45 2010
@@ -230,6 +230,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1709: Skewed join use fewer reducer for extreme large key (daijy)
+
PIG-1751: New logical plan: PushDownForEachFlatten fail in UDF with unknown
output schema (daijy)
Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Dec 9 00:14:45 2010
@@ -260,9 +260,10 @@ public class PartitionSkewedKeys extends
t.set(i, currentTuple.get(i));
}
+ int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
// set the min index of reducer for this key
t.set(i++, currentIndex_);
- currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+ currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
if (currentIndex_ < 0) {
currentIndex_ += totalReducers_;
}
Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Dec 9 00:14:45 2010
@@ -20,6 +20,7 @@ package org.apache.pig.test;
import java.io.*;
import java.util.Iterator;
+import java.util.Map;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -31,7 +32,10 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
@@ -533,4 +537,35 @@ public class TestSkewedJoin extends Test
Util.deleteFile(cluster, INPUT_FILE);
}
+
+ @Test
+ public void testSkewedJoinUDF() throws IOException {
+ PartitionSkewedKeys udf = new PartitionSkewedKeys(new String[]{"0.1", "2", "1.txt"});
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(3); // use 3 reducers
+ DataBag db = new DefaultDataBag();
+ Tuple sample;
+ for (int i=0;i<=3;i++) {
+ sample = TupleFactory.getInstance().newTuple();
+ if (i!=3)
+ sample.append("1");
+ else
+ sample.append("2");
+ sample.append((long)200);
+ if (i!=3)
+ sample.append((long)0);
+ else
+ sample.append((long)30);
+ db.add(sample);
+ }
+ t.append(db);
+ Map<String, Object> output = udf.exec(t);
+ DataBag parList = (DataBag)output.get(PartitionSkewedKeys.PARTITION_LIST);
+ for (Tuple par : parList) {
+ if (par.get(0).equals("1")) {
+ par.get(1).equals(0);
+ par.get(2).equals(2);
+ }
+ }
+ }
}