You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/09 01:14:45 UTC

svn commit: r1043795 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java test/org/apache/pig/test/TestSkewedJoin.java

Author: daijy
Date: Thu Dec  9 00:14:45 2010
New Revision: 1043795

URL: http://svn.apache.org/viewvc?rev=1043795&view=rev
Log:
PIG-1709: Skewed join use fewer reducer for extreme large key

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec  9 00:14:45 2010
@@ -230,6 +230,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1709: Skewed join use fewer reducer for extreme large key (daijy)
+
 PIG-1751: New logical plan: PushDownForEachFlatten fail in UDF with unknown
 output schema (daijy)
 

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Dec  9 00:14:45 2010
@@ -260,9 +260,10 @@ public class PartitionSkewedKeys extends
 	            t.set(i, currentTuple.get(i));
 	        }
 
+	        int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
 	        // set the min index of reducer for this key
 	        t.set(i++, currentIndex_);
-	        currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+	        currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
 	        if (currentIndex_ < 0) {
 	            currentIndex_ += totalReducers_;
 	        }

Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1043795&r1=1043794&r2=1043795&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Thu Dec  9 00:14:45 2010
@@ -20,6 +20,7 @@ package org.apache.pig.test;
 
 import java.io.*;
 import java.util.Iterator;
+import java.util.Map;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -31,7 +32,10 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -533,4 +537,35 @@ public class TestSkewedJoin extends Test
         Util.deleteFile(cluster, INPUT_FILE);
           
     }
+    
+    /** Checks the reducer range PartitionSkewedKeys reports for key "1" with 3 reducers. */
+    @Test
+    public void testSkewedJoinUDF() throws IOException {
+        // PIG-1709: a key that asks for more reducers than exist must be capped at the
+        // total reducer count instead of wrapping around to a smaller range.
+        PartitionSkewedKeys udf = new PartitionSkewedKeys(new String[]{"0.1", "2", "1.txt"});
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(3);    // use 3 reducers
+        DataBag db = new DefaultDataBag();
+        // Three samples carry key "1"; the fourth carries key "2" and a final field of 30.
+        for (int i = 0; i <= 3; i++) {
+            boolean last = (i == 3);
+            Tuple sample = TupleFactory.getInstance().newTuple();
+            sample.append(last ? "2" : "1");
+            sample.append((long) 200);
+            sample.append(last ? (long) 30 : (long) 0);
+            db.add(sample);
+        }
+        t.append(db);
+        Map<String, Object> output = udf.exec(t);
+        DataBag parList = (DataBag) output.get(PartitionSkewedKeys.PARTITION_LIST);
+        for (Tuple par : parList) {
+            if (par.get(0).equals("1")) {
+                // Assert the values: a bare equals() call discards its result and verifies
+                // nothing. Fields 1 and 2 are presumably the min/max reducer index.
+                Assert.assertEquals(Integer.valueOf(0), par.get(1));
+                Assert.assertEquals(Integer.valueOf(2), par.get(2));
+            }
+        }
+    }
 }