You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/08 20:10:49 UTC

svn commit: r693182 - in /incubator/pig/branches/types/test/org/apache/pig/test: TestMapReduce.java TestMapReduce2.java TestPOSort.java TestPOUserFunc.java TestProject.java utils/GenRandomData.java

Author: olga
Date: Mon Sep  8 11:10:48 2008
New Revision: 693182

URL: http://svn.apache.org/viewvc?rev=693182&view=rev
Log:
addition of unit tests for NULL testing

Modified:
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Mon Sep  8 11:10:48 2008
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.URL;
+import java.text.DecimalFormat;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -56,6 +57,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.junit.Before;
+import org.apache.pig.test.utils.TestHelper;
 
 public class TestMapReduce extends TestCase {
 
@@ -73,22 +75,52 @@
 
     @Test
     public void testBigGroupAll() throws Throwable {
+
         int LOOP_COUNT = 4*1024;
-        File tmpFile = File.createTempFile("test", "txt");
+        File tmpFile = File.createTempFile( this.getName(), ".txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
             ps.println(i);
         }
         ps.close();
+        assertEquals( new Double( LOOP_COUNT ), bigGroupAll( tmpFile) );
+        tmpFile.delete();
+
+    }
+
+    @Test
+    public void testBigGroupAllWithNull() throws Throwable {
+
+        int LOOP_COUNT = 4*1024;
+        File tmpFile = File.createTempFile( this.getName(), ".txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < LOOP_COUNT; i++) {
+	    if ( i % 10 == 0 ){
+               ps.println("");
+	    } else {
+               ps.println(i);
+	    }
+        }
+        ps.close();
+
+        assertEquals( new Double( LOOP_COUNT ), bigGroupAll( tmpFile) );
+
+        tmpFile.delete();
+
+    }
+
+    @Test
+    public Double bigGroupAll( File tmpFile ) throws Throwable {
+
         String query = "foreach (group (load 'file:" + tmpFile + "') all) generate " + COUNT.class.getName() + "($1) ;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
         Iterator it = pig.openIterator("asdf_id");
-        tmpFile.delete();
         Tuple t = (Tuple)it.next();
-        Double count = DataType.toDouble(t.get(0));
-        assertEquals(count, (double)LOOP_COUNT);
+
+        return  DataType.toDouble(t.get(0));
     }
+
     
     static public class MyApply extends EvalFunc<DataBag> {
         String field0 = "Got";
@@ -131,25 +163,40 @@
             }
         }
     }
+
     static public class MyStorage implements LoadFunc, StoreFunc {
+
         final static int COUNT = 10;
+
         int count = 0;
+        boolean hasNulls= false;
+
+        public void setNulls(boolean hasNulls ) { this.hasNulls=hasNulls; }
+
         public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
         }
+
         public Tuple getNext() throws IOException {
             if (count < COUNT) {
-                Tuple t = TupleFactory.getInstance().newTuple(Integer.toString(count++));
-                return t;
+
+                   Tuple t = TupleFactory.getInstance().newTuple(Integer.toString(count++));
+                   return t;
+
             }
+
             return null;
         }
+
+
         OutputStream os;
         public void bindTo(OutputStream os) throws IOException {
             this.os = os;
         }
+
         public void finish() throws IOException {
             
         }
+
         public void putNext(Tuple f) throws IOException {
             try {
                 os.write((f.toDelimitedString("-")+"\n").getBytes());            
@@ -235,6 +282,9 @@
 	    }
 
     }
+
+
+
     @Test
     public void testStoreFunction() throws Throwable {
         File tmpFile = File.createTempFile("test", ".txt");
@@ -243,6 +293,8 @@
             ps.println(i+"\t"+i);
         }
         ps.close();
+
+	//Load, Execute and Store query
         String query = "foreach (load 'file:"+tmpFile+"') generate $0,$1;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
@@ -250,30 +302,97 @@
             pig.deleteFile("frog");
         } catch(Exception e) {}
         pig.store("asdf_id", "frog", MyStorage.class.getName()+"()");
+
+
+	//verify query
+
         InputStream is = FileLocalizer.open("frog", pig.getPigContext());
         BufferedReader br = new BufferedReader(new InputStreamReader(is));
         String line;
         int i = 0;
         while((line = br.readLine()) != null) {
+
             assertEquals(line, Integer.toString(i) + '-' + Integer.toString(i));
             i++;
         }
         br.close();
-        pig.deleteFile("frog");
+        try {
+            pig.deleteFile("frog");
+        } catch(Exception e) {}
+
+
     }
+
+    // This test: "testStoreFunction()" is equivalent to testStoreFunctionNoNulls()
+
     @Test
-    public void testQualifiedFuncions() throws Throwable {
+    public void testStoreFunctionNoNulls() throws Throwable {
+
+        String[][] data = genDataSetFile1( 10, false );
+        storeFunction( data);
+    }
+
+    @Test
+    public void testStoreFunctionWithNulls() throws Throwable {
+
+        String[][] data = genDataSetFile1( 10, true );
+        storeFunction( data);
+    }
+   
+    public void storeFunction(String[][] data) throws Throwable {
+
+        File tmpFile=TestHelper.createTempFile(data) ;
+
+	//Load, Execute and Store query
+        String query = "foreach (load 'file:"+tmpFile+"') generate $0,$1;";
+        System.out.println(query);
+        pig.registerQuery("asdf_id = " + query);
+        try {
+            pig.deleteFile("frog");
+        } catch(Exception e) {}
+        pig.store("asdf_id", "frog", MyStorage.class.getName()+"()");
+
+
+        InputStream is = FileLocalizer.open("frog", pig.getPigContext());
+        BufferedReader br = new BufferedReader(new InputStreamReader(is));
+        String line;
+
+	//verify query
+        int i= 0;
+        while((line = br.readLine()) != null) {
+
+            assertEquals( data[i][0] + '-' + data[i][1], line );
+            i++;
+        }
+
+        br.close();
+
+        try {
+            pig.deleteFile("frog");
+        } catch(Exception e) {}
+
+
+    }
+
+
+    @Test
+    public void testQualifiedFunctions() throws Throwable {
+
+        //create file
         File tmpFile = File.createTempFile("test", ".txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < 1; i++) {
             ps.println(i);
         }
         ps.close();
+
+        // execute query
         String query = "foreach (group (load 'file:"+tmpFile+"' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(" + MyApply.class.getName() + "($1)) ;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
+
+        //Verify query
         Iterator it = pig.openIterator("asdf_id");
-        tmpFile.delete();
         Tuple t;
         int count = 0;
         while(it.hasNext()) {
@@ -282,11 +401,48 @@
             Integer.parseInt(t.get(1).toString());
             count++;
         }
-        assertEquals(count, MyStorage.COUNT);
+
+        assertEquals( MyStorage.COUNT, count );
+    }
+    
+    @Test
+    public void testQualifiedFunctionsWithNulls() throws Throwable {
+
+        //create file
+        File tmpFile = File.createTempFile("test", ".txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < 1; i++) {
+	    if ( i % 10 == 0 ){
+               ps.println("");
+	    } else {
+               ps.println(i);
+	    }
+        }
+        ps.close();
+
+        // execute query
+        String query = "foreach (group (load 'file:"+tmpFile+"' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(" + MyApply.class.getName() + "($1)) ;";
+        System.out.println(query);
+        pig.registerQuery("asdf_id = " + query);
+
+        //Verify query
+        Iterator it = pig.openIterator("asdf_id");
+        Tuple t;
+        int count = 0;
+        while(it.hasNext()) {
+            t = (Tuple) it.next();
+            assertEquals(t.get(0).toString(), "Got");
+            Integer.parseInt(t.get(1).toString());
+            count++;
+        }
+
+        assertEquals( MyStorage.COUNT, count );
     }
     
+
     @Test
     public void testDefinedFunctions() throws Throwable {
+
         File tmpFile = File.createTempFile("test", ".txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < 1; i++) {
@@ -311,6 +467,55 @@
         assertEquals(count, MyStorage.COUNT);
     }
 
+
+   // this test is equivalent to testDefinedFunctions()
+    @Test
+    public void testDefinedFunctionsNoNulls() throws Throwable {
+
+        String[][] data = genDataSetFile1( 10, false );
+        definedFunctions( data);
+    }
+
+
+    @Test
+    public void testDefinedFunctionsWithNulls() throws Throwable {
+
+        String[][] data = genDataSetFile1( 10, true );
+        definedFunctions( data);
+    }
+
+
+    @Test
+    public void definedFunctions(String[][] data) throws Throwable {
+
+        File tmpFile=TestHelper.createTempFile(data) ;
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < 1; i++) {
+            ps.println(i);
+        }
+        ps.close();
+        pig.registerFunction("foo",
+            new FuncSpec(MyApply.class.getName()+"('foo')"));
+        String query = "foreach (group (load 'file:"+tmpFile+"' using " + MyStorage.class.getName() + "()) by " + MyGroup.class.getName() + "('all')) generate flatten(foo($1)) ;";
+        System.out.println(query);
+        pig.registerQuery("asdf_id = " + query);
+        Iterator it = pig.openIterator("asdf_id");
+        tmpFile.delete();
+        Tuple t;
+        int count = 0;
+        while(it.hasNext()) {
+            t = (Tuple) it.next();
+            assertEquals("foo", t.get(0).toString());
+
+            if ( t.get(1).toString() != "" ) {
+               Integer.parseInt(t.get(1).toString());
+            }
+            count++;
+        }
+        assertEquals(count, MyStorage.COUNT);
+    }
+
+
     @Test
     public void testPigServer() throws Throwable {
         log.debug("creating pig server");
@@ -329,4 +534,74 @@
         long length = pig.fileSize(sampleFileName);
         assertTrue(length > 0);
     }
+
+    /***
+     * For generating a sample dataset as
+     *
+     * no nulls:
+     *   	$0 $1
+     *           0  0
+     *           1  1
+     *           ....
+     *           9  9
+     *
+     * has nulls:
+     *   	$0 $1
+     *           0  0
+     *           1  1
+     *              2
+     *           3  3
+     *           4  4
+     *           5  5
+     *           6   
+     *           7  7
+     *               
+     *           9  9
+     *           
+     */
+    private String[][] genDataSetFile1( int dataLength, boolean hasNulls ) throws IOException {
+
+
+        String[][] data= new String[dataLength][];
+
+        if ( hasNulls == true ) {
+
+        	for (int i = 0; i < dataLength; i++) {
+
+            	     data[i] = new String[2] ;
+                     if ( i == 2 ) {
+            		data[i][0] = "";
+            		data[i][1] = new Integer(i).toString();
+
+		     } else if ( i == 6 ) {
+                   
+            		data[i][0] = new Integer(i).toString();
+            		data[i][1] = "";
+
+		     } else if ( i == 8 ) {
+
+            		data[i][0] = "";
+            		data[i][1] = "";
+
+		     } else {
+            		data[i][0] = new Integer(i).toString();
+            		data[i][1] = new Integer(i).toString();
+           	     }
+	     }
+
+	} else {
+
+        	for (int i = 0; i < dataLength; i++) {
+            		data[i] = new String[2] ;
+            		data[i][0] = new Integer(i).toString();
+            		data[i][1] = new Integer(i).toString();
+        	}
+
+	}
+
+         return  data;
+
+    }
+
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java Mon Sep  8 11:10:48 2008
@@ -44,76 +44,117 @@
 
     @Test
     public void testUnion1() throws Exception {
-        File tmpFile1 = genDataSetFile1() ;
-        File tmpFile2 = genDataSetFile2() ;
+        File tmpFile1 = genDataSetFile(false, 30 ) ;
+        File tmpFile2 = genDataSetFile(false, 50 ) ;
         pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
         pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
         pig.registerQuery("c = union a, b; ") ;
+        
+        verifyUnion( "c", 30 + 50 );
+    }
 
-        Iterator<Tuple> it = pig.openIterator("c");
-        Tuple t = null ;
-        int count = 0 ;
-        while(it.hasNext()) {
-            t = it.next() ;
-            System.out.println(count + ":" + t) ;
-            count++ ;
-        }
-        Assert.assertEquals(count, 30 + 50);
+    @Test
+    public void testUnion1WithNulls() throws Exception {
+
+        File tmpFile1 = genDataSetFile(true, 30 ) ;
+        File tmpFile2 = genDataSetFile(true, 50 ) ;
+        pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
+        pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
+        pig.registerQuery("c = union a, b; ") ;
+
+        verifyUnion( "c", 30 + 50 );
     }
 
     @Test
     public void testUnion2() throws Exception {
-        File tmpFile1 = genDataSetFile1() ;
-        File tmpFile2 = genDataSetFile2() ;
+
+        File tmpFile1 = genDataSetFile(false, 30) ;
+        File tmpFile2 = genDataSetFile(false, 50) ;
+        pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
+        pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
+        pig.registerQuery("a1 = foreach a generate $0, $1; ") ;
+        pig.registerQuery("b1 = foreach b generate $0, $1; ") ;
+        pig.registerQuery("c = union a1, b1; ") ;
+
+        verifyUnion( "c", 30 + 50 );
+    }
+
+    @Test
+    public void testUnion2WithNulls() throws Exception {
+        File tmpFile1 = genDataSetFile(true, 30) ;
+        File tmpFile2 = genDataSetFile(true, 50) ;
         pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
         pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
         pig.registerQuery("a1 = foreach a generate $0, $1; ") ;
         pig.registerQuery("b1 = foreach b generate $0, $1; ") ;
         pig.registerQuery("c = union a1, b1; ") ;
 
-        Iterator<Tuple> it = pig.openIterator("c");
+        verifyUnion( "c", 30 + 50 );
+    }
+
+    //verifies results
+    public void verifyUnion(String id, int actualCount ) throws Exception {
+
+        Iterator<Tuple> it = pig.openIterator(id);
         Tuple t = null ;
         int count = 0 ;
+
         while(it.hasNext()) {
             t = it.next() ;
             System.out.println(count + ":" + t) ;
             count++ ;
         }
-        Assert.assertEquals(count, 30 + 50);
+
+        Assert.assertEquals(count, actualCount);
     }
 
+
     /***
      * For generating a sample dataset
      */
-    private File genDataSetFile1() throws IOException {
+    private File genDataSetFile(boolean hasNulls, int dataLength ) throws IOException {
 
-        int dataLength = 30;
         String[][] data = new String[dataLength][] ;
 
         DecimalFormat formatter = new DecimalFormat("0000000");
 
-        for (int i = 0; i < dataLength; i++) {
-            data[i] = new String[2] ;
-            data[i][0] = formatter.format(i % 10);
-            data[i][1] = formatter.format(dataLength - i);
-        }
+        if ( hasNulls == true ) {
 
-        return TestHelper.createTempFile(data) ;
-    }
+                for (int i = 0; i < dataLength; i++) {
 
-    private File genDataSetFile2() throws IOException {
+                     data[i] = new String[2] ;
+                     if ( i % 7  == 0 ) {
+                        data[i][0] = "";
+                        data[i][1] = formatter.format(dataLength - i);
 
-        int dataLength = 50;
-        String[][] data = new String[dataLength][] ;
+                     } else if ( i % 10 ==0  ) {
 
-        DecimalFormat formatter = new DecimalFormat("0000000");
+                        data[i][0] = formatter.format(i % 10);
+                        data[i][1] = "";
 
-        for (int i = 0; i < dataLength; i++) {
-            data[i] = new String[2] ;
-            data[i][0] = formatter.format(i % 10);
-            data[i][1] = formatter.format(dataLength - i);
-        }
+                     } else if ( i % 13 == 0 ) {
+
+                        data[i][0] = "";
+                        data[i][1] = "";
+
+                     } else {
+                        data[i][0] = formatter.format(i % 10);
+                        data[i][1] = formatter.format(dataLength - i);
+                     }
+             }
+
+        } else {
 
+
+            for (int i = 0; i < dataLength; i++) {
+                data[i] = new String[2] ;
+                data[i][0] = formatter.format(i % 10);
+                data[i][1] = formatter.format(dataLength - i);
+            }
+
+        }
         return TestHelper.createTempFile(data) ;
     }
+
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Mon Sep  8 11:10:48 2008
@@ -49,12 +49,27 @@
 	public void testPOSortAscString() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		poSortAscString( input );
+	}
+	
+	@Test
+	public void testPOSortAscStringWithNull() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		poSortAscString( input );
+	}
+	
+
+	 
+	public void poSortAscString(DataBag input) throws ExecException {
+		
 		List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
 		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
 		pr1.setResultType(DataType.CHARARRAY);
 		PhysicalPlan expPlan = new PhysicalPlan();
 		expPlan.add(pr1);
 		sortPlans.add(expPlan);
+		
 		List<Boolean> mAscCols = new LinkedList<Boolean>();
 		mAscCols.add(true);
 		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
@@ -62,25 +77,43 @@
 		inputs.add(read);
 		POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
 				sortPlans, mAscCols, null);
+		
+		//verify
 		Tuple t = null;
 		Result res1 = sort.getNext(t);
-		// System.out.println(res1.result);
 		Result res2 = sort.getNext(t);
+
 		while (res2.returnStatus != POStatus.STATUS_EOP) {
 			Object i1 = ((Tuple) res1.result).get(0);
 			Object i2 = ((Tuple) res2.result).get(0);
+			
+			//System.out.println("i1: " + i1.toString() + " i2: " + i2.toString());
 			int i = DataType.compare(i1, i2);
-			// System.out.println(res2.result + " i = " + i);
+			System.out.println("RESULT2=i : " + res2.result + " i = " + i);
 			assertEquals(true, (i <= 0));
 			res1 = res2;
 			res2 = sort.getNext(t);
 		}
 	}
 
+	
 	@Test
 	public void testPOSortDescString() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		poSortDescString(input);
+	}
+
+	@Test
+	public void testPOSortDescStringWithNulls() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		poSortDescString(input);
+	}
+	
+
+	public void poSortDescString(DataBag input) throws ExecException {
+
 		List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
 		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
 		pr1.setResultType(DataType.CHARARRAY);
@@ -113,6 +146,21 @@
 	public void testPOSortAsc() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		poSortAscInt( input );
+ 
+	}
+	
+	@Test
+	public void testPOSortAscWithNulls() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		poSortAscInt( input );
+ 
+	}
+	
+
+	public void poSortAscInt( DataBag input) throws ExecException {
+
 		List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
 		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
 		pr1.setResultType(DataType.INTEGER);
@@ -140,11 +188,22 @@
 			res2 = sort.getNext(t);
 		}
 	}
-
+	
 	@Test
-	public void testPOSortDesc() throws ExecException {
+	public void testPOSortDescInt() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		poSortDescInt(input );
+	}
+	
+	@Test
+	public void testPOSortDescIntWithNulls() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		poSortDescInt(input );
+	}
+	
+	public void poSortDescInt(DataBag input) throws ExecException {
 		List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
 		POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
 		pr1.setResultType(DataType.INTEGER);
@@ -250,6 +309,106 @@
         
     }
 
+    /***
+     * Sorting
+     *  (null, 10)
+     *  (1, 8) 
+     *  (1, null)
+     *  (null,null)
+     *  (3, 8)
+     *  
+     *  BY $1 DESC, $0 ASC 
+     *
+     *  should return
+     *  (null, 10)
+     *  (1, 8 )
+     *  (3, 8 )
+     *  (null,null)
+     *  (1, null)
+
+
+     * @throws ExecException
+     */
+    @Test
+    public void testPOSortMixAscDesc1WithNull() throws ExecException {
+        DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
+
+        Tuple t1 = DefaultTupleFactory.getInstance().newTuple() ;
+        t1.append(null);
+        t1.append(10);
+        input.add(t1);
+
+        Tuple t2 = DefaultTupleFactory.getInstance().newTuple() ;
+        t2.append(1);
+        t2.append(8);
+        input.add(t2);
+
+
+        Tuple t3 = DefaultTupleFactory.getInstance().newTuple() ;
+        t3.append(1);
+        t3.append(null);
+        input.add(t3);
+
+        Tuple t4 = DefaultTupleFactory.getInstance().newTuple() ;
+        t4.append(null);
+        t4.append(null);
+        input.add(t4);
+
+        Tuple t5 = DefaultTupleFactory.getInstance().newTuple() ;
+        t5.append(3);
+        t5.append(8);
+        input.add(t5);
+
+        List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr1.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan1 = new PhysicalPlan();
+        expPlan1.add(pr1);
+        sortPlans.add(expPlan1);
+
+        POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr2.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan2 = new PhysicalPlan();
+        expPlan2.add(pr2);
+        sortPlans.add(expPlan2);
+
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        mAscCols.add(true);
+
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                                 sortPlans, mAscCols, null);
+
+        Tuple t = null;
+        Result res ;
+        // output line 1
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), null) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 10) ;
+         // output line 2
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 1) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 8) ;
+        // output line 3
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 3) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 8) ;
+        // output line 4
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), null) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), null) ;
+        // output line 5
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 1 );
+        Assert.assertEquals(((Tuple) res.result).get(1), null) ;
+
+      
+    }
 
     /***
      * Sorting
@@ -267,6 +426,8 @@
      * @throws ExecException
      */
     
+
+
     @Test
     public void testPOSortMixAscDesc2() throws ExecException {
         DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
@@ -328,12 +489,125 @@
         Assert.assertEquals(((Tuple) res.result).get(1), 2) ;
 
     }
+    
+    /***
+     * Sorting
+     *  (null, 10)
+     *  (1, 8) 
+     *  (1, null)
+     *  (null,null)
+     *  (3, 8)
+     *  
+     *  BY $0 DESC, $1 ASC 
+     *
+     *  should return
+     *  (3, 8 )
+     *  (1, null)
+     *  (1, 8 )
+     *  (null,null)
+     *  (null, 10)
+     * @throws ExecException
+     */
+
+    @Test
+    public void testPOSortMixAscDesc2Null() throws ExecException {
+        DataBag input = DefaultBagFactory.getInstance().newDefaultBag() ;
+
+        Tuple t1 = DefaultTupleFactory.getInstance().newTuple() ;
+        t1.append(null);
+        t1.append(10);
+        input.add(t1);
+
+        Tuple t2 = DefaultTupleFactory.getInstance().newTuple() ;
+        t2.append(1);
+        t2.append(8);
+        input.add(t2);
+
+
+        Tuple t3 = DefaultTupleFactory.getInstance().newTuple() ;
+        t3.append(1);
+        t3.append(null);
+        input.add(t3);
+
+        Tuple t4 = DefaultTupleFactory.getInstance().newTuple() ;
+        t4.append(null);
+        t4.append(null);
+        input.add(t4);
+
+        Tuple t5 = DefaultTupleFactory.getInstance().newTuple() ;
+        t5.append(3);
+        t5.append(8);
+        input.add(t5);
+
+        List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr1.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan1 = new PhysicalPlan();
+        expPlan1.add(pr1);
+        sortPlans.add(expPlan1);
+
+        POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr2.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan2 = new PhysicalPlan();
+        expPlan2.add(pr2);
+        sortPlans.add(expPlan2);
+
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        mAscCols.add(true);
+
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
 
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                                 sortPlans, mAscCols, null);
 
+        Tuple t = null;
+        Result res ;
+        // output line 1
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 3) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 8) ;
+        // output line 2
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 1) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), null) ;
+        // output line 3
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), 1) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 8) ;
+        // output line 4
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), null) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), null) ;
+        // output line 5
+        res = sort.getNext(t);
+        Assert.assertEquals(((Tuple) res.result).get(0), null) ;
+        Assert.assertEquals(((Tuple) res.result).get(1), 10) ;
+
+
+    }
+    
     @Test
 	public void testPOSortUDF() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		poSortUDFWithNull(input);
+     
+    }
+    
+    @Test
+	public void testPOSortUDFWithNull() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		poSortUDFWithNull(input);
+     
+    }
+    
+
+	public void poSortUDFWithNull(DataBag input) throws ExecException {
 		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
 		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
 		inputs.add(read);
@@ -349,16 +623,16 @@
 		// System.out.println(res1.result);
 		Result res2 = sort.getNext(t);
 		while (res2.returnStatus != POStatus.STATUS_EOP) {
-			int i1 = (Integer) ((Tuple) res1.result).get(1);
-			int i2 = (Integer) ((Tuple) res2.result).get(1);
+			int i1 = ((Integer) ((Tuple) res1.result).get(1) == null ? 0 : (Integer) ((Tuple) res1.result).get(1));
+			int i2 = ((Integer) ((Tuple) res2.result).get(1) == null ? 0 : (Integer) ((Tuple) res2.result).get(1));
 			int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
 			assertEquals(true, (i <= 0));
 			System.out.println(i + " : " + res2.result);
 			res1 = res2;
 			res2 = sort.getNext(t);
 		}
-	}
-
+     
+    }
 	// sorts values in ascending order of their distance from 50
 	public static class WeirdComparator extends ComparisonFunc {
 
@@ -367,8 +641,8 @@
 			// TODO Auto-generated method stub
 			int result = 0;
 			try {
-				int i1 = (Integer) t1.get(1);
-				int i2 = (Integer) t2.get(1);
+				int i1 = ((Integer) t1.get(1) == null ? 0 : (Integer)t1.get(1));
+				int i2 = ((Integer) t2.get(1) == null ? 0 : (Integer)t2.get(1));
 				result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
 			} catch (ExecException e) {
 				// TODO Auto-generated catch block

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Mon Sep  8 11:10:48 2008
@@ -84,6 +84,9 @@
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
+                        if ( o1==null || o2==null ){
+                           return -1;
+                        }
 			int i1 = (Integer) o1 - 2;
 			int i2 = (Integer) o2 - 2;
 
@@ -233,6 +236,17 @@
 	public void testUserFuncArity() throws ExecException {
 		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
 				MAX_TUPLES, 100);
+		userFuncArity( input );
+        }
+
+	@Test // variant of testUserFuncArity that feeds the shared driver a bag from the WithNulls generator
+	public void testUserFuncArityWithNulls() throws ExecException {
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r,
+				MAX_TUPLES, 100);
+		userFuncArity( input ); // shared driver declared right after this method
+        }
+
+	public void userFuncArity(DataBag input ) throws ExecException {
 		String funcSpec = ARITY.class.getName() + "()";
 		PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
 		List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
@@ -250,10 +264,25 @@
 		}
 	}
 
+
 	@Test
 	public void testUDFCompare() throws ExecException {
-		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
-				100);
+
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2, 100);
+	        udfCompare(input);
+
+        }
+
+	@Test
+	public void testUDFCompareWithNulls() throws ExecException {
+		// Same arguments (r, 2, 100) as testUDFCompare, but the bag comes from the WithNulls generator.
+		DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBagWithNulls(r, 2, 100);
+	        udfCompare(input); // shared driver declared right after this method
+
+        }
+
+	public void udfCompare(DataBag input) throws ExecException {
+
 		String funcSpec = WeirdComparator.class.getName() + "()";
 		POUserComparisonFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()),
 				-1, null, new FuncSpec(funcSpec));
@@ -271,7 +300,36 @@
 
 	@Test
 	public void testAlgebraicAVG() throws IOException, ExecException {
-		int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+	     Integer input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+             algebraicAVG( input, 55, 10, 110, 20, 5.5 );
+
+        }
+
+        /* NOTE: for calculating the average
+         *
+         * A pig "count" will include data that had "null", and the sum will
+         * treat the null as a 0, impacting the average.
+         *
+         * A SQL "count" will exclude data that had "null".
+         */
+	@Test
+	public void testAlgebraicAVGWithNulls() throws IOException, ExecException {
+	     // Ten elements with a null where testAlgebraicAVG has 5: the non-null values sum to 50.
+	     Integer input[] = { 1, 2, 3, 4, null, 6, 7, 8, 9, 10 };
+             algebraicAVG( input, 50, 10, 100, 20, 5 ); // initial sum/count, intermediate sum/count, expected average
+
+        }
+
+	@Test
+	public void algebraicAVG( 
+                 Integer[] input 
+               , double initialExpectedSum, long initialExpectedCount
+               , double intermedExpectedSum, long intermedExpectedCount
+               , double expectedAvg
+         ) throws IOException, ExecException {
+
+                // generate data
 		byte INIT = 0;
 		byte INTERMED = 1;
 		byte FINAL = 2;
@@ -285,8 +343,8 @@
 		POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
 				null, new FuncSpec(funcSpec));
 
+                //************ Initial Calculations ******************
 		TupleFactory tf = TupleFactory.getInstance();
-
 		po.setAlgebraicFunction(INIT);
 		po.attachInput(tup1);
 		Tuple t = null;
@@ -299,8 +357,10 @@
 		assertEquals(outputInitial1, outputInitial2);
 		double sum = (Double) outputInitial1.get(0);
 		long count = (Long) outputInitial1.get(1);
-		assertEquals(55.0, sum);
-		assertEquals(10, count);
+		assertEquals(initialExpectedSum, sum);
+		assertEquals(initialExpectedCount, count);
+
+                //************ Intermediate Data and Calculations ******************
 		DataBag bag = BagFactory.getInstance().newDefaultBag();
 		bag.add(outputInitial1);
 		bag.add(outputInitial2);
@@ -317,9 +377,11 @@
 
 		sum = (Double) outputIntermed.get(0);
 		count = (Long) outputIntermed.get(1);
-		assertEquals(110.0, sum);
-		assertEquals(20, count);
+		assertEquals(intermedExpectedSum, sum);
+		assertEquals(intermedExpectedCount, count);
 		System.out.println(outputIntermed);
+
+                //************ Final Calculations ******************
 		po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
 				new FuncSpec(funcSpec));
 		po.setAlgebraicFunction(FINAL);
@@ -328,7 +390,7 @@
 		Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
 				: null;
 		// Double output = fin.exec(outputInitial);
-		assertEquals(5.5, output);
+		assertEquals( expectedAvg, output);
 		// System.out.println("output = " + output);
 
 	}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Mon Sep  8 11:10:48 2008
@@ -42,7 +42,7 @@
 public class TestProject extends  junit.framework.TestCase {
     Random r;
 
-    Tuple t;
+    Tuple t, tRandom, tRandomAndNull;
 
     Result res;
 
@@ -51,7 +51,8 @@
     @Before
     public void setUp() throws Exception {
         r = new Random();
-        t = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+        tRandom = GenRandomData.genRandSmallBagTuple(r, 10, 100);
+        tRandomAndNull = GenRandomData.genRandSmallBagTupleWithNulls(r, 10, 100);
         res = new Result();
         proj = GenPhyOp.exprProject();
     }
@@ -62,6 +63,7 @@
 
     @Test
     public void testGetNext() throws ExecException, IOException {
+    	t=tRandom;
         proj.attachInput(t);
         for (int j = 0; j < t.size(); j++) {
             proj.attachInput(t);
@@ -73,8 +75,11 @@
         }
     }
 
+    
+
     @Test
     public void testGetNextTuple() throws IOException, ExecException {
+    	t=tRandom;
         proj.attachInput(t);
         proj.setColumn(0);
         proj.setOverloaded(true);
@@ -102,8 +107,12 @@
         assertEquals(t.get(9), res.result);
     }
 
+
+
+
     @Test
     public void testGetNextMultipleProjections() throws ExecException, IOException {
+    	t=tRandom;
         ArrayList<Integer> cols = new ArrayList<Integer>();
         proj.attachInput(t);
         for (int j = 0; j < t.size() - 1; j++) {
@@ -124,8 +133,12 @@
         }
     }
 
+    
+
+    
     @Test
     public void testGetNextTupleMultipleProjections() throws IOException, ExecException {
+    	t=tRandom;
         proj.attachInput(t);
         proj.setOverloaded(true);
         int j = 0;
@@ -156,4 +169,113 @@
         assertEquals(POStatus.STATUS_OK, res.returnStatus);
         assertEquals(t.get(9), res.result);
     }
+    
+    @Test
+    public void testGetNextWithNull() throws ExecException, IOException {
+    	t= tRandomAndNull; // tuple built in setUp() by genRandSmallBagTupleWithNulls
+        proj.attachInput(t);
+        for (int j = 0; j < t.size(); j++) {
+            proj.attachInput(t);
+            proj.setColumn(j);
+            // Projecting column j must return STATUS_OK and exactly t.get(j), for every field of t.
+            res = proj.getNext();
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            assertEquals(t.get(j), res.result);
+        }
+    }
+
+    
+	@Test
+    public void testGetNextTupleWithNull() throws IOException, ExecException {
+    	t= tRandomAndNull; // tuple built in setUp() by genRandSmallBagTupleWithNulls
+        proj.attachInput(t);
+        proj.setColumn(0);
+        proj.setOverloaded(true);
+        DataBag inpBag = (DataBag) t.get(0); // field 0 is the bag whose tuples are pulled one at a time below
+        int cntr = 0;
+        boolean contains = true;
+        while (true) {
+            res = proj.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_EOP)
+                break;
+            if (!TestHelper.bagContains(inpBag, (Tuple) res.result)) {
+                contains = false; // a returned tuple was not found in the input bag
+                break;
+            }
+            ++cntr;
+        }
+        assertEquals((float) (inpBag).size(), (float) cntr, 0.01f); // number of tuples returned must equal the bag size
+        assertEquals(true, contains);
+        // Second part: a plain (non-overloaded) projection of column 9 must return that field unchanged.
+        proj.attachInput(t);
+        proj.setColumn(9); // NOTE(review): the generator appends its null at index 10, so column 9 is not the null field - confirm intended
+        proj.setOverloaded(false);
+        res = proj.getNext(t);
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        assertEquals(t.get(9), res.result);
+    }
+
+
+
+
+    @Test
+    public void testGetNextMultipleProjectionsWithNull() throws ExecException, IOException {
+    	t= tRandomAndNull; // tuple built in setUp() by genRandSmallBagTupleWithNulls
+        ArrayList<Integer> cols = new ArrayList<Integer>();
+        proj.attachInput(t);
+        for (int j = 0; j < t.size() - 1; j++) {
+            proj.attachInput(t);
+            cols.add(j);
+            cols.add(j+1);
+            proj.setColumns(cols);
+            // Project the adjacent column pair (j, j+1) and compare with a tuple built from the same two fields.
+            res = proj.getNext();
+	        TupleFactory tupleFactory = TupleFactory.getInstance();
+	        ArrayList<Object> objList = new ArrayList<Object>(); 
+            objList.add(t.get(j)); 
+            objList.add(t.get(j+1)); 
+		    Tuple expectedResult = tupleFactory.newTuple(objList);
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            assertEquals(expectedResult, res.result);
+            cols.clear(); // reset so the next iteration projects only its own pair
+        }
+    }
+
+    
+
+    
+    @Test
+    public void testGetNextTupleMultipleProjectionsWithNull() throws IOException, ExecException {
+    	t= tRandomAndNull; // tuple built in setUp() by genRandSmallBagTupleWithNulls
+        proj.attachInput(t);
+        proj.setOverloaded(true);
+        int j = 0;
+        ArrayList<Integer> cols = new ArrayList<Integer>();
+        // Keep projecting the column pair (j, j+1) until the operator reports STATUS_EOP.
+        while (true) {
+            cols.add(j);
+            cols.add(j+1);
+            proj.setColumns(cols);
+            res = proj.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_EOP)
+                break;
+	        TupleFactory tupleFactory = TupleFactory.getInstance();
+	        ArrayList<Object> objList = new ArrayList<Object>(); 
+            objList.add(t.get(j)); 
+            objList.add(t.get(j+1)); 
+		    Tuple expectedResult = tupleFactory.newTuple(objList);
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            assertEquals(expectedResult, res.result);
+            ++j;
+            cols.clear(); // reset so the next iteration projects only its own pair
+        }
+        // Second part: a plain (non-overloaded) projection of column 9 must return that field unchanged.
+        proj.attachInput(t);
+        proj.setColumn(9);
+        proj.setOverloaded(false);
+        res = proj.getNext(t);
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        assertEquals(t.get(9), res.result);
+    }
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=693182&r1=693181&r2=693182&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Mon Sep  8 11:10:48 2008
@@ -215,4 +215,25 @@
         }
         return db;
     }
+    
+    public static Tuple genRandSmallBagTupleWithNulls(Random r, int num, int limit){ // builds an 11-field tuple whose last field is null
+        if(r==null){ // no random source supplied: return a one-field tuple holding the string "RANDOM"
+            Tuple t = new DefaultTuple();
+            t.append("RANDOM");
+            return t;
+        }
+        Tuple t = new DefaultTuple();
+        t.append(genRandSmallTupDataBag(r, num, limit)); // index 0; NOTE(review): uses the non-null bag generator - confirm intended
+        t.append(r.nextBoolean()); // index 1
+        t.append(genRandDBA(r)); // index 2
+        t.append(genRandString(r)); // index 3
+        t.append(r.nextDouble()); // index 4
+        t.append(r.nextFloat()); // index 5
+        t.append(r.nextInt()); // index 6
+        t.append(r.nextLong()); // index 7
+        t.append(genRandMap(r, num)); // index 8
+        t.append(genRandSmallTuple(r, 100)); // index 9
+        t.append(null); // index 10: the only null field in the tuple
+        return t;
+    }
 }