You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2009/10/23 20:10:21 UTC

svn commit: r829159 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java src/org/apache/pig/impl/io/PigNullableWritable.java test/org/apache/pig/test/TestJoin.java

Author: daijy
Date: Fri Oct 23 18:10:20 2009
New Revision: 829159

URL: http://svn.apache.org/viewvc?rev=829159&view=rev
Log:
PIG-927: null should be handled consistently in Join

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=829159&r1=829158&r2=829159&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 23 18:10:20 2009
@@ -115,6 +115,8 @@
 
 PIG-644: Duplicate column names in foreach do not throw parser error (daijy)
 
+PIG-927: null should be handled consistently in Join (daijy)
+
 Release 0.5.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=829159&r1=829158&r2=829159&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Fri Oct 23 18:10:20 2009
@@ -246,7 +246,23 @@
 		throw new RuntimeException("Error comparing tuples");
 	    }
 	    
-	    return DataType.compare(t1, t2);
+	    int result = DataType.compare(t1, t2);
+	    
+	    // Further check if any field is null
+        // See PIG-927
+	    if (result == 0 && t1 instanceof Tuple && t2 instanceof Tuple)
+	    {
+	        try {
+    	        int firstInputIndex = (Byte)(o1.get(0));
+                int secondInputIndex = (Byte)(o2.get(0));
+    	        for (int i=0;i<((Tuple)t1).size();i++)
+                    if (((Tuple)t1).get(i)==null)
+                        return firstInputIndex - secondInputIndex;
+            } catch (ExecException e) {
+                throw new RuntimeException("Error comparing tuple fields", e);
+            }
+	    }
+	    return result;
 	}
 	
 	public boolean equals(Object obj) {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=829159&r1=829158&r2=829159&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Fri Oct 23 18:10:20 2009
@@ -22,6 +22,8 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
 
 /**
  * A base class for all types that pig uses to move data between map and
@@ -75,7 +77,21 @@
         }
         
         if (!mNull && !w.mNull) {
-            return mValue.compareTo(w.mValue);
+            int result = mValue.compareTo(w.mValue);
+            
+            // If any of the field inside tuple is null, then we do not merge keys
+            // See PIG-927
+            if (result == 0 && mValue instanceof Tuple && w.mValue instanceof Tuple)
+            {
+                try {
+                    for (int i=0;i<((Tuple)mValue).size();i++)
+                        if (((Tuple)mValue).get(i)==null)
+                            return mIndex - w.mIndex;
+                } catch (ExecException e) {
+                    throw new RuntimeException("Unable to access tuple field", e);
+                }
+            }
+            return result;
         } else if (mNull && w.mNull) {
             // If they're both null, compare the indicies
             if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=829159&r1=829158&r2=829159&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Fri Oct 23 18:10:20 2009
@@ -510,4 +510,37 @@
         }
     }
 
+    @Test
+    public void testJoinNullTupleFieldKey() throws Exception{
+        for (ExecType execType : execTypes) {
+            setUp(execType);
+            String[] input1 = {
+                    "1\t",
+                    "2\taa"
+            };
+            String[] input2 = {
+                    "1\t",
+                    "2\taa"
+            };
+            
+            String firstInput = createInputFile(execType, "a.txt", input1);
+            String secondInput = createInputFile(execType, "b.txt", input2);
+            
+            String script = "a = load '"+ firstInput +"' as (a1:int, a2:chararray);" +
+                    "b = load '"+ secondInput +"' as (b1:int, b2:chararray);" +
+                    "c = join a by (a1, a2), b by (b1, b2);";
+            Util.registerMultiLineQuery(pigServer, script);
+            Iterator<Tuple> it = pigServer.openIterator("c");
+            
+            assertTrue(it.hasNext());
+            Tuple t = it.next();
+            assertTrue(t.toString().equals("(2,aa,2,aa)"));
+            
+            assertFalse(it.hasNext());
+            
+            deleteInputFile(execType, firstInput);
+            deleteInputFile(execType, secondInput);
+        }
+    }
+
 }