Posted to commits@pig.apache.org by ga...@apache.org on 2009/10/30 23:49:23 UTC

svn commit: r831481 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java test/org/apache/pig/test/TestSkewedJoin.java

Author: gates
Date: Fri Oct 30 22:49:22 2009
New Revision: 831481

URL: http://svn.apache.org/viewvc?rev=831481&view=rev
Log:
PIG-1048: inner join using 'skewed' produces multiple rows for keys with single row in both input relations.
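
The new test added below (testSkewedJoinOneValue) captures the scenario described in the log message. A minimal stand-alone repro sketch, assuming a local tab-delimited input at the hypothetical path /tmp/skewed_input.txt whose id column contains the value 400 exactly once, would look like the following; with this fix the skewed join returns exactly one row, matching the regular join:

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    // Hedged repro sketch for PIG-1048; the class name and input path are
    // illustrative only, the query pattern mirrors the new test case below.
    public class Pig1048Repro {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("A = LOAD '/tmp/skewed_input.txt' as (id, name);");
            pig.registerQuery("B = LOAD '/tmp/skewed_input.txt' as (id, name);");
            // Filter down to a key that has a single row in both inputs.
            pig.registerQuery("C = FILTER A by id == 400;");
            pig.registerQuery("D = FILTER B by id == 400;");
            pig.registerQuery("E = join C by id, D by id using \"skewed\";");

            int rows = 0;
            for (Iterator<Tuple> it = pig.openIterator("E"); it.hasNext(); it.next()) {
                rows++;
            }
            // Before this commit the skewed join could emit duplicate rows for
            // such a key; afterwards it emits exactly one.
            System.out.println("joined rows = " + rows);
        }
    }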

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 22:49:22 2009
@@ -109,6 +109,9 @@
 
 BUG FIXES
 
+PIG-1048: inner join using 'skewed' produces multiple rows for keys with
+          single row in both input relations (sriranjan via gates)
+
 PIG-1063: Pig does not call checkOutSpecs() on OutputFormat provided by
 StoreFunc in the multistore case (pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Oct 30 22:49:22 2009
@@ -147,13 +147,7 @@
 				}
 			}
 			// number of reducers
-			Integer cnt = 0;
-			if (minIndex < maxIndex) {
-				cnt = maxIndex - minIndex;
-			} else {
-				cnt = totalReducers[0] + maxIndex - minIndex;
-			}
-
+			Integer cnt = maxIndex - minIndex;
 			reducerMap.put(keyT, new Pair(minIndex, cnt));// 1 is added to account for the 0 index
 		}		
 		return reducerMap;
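
The MapRedUtil hunk above removes the wrap-around branch when computing how many reducers a sampled key spans. With minIndex and maxIndex describing a contiguous range, the count is simply their difference; when minIndex equalled maxIndex the old else branch produced totalReducers[0] rather than 0, which could spread a single-reducer key across extra partitions and duplicate its rows. A self-contained sketch of the corrected calculation, with Pig's Pair and map types replaced by plain JDK types purely for illustration:

    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of the corrected reducer-count logic; names mirror the
    // diff, but this is not the Pig class itself.
    public class ReducerCountSketch {

        // Number of extra reducers a key spans beyond minIndex.
        static int reducerSpan(int minIndex, int maxIndex) {
            // Old code: if (minIndex < maxIndex) cnt = maxIndex - minIndex;
            //           else cnt = totalReducers + maxIndex - minIndex;
            // The sampled range is contiguous, so the difference alone is enough:
            return maxIndex - minIndex;
        }

        public static void main(String[] args) {
            Map<String, int[]> reducerMap = new HashMap<String, int[]>();
            // A key sampled onto reducers 2..2 now gets cnt = 0 instead of
            // cnt = totalReducers, so it stays on a single reducer.
            reducerMap.put("someKey", new int[] { 2, reducerSpan(2, 2) });
            System.out.println("cnt = " + reducerMap.get("someKey")[1]);
        }
    }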

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri Oct 30 22:49:22 2009
@@ -236,7 +236,7 @@
 		// get the number of total tuples for this key
 		long tupleCount = (long) (((double) count) / totalSampleCount_
 				* inputFileSize_ / avgD);	
-
+		tupleCount = Math.max(tupleCount, 1);
 
 		int redCount = (int) Math.round(Math.ceil((double) tupleCount
 				/ tupleMCount));
@@ -252,7 +252,7 @@
 		}
 
 		// this is not a skewed key
-		if (redCount == 1) {
+		if (redCount <= 1) {
 			return new Pair<Tuple, Integer>(null, 1);
 		}
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=831481&r1=831480&r2=831481&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Oct 30 22:49:22 2009
@@ -138,7 +138,6 @@
 
     }
     
-    
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
@@ -290,17 +289,18 @@
       	        lineCount[key][i] ++;
       	    }
          }
+         
+         int fc = 0;
          for(int i=0; i<3; i++) {
-        	 int fc = 0;
         	 for(int j=0; j<7; j++) {
-        		 if (lineCount[i][j] > 0) {
+        	     if (lineCount[i][j] > 0) {
         			 fc ++;
         		 }
         	 }
-        	 // all three keys are skewed keys,
-        	 // check each key should appear in more than 1 part- file
-        	 assertTrue(fc > 1);
          }
+         // at least one key should be a skewed key
+         // check that at least one key appears in more than 1 part file
+         assertTrue(fc > 3);
     }
     
     public void testSkewedJoinNullKeys() throws IOException {
@@ -324,4 +324,35 @@
         return;
     }
     
+    // pig 1048
+    public void testSkewedJoinOneValue() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+        // Filter key with a single value
+
+        pigServer.registerQuery("C = FILTER A by id == 400;");
+        pigServer.registerQuery("D = FILTER B by id == 400;");
+
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join C by id, D by id using \"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+        	pigServer.registerQuery("E = join C by id, D by id;");
+        	Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+        	while(iter.hasNext()) {
+        		dbrj.add(iter.next());
+        	}
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
+       
+    }
 }