You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/08/06 20:27:43 UTC

svn commit: r683358 [4/4] - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backe...

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java Wed Aug  6 11:27:41 2008
@@ -19,11 +19,15 @@
 
 import java.util.Iterator;
 import java.util.Random;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -42,20 +46,99 @@
 public class TestPOBinCond extends TestCase {
     Random r = new Random();
     DataBag bag = BagFactory.getInstance().newDefaultBag();
+    DataBag bagDefault = BagFactory.getInstance().newDefaultBag();
+    DataBag bagWithNull = BagFactory.getInstance().newDefaultBag();
+    DataBag bagWithBoolean = BagFactory.getInstance().newDefaultBag();
+    DataBag bagWithBooleanAndNull = BagFactory.getInstance().newDefaultBag();
+
     final int MAX = 10;
+
+    /***
+     *  POBinCondition tests
+     *  
+     *  (r1, 1, 0 )
+     *  (r2, 1, 0 )
+     *  (r3, 1, 0 )
+     *  ...
+     *  (rn, 1, 0 )
+     *    
+     *   where r is a random number ( r1 .. rn )
+     *   
+     *   The POBinCondition to test is:  Integer(result)= ( r == 1 )? Integer(1) : Integer(0);
+     *   but the condition can be of any datatype: Integer, Float, Double...
+     *   
+     * @throws ExecException
+     */
     
     @Before
     @Override
     public void setUp() {
+    	
+    	//default bag as shown above
         for(int i = 0; i < 10; i ++) {
             Tuple t = TupleFactory.getInstance().newTuple();
             t.append(r.nextInt(2));
+            t.append(1);
             t.append(0);
+            bagDefault.add(t);
+        }
+       
+        //same as default bag but contains nulls
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            if (r.nextInt(4)%3 == 0){
+            	t.append(null);
+            	
+            }else{
+                t.append(r.nextInt(2));
+            }
             t.append(1);
-            bag.add(t);
+            t.append(0);
+            bagWithNull.add(t);
+
         }
-    }
+        
+        //r is a boolean  
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            if (r.nextInt(2)%2 == 0 ){
+            		t.append(true);
+            } else  {
+            		t.append(false);
+            }
+            t.append(1);
+            t.append(0);
+            bagWithBoolean.add(t);
+
+    	}
     
+        //r is a boolean with nulls
+        for(int i = 0; i < 10; i ++) {
+
+            Tuple t = TupleFactory.getInstance().newTuple();
+            if (r.nextInt(3)%2 == 0){
+            	
+             	t.append(null);
+             	
+            }else{
+            	
+               	if (r.nextInt(2)%2 == 0 ){
+            		t.append(true);
+            	} else {
+            		t.append(false);
+            	}
+
+            }
+            t.append(1);
+            t.append(0);
+            bagWithBooleanAndNull.add(t);
+
+        }
+
+        
+    }
+  
+    /* ORIGINAL TEST
     public void testPOBinCond() throws ExecException, PlanException {
         ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
         rt.setValue(1);
@@ -105,4 +188,285 @@
         
         
     }
+  */
+    
+
+    public void testPOBinCondWithInteger() throws  ExecException, PlanException {
+    	
+	    bag= getBag(DataType.INTEGER);
+    	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.INTEGER, new Integer(1) );
+ 
+    	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            testHelper.getPlan().attachInput(t);
+            Integer value = (Integer) t.get(0);
+            int expected = (value.intValue() == 1)? 1:0 ;
+            Integer result=(Integer)testHelper.getOperator().getNext(value).result;
+            int actual = result.intValue();
+            assertEquals( expected, actual );
+        }
+
+    }
+   
+    public void testPOBinCondWithLong() throws  ExecException, PlanException {
+        bag= getBag(DataType.LONG);
+       	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.LONG, new Long(1L) );
+    
+       	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+               Tuple t = it.next();
+               testHelper.getPlan().attachInput(t);
+               Long value = (Long) t.get(0);
+               int expected = (value.longValue() == 1L )? 1:0 ;
+               Integer dummy = new Integer(0);
+               Integer result=(Integer)testHelper.getOperator().getNext(dummy).result;
+               int actual = result.intValue();
+               assertEquals( expected, actual );
+        }
+    }
+
+    public void testPOBinCondWithFloat() throws  ExecException, PlanException {
+	   	
+		bag= getBag(DataType.FLOAT);
+	   	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.FLOAT, new Float(1.0f) );
+
+	   	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+	           Tuple t = it.next();
+	           testHelper.getPlan().attachInput(t);
+	           Float value = (Float) t.get(0);
+	           int expected = (value.floatValue() == 1.0f )? 1:0 ;
+	           Integer dummy = new Integer(0);
+	           Integer result=(Integer)testHelper.getOperator().getNext(dummy).result;
+	           int actual = result.intValue();
+	           assertEquals( expected, actual );
+	    }
+
+	}
+   
+    public void testPOBinCondWithDouble() throws  ExecException, PlanException {
+	   	
+		bag= getBag(DataType.DOUBLE);
+	   	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DOUBLE, new Double(1.0) );
+
+	   	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+	           Tuple t = it.next();
+	           testHelper.getPlan().attachInput(t);
+	           Double value = (Double) t.get(0);
+	           int expected = (value.doubleValue() == 1.0 )? 1:0 ;
+	           Integer dummy = new Integer(0);
+	           Integer result=(Integer)testHelper.getOperator().getNext(dummy).result;
+	           int actual = result.intValue();
+	           assertEquals( expected, actual );
+	    }
+
+    }
+   
+    public void testPOBinCondIntWithNull() throws  ExecException, PlanException {
+   	
+    	bag= getBagWithNulls(DataType.INTEGER);
+       	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.INTEGER, new Integer(1) );
+    
+       	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            testHelper.getPlan().attachInput(t);
+            Integer value = null;
+            Integer result;
+
+            if (t.get(0) != null) {
+                value = (Integer) t.get(0);
+                result = (Integer) testHelper.getOperator().getNext(value).result;
+            } else {
+                result = (Integer) testHelper.getOperator().getNext(
+                        (Integer) null).result;
+            }
+            int actual;
+            if (value != null) {
+                int expected = (value.intValue() == 1) ? 1 : 0;
+                actual = result.intValue();
+                assertEquals(expected, actual);
+            } else {
+                assertEquals(null, result);
+            }
+ 
+       }
+
+   }
+   
+    public void testPOBinCondLongWithNull() throws  ExecException, PlanException {
+	   	
+	    bag= getBagWithNulls(DataType.LONG);
+	   	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.LONG, new Long(1L) );
+
+	   	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+	           Tuple t = it.next();
+	           testHelper.getPlan().attachInput(t);
+	           
+	           Long value=null;
+	           if ( t.get(0)!=null){
+	        	   value = (Long) t.get(0);
+	           }
+	           Integer dummy = new Integer(0);
+	           Integer result=(Integer)testHelper.getOperator().getNext(dummy).result;	                
+	           int expected;
+	           int actual;
+	           if ( value!=null ) {
+	        	   expected=(value.intValue() == 1)? 1:0 ;
+	        	   actual  = result.intValue();
+	        	   assertEquals( expected, actual );
+	           } else {
+	        	   assertEquals( null, result );
+	           }
+	       }
+	}
+   
+    public void testPOBinCondDoubleWithNull() throws  ExecException, PlanException {
+	   	
+	    bag= getBagWithNulls(DataType.DOUBLE);
+	   	TestPoBinCondHelper testHelper= new TestPoBinCondHelper(DataType.DOUBLE, new Double(1.0) );
+
+	   	for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+	           Tuple t = it.next();
+	           testHelper.getPlan().attachInput(t);
+
+	           Double value=null;
+	           if ( t.get(0)!=null){
+	        	   value = (Double) t.get(0);
+	           }
+	           Integer dummy = new Integer(0);
+	           Integer result=(Integer)testHelper.getOperator().getNext(dummy).result;
+	                
+	           int expected;
+	           int actual;
+	           if ( value!=null ) {
+	        	   expected=(value.intValue() == 1)? 1:0 ;
+	        	   actual  = result.intValue();
+	        	   assertEquals( expected, actual );
+	           } else {
+	        	   assertEquals( null, result );
+	           }
+	          
+
+	       }
+
+	}
+   
+    protected class TestPoBinCondHelper {
+    	
+    	 PhysicalPlan plan= null;
+     	 POBinCond op= null;
+    	 
+   
+		public <U> TestPoBinCondHelper(   byte type,  U value  )  throws  ExecException, PlanException {
+		    	
+				
+		        ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
+		        rt.setValue(value);
+		        rt.setResultType(type);
+		        
+		        POProject prj1 = GenPhyOp.exprProject();
+		        prj1.setColumn(0);
+		        prj1.setResultType(type);
+
+		        
+		        EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
+		        equal.setLhs(prj1);
+		        equal.setRhs(rt);
+		        equal.setOperandType(type);
+		        
+		        POProject prjLhs = GenPhyOp.exprProject();
+		        prjLhs.setResultType(DataType.INTEGER);
+		        prjLhs.setColumn(1);
+		        
+		        POProject prjRhs =prjRhs = GenPhyOp.exprProject();
+		        prjRhs.setResultType(DataType.INTEGER);
+		        prjRhs.setColumn(2);
+		     
+		        op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
+		        op.setResultType(DataType.INTEGER);
+		       
+		        plan= new PhysicalPlan();
+		        plan.add(op);
+		        plan.add(prjLhs);
+		        plan.add(prjRhs);
+		        plan.add(equal);
+		        plan.connect(equal, op);
+		        plan.connect(prjLhs, op);
+		        plan.connect(prjRhs, op);
+		        
+		        plan.add(prj1);
+		        plan.add(rt);
+		        plan.connect(prj1, equal);
+		        plan.connect(rt, equal);
+		        
+		       // File tmpFile = File.createTempFile("test", ".txt" );
+		       //PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+		       //plan.explain(ps);
+		       //ps.close();
+
+		    }
+		
+		public PhysicalPlan getPlan(){
+			return plan;
+		}
+
+		
+		public POBinCond getOperator(){
+			return op;
+		}
+
+
+	}
+    
+    private DataBag getBag(byte type) {
+        DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            switch(type) {
+                case DataType.INTEGER:
+                    t.append(r.nextInt(2));
+                    break;
+                case DataType.LONG:
+                    t.append(r.nextLong() % 2L);
+                    break;
+                case DataType.FLOAT:
+                    t.append((i % 2 == 0 ? 1.0f : 0.0f));
+                    break;
+                case DataType.DOUBLE:
+                    t.append((i % 2 == 0 ? 1.0 : 0.0));
+                    break;                
+            }
+            t.append(1);
+            t.append(0);
+            bag.add(t);
+        }        
+        return bag;        
+    }
+    
+    private DataBag getBagWithNulls(byte type) {
+        DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            if (r.nextInt(4)%3 == 0){
+                t.append(null);                
+            }else{
+                switch(type) {
+                    case DataType.INTEGER:
+                        t.append(r.nextInt(2));
+                        break;
+                    case DataType.LONG: 
+                        t.append(r.nextLong() % 2L);
+                        break;
+                    case DataType.FLOAT:
+                        t.append( (i % 2 == 0 ? 1.0f : 0.0f));
+                        break;
+                    case DataType.DOUBLE:
+                        t.append( (i % 2 == 0 ? 1.0 : 0.0));
+                        break;
+                }
+            }            
+            t.append(1);
+            t.append(0);
+            bag.add(t);
+        }
+        return bag;
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Wed Aug  6 11:27:41 2008
@@ -358,6 +358,17 @@
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+               	Float result  = (Float)op.getNext((Float)null).result;
+				assertEquals( null, result);
+
+            } 
+		}
+
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
 			Map map = null;
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
@@ -1129,4 +1140,101 @@
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
 	}
+	
+	@Test
+	public void testNullToOther() throws PlanException, ExecException {
+		//Create data
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		for(int i = 0; i < MAX; i++) {
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(r.nextInt());
+			bag.add(t);
+            if( r.nextInt(3) % 3 == 0 ){
+            	t = TupleFactory.getInstance().newTuple();
+            	t.append(null);
+	            bag.add(t);
+            }
+
+		}
+		
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		PhysicalPlan plan = new PhysicalPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op);
+		
+		prj.setResultType(DataType.INTEGER);
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+               	Integer result  = (Integer)op.getNext((Integer)null).result;
+				assertEquals( null, result);
+
+            } 
+            
+		}
+		
+		prj.setResultType(DataType.FLOAT);
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+               	Float result  = (Float)op.getNext((Float)null).result;
+				assertEquals( null, result);
+
+            } 
+		}
+
+		prj.setResultType(DataType.DOUBLE);
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+               	Double result  = (Double)op.getNext((Double)null).result;
+				assertEquals( null, result);
+
+            } 
+		}
+		
+		prj.setResultType(DataType.CHARARRAY);
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+               	String result  = (String)op.getNext((String)null).result;
+				assertEquals( null, result);
+
+            } 
+		}
+		
+		prj.setResultType(DataType.BYTEARRAY);
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray((new Integer(r.nextInt())).toString().getBytes()));
+			plan.attachInput(t);
+            if(t.get(0) == null) {
+            	
+            	DataByteArray result  = (DataByteArray)op.getNext((String)null).result;
+				assertEquals( null, result);
+
+            } 
+
+		}
+
+
+	
+	}
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Wed Aug  6 11:27:41 2008
@@ -41,13 +41,21 @@
 import org.junit.Test;
 
 public class TestPODistinct extends TestCase {
-    DataBag input = BagFactory.getInstance().newDefaultBag();
+	DataBag input = BagFactory.getInstance().newDefaultBag();
     Random r = new Random();
     final int MAX_VALUE = 10;
     final int MAX_SAMPLES = 100;
 
     @Before
     public void setUp() {
+         // System.out.println();
+    }
+
+ 
+    @Test
+    public void testPODistictWithInt() throws ExecException {
+    	
+       	input = BagFactory.getInstance().newDefaultBag();
         TupleFactory tf = TupleFactory.getInstance();
         for (int i = 0; i < MAX_SAMPLES; i++) {
             Tuple t = tf.newTuple();
@@ -55,35 +63,114 @@
             input.add(t);
             // System.out.println(t);
         }
-        // System.out.println();
-    }
 
-    @Test
-    public void testPODistict() throws ExecException {
-        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
-        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
-        inputs.add(read);
-        PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
-                -1, inputs);
-        Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
-        Tuple t = null;
-        Result res = distinct.getNext(t);
-        t = (Tuple) res.result;
-        while (res.returnStatus != POStatus.STATUS_EOP) {
-            if (output.containsKey(t)) {
-                int i = output.get(t);
-                output.put(t, ++i);
-            } else {
-                output.put(t, 1);
-            }
-            res = distinct.getNext(t);
-            t = (Tuple) res.result;
+    	confirmDistinct();
+     }
+
+    public void testPODistictWithNullValues() throws ExecException {
+    	
+    	input = BagFactory.getInstance().newDefaultBag();
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(null);
+            input.add(t);
+            // System.out.println(t);
+        }
+
+    	confirmDistinct();
+     }
+  
+    public void testPODistictWithIntAndNullValues() throws ExecException {
+    	
+      	input = BagFactory.getInstance().newDefaultBag();
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt(MAX_VALUE));
+            input.add(t);
+            t = tf.newTuple();
+            t.append(null);
+            input.add(t);
+            // System.out.println(t);
+        }
+
+    	confirmDistinct();
+     }
+ 
+    public void testPODistictWithIntNullValues() throws ExecException {
+    	
+    	input = BagFactory.getInstance().newDefaultBag();
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt(MAX_VALUE));
+            t.append(null);
+            input.add(t);
+            // System.out.println(t);
+        }
+
+    	confirmDistinct();
+     }
+    public void testPODistictWithNullIntValues() throws ExecException {
+    	
+    	input = BagFactory.getInstance().newDefaultBag();
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(null);
+            t.append(r.nextInt(MAX_VALUE));
+            input.add(t);
+            // System.out.println(t);
         }
-        for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
-            int i = e.getValue();
-            // System.out.println(e.getKey());
-            assertEquals(1, i);
+
+    	confirmDistinct();
+     }
+    
+    public void testPODistictArityWithNullValues() throws ExecException {
+    	
+    	input = BagFactory.getInstance().newDefaultBag();
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            if ( r.nextInt(MAX_VALUE) % 3 == 0 ){
+            	t.append(null);
+            }
+            t.append(r.nextInt(MAX_VALUE));
+            t.append(r.nextInt(MAX_VALUE));
+            input.add(t);
+            // System.out.println(t);
         }
+
+    	confirmDistinct();
+     }
+
+    public void confirmDistinct() throws ExecException {
+	   	
+	    PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+	    List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+	    inputs.add(read);
+	    PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+	            -1, inputs);
+	    Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+	    Tuple t = null;
+	    Result res = distinct.getNext(t);
+	    t = (Tuple) res.result;
+	    while (res.returnStatus != POStatus.STATUS_EOP) {
+	        if (output.containsKey(t)) {
+	            int i = output.get(t);
+	            output.put(t, ++i);
+	        } else {
+	            output.put(t, 1);
+	        }
+	        res = distinct.getNext(t);
+	        t = (Tuple) res.result;
+	    }
+	    for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+	        int i = e.getValue();
+	        // System.out.println(e.getKey());
+	        assertEquals(1, i);
+	    }
     }
 
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java Wed Aug  6 11:27:41 2008
@@ -68,6 +68,48 @@
         
     }
     
+    public void testPONegIntAndNull () throws PlanException, ExecException {
+    	
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt());
+            bag.add(t);
+            if( r.nextInt(3) % 3 == 0 ){
+            	t = tf.newTuple();
+	            t.append(null);
+	            bag.add(t);
+            }
+
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.INTEGER);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.INTEGER);
+        
+        PhysicalPlan plan = new PhysicalPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            
+            if(t.get(0) == null) {
+                Integer output = (Integer)pn.getNext((Integer)null).result;
+                assertEquals(null, output);
+
+            } else  {
+                Integer expected = -(Integer)t.get(0);
+                int output = (Integer) pn.getNext(expected).result;
+                assertEquals(expected.intValue(), output);
+                
+            }
+            
+          }
+        
+    }
+    
     public void testPONegLong () throws PlanException, ExecException {
         for(int i = 0; i < MAX; i++) {
             Tuple t = tf.newTuple();
@@ -94,6 +136,48 @@
         
     }
     
+    public void testPONegLongAndNull () throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextLong());
+            bag.add(t);
+            if( r.nextInt(3) % 3 == 0 ){
+            	t = tf.newTuple();
+	            t.append(null);
+	            bag.add(t);
+            }
+
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.LONG);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.LONG);
+        
+        PhysicalPlan plan = new PhysicalPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            
+            if(t.get(0) == null) {
+                Long output = (Long)pn.getNext((Long)null).result;
+                assertEquals(null, output);
+
+            } else  {
+	            Long expected = -(Long)t.get(0);
+	            long output = (Long) pn.getNext(expected).result;
+	            assertEquals(expected.longValue(), output);
+                
+            }
+
+            
+        }
+        
+    }
+    
     public void testPONegDouble() throws PlanException, ExecException {
         for(int i = 0; i < MAX; i++) {
             Tuple t = tf.newTuple();
@@ -113,13 +197,56 @@
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
-            Double expected = -(Double)t.get(0);
-            double output = (Double) pn.getNext(expected).result;
-            assertEquals(expected.doubleValue(), output);
+			Double expected = -(Double)t.get(0);
+			double output = (Double) pn.getNext(expected).result;
+			assertEquals(expected.doubleValue(), output);
+
         }
         
     }
+  
     
+    public void testPONegDoubleAndNull() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextDouble());
+            bag.add(t);
+            if( r.nextInt(3) % 3 == 0 ){
+            	t = tf.newTuple();
+	            t.append(null);
+	            bag.add(t);
+            }
+
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.DOUBLE);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.DOUBLE);
+        
+        PhysicalPlan plan = new PhysicalPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            
+            if(t.get(0) == null) {
+            	Double output = (Double )pn.getNext((Double )null).result;
+                assertEquals(null, output);
+
+            } else  {
+                Double expected = -(Double)t.get(0);
+                double output = (Double) pn.getNext(expected).result;
+                assertEquals(expected.doubleValue(), output);
+                
+            }
+
+        }
+        
+    }
+
     public void testPONegFloat() throws PlanException, ExecException {
         for(int i = 0; i < MAX; i++) {
             Tuple t = tf.newTuple();
@@ -145,4 +272,47 @@
         }
         
     }
+    
+ 
+    public void testPONegFloatAndNull() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextFloat());
+            bag.add(t);
+            if( r.nextInt(3) % 3 == 0 ){
+            	t = tf.newTuple();
+	            t.append(null);
+	            bag.add(t);
+            }
+
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        prj.setResultType(DataType.FLOAT);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+        pn.setResultType(DataType.FLOAT);
+        
+        PhysicalPlan plan = new PhysicalPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            
+            if(t.get(0) == null) {
+            	Float output = (Float)pn.getNext((Float)null).result;
+                assertEquals(null, output);
+
+            } else  {
+
+                Float expected = -(Float)t.get(0);
+                float output = (Float) pn.getNext(expected).result;
+                assertEquals(expected.floatValue(), output);
+                
+            }
+        }
+        
+    }
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestRegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestRegexp.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestRegexp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestRegexp.java Wed Aug  6 11:27:41 2008
@@ -60,6 +60,19 @@
         Result res = op.getNext(new Boolean(true));
         assertEquals(POStatus.STATUS_OK, res.returnStatus);
         assertTrue((Boolean)res.result);
+        
+        // test with null in lhs
+        lt.setValue(null);
+        rt.setValue(".*s.y.*");
+        res = op.getNext(new Boolean(true));
+        assertEquals(null, (Boolean)res.result);
+        
+        // test with null in rhs
+        lt.setValue(new String(
+        "The quick sly fox jumped over the lazy brown dog"));
+        rt.setValue(null);
+        res = op.getNext(new Boolean(true));
+        assertEquals(null, (Boolean)res.result);
     }
 
     @Test

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestSubtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSubtract.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestSubtract.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestSubtract.java Wed Aug  6 11:27:41 2008
@@ -56,7 +56,7 @@
         byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY, 
                 DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.MAP, DataType.TUPLE};
         //Map<Byte,String> map = GenRandomData.genTypeToNameMap();
-        System.out.println("Testing ADD operator");
+        System.out.println("Testing Subtract operator");
         for(byte type : types) {
             lt.setResultType(type);
             rt.setResultType(type);
@@ -71,6 +71,17 @@
                 rt.setValue(inpdb2);
                 Result resdb = op.getNext(inpdb1);
                 assertEquals(resdb.returnStatus, POStatus.STATUS_ERR);
+                
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpdb2);
+                resdb = op.getNext(inpdb1);
+                assertEquals(resdb.returnStatus, POStatus.STATUS_ERR);
+                // test with null in rhs
+                lt.setValue(inpdb1);
+                rt.setValue(null);
+                resdb = op.getNext(inpdb1);
+                assertEquals(resdb.returnStatus, POStatus.STATUS_ERR);
                 break;
             case DataType.BOOLEAN:
                 Boolean inpb1 = r.nextBoolean();
@@ -79,6 +90,17 @@
                 rt.setValue(inpb2);
                 Result resb = op.getNext(inpb1);
                 assertEquals(resb.returnStatus, POStatus.STATUS_ERR);
+                
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpb2);
+                resb = op.getNext(inpb1);
+                assertEquals(resb.returnStatus, POStatus.STATUS_ERR);
+                // test with null in rhs
+                lt.setValue(inpb1);
+                rt.setValue(null);
+                resb = op.getNext(inpb1);
+                assertEquals(resb.returnStatus, POStatus.STATUS_ERR);
                 break;
             case DataType.BYTEARRAY: {
                 DataByteArray inpba1 = GenRandomData.genRandDBA(r);
@@ -89,6 +111,17 @@
                 //DataByteArray expected = new DataByteArray(inpba1.toString() + inpba2.toString());
                 //assertEquals(expected, (DataByteArray)resba.result);
                 assertEquals(POStatus.STATUS_ERR, resba.returnStatus);
+                
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpba2);
+                resba = op.getNext(inpba1);
+                assertEquals(resba.returnStatus, POStatus.STATUS_ERR);
+                // test with null in rhs
+                lt.setValue(inpba1);
+                rt.setValue(null);
+                resba = op.getNext(inpba1);
+                assertEquals(resba.returnStatus, POStatus.STATUS_ERR);
                 break;
             }
             case DataType.CHARARRAY: {
@@ -100,6 +133,17 @@
                 /*String expected = new String(inps1 + inps2);
                 assertEquals(expected, (String)ress.result);*/
                 assertEquals(POStatus.STATUS_ERR, ress.returnStatus);
+                
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inps2);
+                ress = op.getNext(inps1);
+                assertEquals(ress.returnStatus, POStatus.STATUS_ERR);
+                // test with null in rhs
+                lt.setValue(inps1);
+                rt.setValue(null);
+                ress = op.getNext(inps1);
+                assertEquals(ress.returnStatus, POStatus.STATUS_ERR);
                 break;
             }
             case DataType.DOUBLE: {
@@ -110,6 +154,17 @@
                 Result resd = op.getNext(inpd1);
                 Double expected = new Double(inpd1 - inpd2);
                 assertEquals(expected, (Double)resd.result);
+                
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpd2);
+                resd = op.getNext(inpd1);
+                assertEquals(null, (Double)resd.result);
+                // test with null in rhs
+                lt.setValue(inpd1);
+                rt.setValue(null);
+                resd = op.getNext(inpd1);
+                assertEquals(null, (Double)resd.result);
                 break;
             }
             case DataType.FLOAT: {
@@ -120,6 +175,17 @@
                 Result resf = op.getNext(inpf1);
                 Float expected = new Float(inpf1 - inpf2);
                 assertEquals(expected, (Float)resf.result);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpf2);
+                resf = op.getNext(inpf1);
+                assertEquals(null, (Float)resf.result);
+                // test with null in rhs
+                lt.setValue(inpf1);
+                rt.setValue(null);
+                resf = op.getNext(inpf1);
+                assertEquals(null, (Float)resf.result);
                 break;
             }
             case DataType.INTEGER: {
@@ -130,6 +196,17 @@
                 Result resi = op.getNext(inpi1);
                 Integer expected = new Integer(inpi1 - inpi2);
                 assertEquals(expected, (Integer) resi.result);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpi2);
+                resi = op.getNext(inpi1);
+                assertEquals(null, (Integer)resi.result);
+                // test with null in rhs
+                lt.setValue(inpi1);
+                rt.setValue(null);
+                resi = op.getNext(inpi1);
+                assertEquals(null, (Integer)resi.result);
                 break;
             }
             case DataType.LONG: {
@@ -140,6 +217,17 @@
                 Result resl = op.getNext(inpl1);
                 Long expected = new Long(inpl1 - inpl2);
                 assertEquals(expected, (Long)resl.result);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpl2);
+                resl = op.getNext(inpl1);
+                assertEquals(null, (Long)resl.result);
+                // test with null in rhs
+                lt.setValue(inpl1);
+                rt.setValue(null);
+                resl = op.getNext(inpl1);
+                assertEquals(null, (Long)resl.result);
                 break;
             }
             case DataType.MAP: {
@@ -149,6 +237,17 @@
                 rt.setValue(inpm2);
                 Result resm = op.getNext(inpm1);
                 assertEquals(POStatus.STATUS_ERR, resm.returnStatus);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpm2);
+                resm = op.getNext(inpm1);
+                assertEquals(POStatus.STATUS_ERR, resm.returnStatus);
+                // test with null in rhs
+                lt.setValue(inpm1);
+                rt.setValue(null);
+                resm = op.getNext(inpm1);
+                assertEquals(POStatus.STATUS_ERR, resm.returnStatus);
                 break;
             }
             case DataType.TUPLE: {
@@ -158,6 +257,17 @@
                 rt.setValue(inpt2);
                 Result rest = op.getNext(inpt1);
                 assertEquals(POStatus.STATUS_ERR, rest.returnStatus);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpt2);
+                rest = op.getNext(inpt1);
+                assertEquals(POStatus.STATUS_ERR, rest.returnStatus);
+                // test with null in rhs
+                lt.setValue(inpt1);
+                rt.setValue(null);
+                rest = op.getNext(inpt1);
+                assertEquals(POStatus.STATUS_ERR, rest.returnStatus);
                 break;
             }
             }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java?rev=683358&r1=683357&r2=683358&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenRandomData.java Wed Aug  6 11:27:41 2008
@@ -87,13 +87,46 @@
         return t;
     }
     
-    public static Tuple genRandSmallTuple(String s, int value){
+    public static Tuple genRandSmallTuple(String s, Integer value){
         Tuple t = new DefaultTuple();
         t.append(s);
         t.append(value);
         return t;
     }
     
+    /**
+     * Generates a bag of num small tuples, some of which may contain null fields.
+     * With a null Random, the bag holds a single one-field tuple ("RANDOM").
+     */
+    public static DataBag genRandSmallTupDataBagWithNulls(Random r, int num, int limit){
+        DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+        if (r == null) {
+            Tuple placeholder = new DefaultTuple();
+            placeholder.append("RANDOM");
+            bag.add(placeholder);
+            return bag;
+        }
+        // Some tests use the first tuple as a sample to deduce the return
+        // type, so the first tuple never gets nulls introduced here.
+        if (num > 0) {
+            bag.add(genRandSmallTuple(r, limit));
+        }
+        for (int i = 1; i < num; i++) {
+            // One draw in [0, num) per tuple picks which fields are null.
+            int draw = r.nextInt(num);
+            if (draw <= 0.2 * num) {
+                bag.add(genRandSmallTuple((String) null, draw));    // null string, draw as value
+            } else if (draw <= 0.4 * num) {
+                bag.add(genRandSmallTuple(genRandString(r), null)); // null integer
+            } else if (draw <= 0.6 * num) {
+                bag.add(genRandSmallTuple(null, null));             // both fields null
+            } else {
+                bag.add(genRandSmallTuple(r, limit));               // regular tuple
+            }
+        }
+        return bag;
+    }
+    
     public static DataBag genRandSmallTupDataBag(Random r, int num, int limit){
         if(r==null) {
             DataBag db = DefaultBagFactory.getInstance().newDefaultBag();