You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/08/30 07:10:35 UTC

svn commit: r1378801 [2/2] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/builtin/ src/or...

Modified: pig/trunk/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOCast.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOCast.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOCast.java Thu Aug 30 05:10:34 2012
@@ -17,16 +17,14 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
-import org.joda.time.DateTime;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -59,16 +57,17 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.test.utils.GenRandomData;
+import org.joda.time.DateTime;
 import org.junit.Test;
 
-public class TestPOCast extends TestCase {
+public class TestPOCast {
 
 	Random r = new Random();
 	final int MAX = 10;
 	Tuple dummyTuple = null;
 	Map<Object,Object> dummyMap = null;
 	DataBag dummyBag = null;
-	
+
 	@Test
 	public void testBooleanToOther() throws IOException {
 	    //Create data
@@ -78,7 +77,7 @@ public class TestPOCast extends TestCase
             t.append(r.nextBoolean());
             bag.add(t);
         }
-        
+
         POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
         LoadFunc load = new TestLoader();
         op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -87,14 +86,14 @@ public class TestPOCast extends TestCase
         plan.add(prj);
         plan.add(op);
         plan.connect(prj, op);
-        
+
         prj.setResultType(DataType.BOOLEAN);
-        
+
         // Plan to test when result type is ByteArray and casting is requested
         // for example casting of values coming out of map lookup.
         POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
         PhysicalPlan planToTestBACasts = constructPlan(opWithInputTypeAsBA);
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -109,7 +108,7 @@ public class TestPOCast extends TestCase
                 assertEquals(b, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -124,7 +123,7 @@ public class TestPOCast extends TestCase
                 assertEquals(i, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -139,7 +138,7 @@ public class TestPOCast extends TestCase
                 assertEquals(l, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -154,7 +153,7 @@ public class TestPOCast extends TestCase
                 assertEquals(f, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -192,7 +191,7 @@ public class TestPOCast extends TestCase
                 assertEquals(str, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -207,7 +206,7 @@ public class TestPOCast extends TestCase
                 assertEquals(dba, res.result);
             }
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -215,14 +214,14 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(map);
             assertEquals(POStatus.STATUS_ERR, res.returnStatus);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
             Result res = op.getNext(t);
             assertEquals(POStatus.STATUS_ERR, res.returnStatus);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -255,7 +254,7 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	@Test
 	public void testIntegerToOther() throws IOException {
 		//Create data
@@ -265,7 +264,7 @@ public class TestPOCast extends TestCase
 			t.append(i == 0 ? 0 : r.nextInt());
 			bag.add(t);
 		}
-		
+
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		LoadFunc load = new TestLoader();
 		op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -274,7 +273,7 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.INTEGER);
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
@@ -310,7 +309,7 @@ public class TestPOCast extends TestCase
 				assertEquals(i, res.result);
 			}
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -325,7 +324,7 @@ public class TestPOCast extends TestCase
 				assertEquals(f, res.result);
 			}
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -340,7 +339,7 @@ public class TestPOCast extends TestCase
 				assertEquals(l, res.result);
 			}
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -370,7 +369,7 @@ public class TestPOCast extends TestCase
                 assertEquals(dt, res.result);
             }
         }
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -385,7 +384,7 @@ public class TestPOCast extends TestCase
 				assertEquals(str, res.result);
 			}
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -400,7 +399,7 @@ public class TestPOCast extends TestCase
 				assertEquals(dba, res.result);
 			}
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -408,14 +407,14 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -448,7 +447,7 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	@Test
 	public void testLongToOther() throws IOException {
 		//Create data
@@ -458,7 +457,7 @@ public class TestPOCast extends TestCase
 			t.append(i == 0 ? 0L : r.nextLong());
 			bag.add(t);
 		}
-		
+
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		LoadFunc load = new TestLoader();
 		op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -467,9 +466,9 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.LONG);
-		
+
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -501,14 +500,14 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + i);
 				assertEquals(i, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(i);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
-			
+
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -518,13 +517,13 @@ public class TestPOCast extends TestCase
 //			   System.out.println(res.result + " : " + f);
 				assertEquals(f, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(f);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(f, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -534,13 +533,13 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + l);
 				assertEquals(l, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(l);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(l, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -550,7 +549,7 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + d);
 				assertEquals(d, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(d);
 			if(res.returnStatus == POStatus.STATUS_OK)
@@ -560,13 +559,13 @@ public class TestPOCast extends TestCase
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
-            DateTime dt = new DateTime((Long)t.get(0));
+            DateTime dt = new DateTime(t.get(0));
             Result res = op.getNext(dt);
             if(res.returnStatus == POStatus.STATUS_OK) {
                 //System.out.println(res.result + " : " + l);
                 assertEquals(dt, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(dt);
             if(res.returnStatus == POStatus.STATUS_OK)
@@ -582,13 +581,13 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + str);
 				assertEquals(str, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(str);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(str, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -598,13 +597,13 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + dba);
 				assertEquals(dba, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(dba);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(dba, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -612,14 +611,14 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -652,7 +651,7 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	@Test
 	public void testFloatToOther() throws IOException {
 		//Create data
@@ -662,7 +661,7 @@ public class TestPOCast extends TestCase
 			t.append(i == 0 ? 0.0F : r.nextFloat());
 			bag.add(t);
 		}
-		
+
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		LoadFunc load = new TestLoader();
 		op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -671,14 +670,14 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.FLOAT);
-		
+
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
 		PhysicalPlan planToTestBACasts = constructPlan(opWithInputTypeAsBA);
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -693,7 +692,7 @@ public class TestPOCast extends TestCase
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(b, res.result);
         }
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -708,7 +707,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -723,7 +722,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(f, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -738,7 +737,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(l, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -763,12 +762,12 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + dt);
                 assertEquals(dt, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(dt);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(dt, res.result);
-        }   
+        }
 
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
@@ -784,7 +783,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(str, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -799,12 +798,12 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(dba, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-			
+
 					  Float result = (Float) op.getNext((Float) null).result;
 				assertEquals( null, result);
 
@@ -818,14 +817,14 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -858,7 +857,7 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	@Test
 	public void testDoubleToOther() throws IOException {
 		//Create data
@@ -868,7 +867,7 @@ public class TestPOCast extends TestCase
 			t.append(i == 0 ? 0.0D : r.nextDouble());
 			bag.add(t);
 		}
-		
+
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		LoadFunc load = new TestLoader();
 		op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -877,9 +876,9 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.DOUBLE);
-		
+
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -914,7 +913,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -929,7 +928,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(f, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -944,7 +943,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(l, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -969,12 +968,12 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + dt);
                 assertEquals(dt, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(dt);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(dt, res.result);
-        }		
+        }
 
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
@@ -1005,7 +1004,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(dba, res.result);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -1013,14 +1012,14 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
@@ -1063,7 +1062,7 @@ public class TestPOCast extends TestCase
             t.append(i == 0 ? new DateTime(0L) : new DateTime(r.nextLong()));
             bag.add(t);
         }
-        
+
         POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
         LoadFunc load = new TestLoader();
         op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -1072,9 +1071,9 @@ public class TestPOCast extends TestCase
         plan.add(prj);
         plan.add(op);
         plan.connect(prj, op);
-        
+
         prj.setResultType(DataType.DATETIME);
-        
+
         // Plan to test when result type is ByteArray and casting is requested
         // for example casting of values coming out of map lookup.
         POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1097,14 +1096,14 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + i);
                 assertEquals(i, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(i);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(i, res.result);
-            
+
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1114,13 +1113,13 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + f);
                 assertEquals(f, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(f);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(f, res.result);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1130,13 +1129,13 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + l);
                 assertEquals(l, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(l);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(l, res.result);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1146,7 +1145,7 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + f);
                 assertEquals(d, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(d);
             if(res.returnStatus == POStatus.STATUS_OK)
@@ -1162,7 +1161,7 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + l);
                 assertEquals(dt, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(dt);
             if(res.returnStatus == POStatus.STATUS_OK)
@@ -1178,13 +1177,13 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + str);
                 assertEquals(str, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(str);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(str, res.result);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1194,13 +1193,13 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + dba);
                 assertEquals(dba, res.result);
             }
-            
+
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(dba);
             if(res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(dba, res.result);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1208,14 +1207,14 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(map);
             assertEquals(POStatus.STATUS_ERR, res.returnStatus);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
             Result res = op.getNext(t);
             assertEquals(POStatus.STATUS_ERR, res.returnStatus);
         }
-        
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
@@ -1259,16 +1258,16 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.CHARARRAY);
 
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
 		PhysicalPlan planToTestBACasts = constructPlan(opWithInputTypeAsBA);
-		
+
 		TupleFactory tf = TupleFactory.getInstance();
-		
+
         {
             Tuple t = tf.newTuple();
             t.append((new Boolean(r.nextBoolean())).toString());
@@ -1303,7 +1302,7 @@ public class TestPOCast extends TestCase
             if (res.returnStatus == POStatus.STATUS_OK)
                 assertEquals(null, res.result);
         }
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append((new Integer(r.nextInt())).toString());
@@ -1319,7 +1318,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append((new Float(r.nextFloat())).toString());
@@ -1335,7 +1334,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append((new Long(r.nextLong())).toString());
@@ -1351,7 +1350,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append((new Double(r.nextDouble())).toString());
@@ -1372,16 +1371,16 @@ public class TestPOCast extends TestCase
             Tuple t = tf.newTuple();
             t.append((new DateTime(r.nextLong())).toString());
             plan.attachInput(t);
-            DateTime i = new DateTime(((String) t.get(0)));
+            DateTime i = new DateTime(t.get(0));
             Result res = op.getNext(i);
             if(res.returnStatus == POStatus.STATUS_OK) {
                 //System.out.println(res.result + " : " + i);
-                assertEquals(i, res.result);
+                assertEquals(i.getMillis(), ((DateTime) res.result).getMillis());
             }
             planToTestBACasts.attachInput(t);
             res = opWithInputTypeAsBA.getNext(i);
             if(res.returnStatus == POStatus.STATUS_OK)
-                assertEquals(i, res.result);
+                assertEquals(i.getMillis(), ((DateTime) res.result).getMillis());
         }
 
 		{
@@ -1399,11 +1398,11 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(str, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
-		
+
 			plan.attachInput(t);
 			DataByteArray dba = new DataByteArray(((String)t.get(0)).getBytes());
 			Result res = op.getNext(dba);
@@ -1411,13 +1410,13 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + dba);
 				assertEquals(dba, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(dba);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(dba, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -1426,7 +1425,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -1467,22 +1466,24 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	public static class TestLoader extends LoadFunc implements LoadCaster{
-	    
+
         public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
-            
+
         }
-        
+
         @Override
         public Tuple getNext() throws IOException {
             return null;
         }
-        
+
+        @Override
         public DataBag bytesToBag(byte[] b, ResourceFieldSchema s) throws IOException {
             return null;
         }
 
+        @Override
         public Boolean bytesToBoolean(byte[] b) throws IOException {
             DataByteArray dba = new DataByteArray(b);
             String str = dba.toString();
@@ -1499,40 +1500,49 @@ public class TestPOCast extends TestCase
                 return null;
             }
         }
-        
+
+        @Override
         public String bytesToCharArray(byte[] b) throws IOException {
             DataByteArray dba = new DataByteArray(b);
             return dba.toString();
         }
-        
+
+        @Override
         public Double bytesToDouble(byte[] b) throws IOException {
             return new Double(Double.valueOf(new DataByteArray(b).toString()));
         }
-        
+
+        @Override
         public Float bytesToFloat(byte[] b) throws IOException {
             return new Float(Float.valueOf(new DataByteArray(b).toString()));
         }
-        
+
+        @Override
         public Integer bytesToInteger(byte[] b) throws IOException {
             return new Integer(Integer.valueOf(new DataByteArray(b).toString()));
         }
 
+        @Override
         public Long bytesToLong(byte[] b) throws IOException {
             return new Long(Long.valueOf(new DataByteArray(b).toString()));
         }
 
+        @Override
         public DateTime bytesToDateTime(byte[] b) throws IOException {
             return new DateTime(new DataByteArray(b).toString());
         }
 
+        @Override
         public Map<String, Object> bytesToMap(byte[] b) throws IOException {
           return null;
         }
-        
+
+        @Override
         public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema s) throws IOException {
             return null;
         }
 
+        @Override
         public Tuple bytesToTuple(byte[] b, ResourceFieldSchema s) throws IOException {
             return null;
         }
@@ -1540,27 +1550,27 @@ public class TestPOCast extends TestCase
         public byte[] toBytes(DataBag bag) throws IOException {
             return null;
         }
-	
+
         public byte[] toBytes(String s) throws IOException {
             return s.getBytes();
         }
-        
+
         public byte[] toBytes(Double d) throws IOException {
             return d.toString().getBytes();
         }
-        
+
         public byte[] toBytes(Float f) throws IOException {
             return f.toString().getBytes();
         }
-        
+
         public byte[] toBytes(Integer i) throws IOException {
             return i.toString().getBytes();
         }
-        
+
         public byte[] toBytes(Long l) throws IOException {
             return l.toString().getBytes();
         }
-        
+
         public byte[] toBytes(DateTime dt) throws IOException {
             return dt.toString().getBytes();
         }
@@ -1568,11 +1578,11 @@ public class TestPOCast extends TestCase
         public byte[] toBytes(Boolean b) throws IOException {
             return b.toString().getBytes();
         }
-        
+
 	    public byte[] toBytes(Map<String, Object> m) throws IOException {
 	        return null;
 	    }
-	
+
         public byte[] toBytes(Tuple t) throws IOException {
             return null;
         }
@@ -1601,11 +1611,11 @@ public class TestPOCast extends TestCase
 
         @Override
         public void setLocation(String location, Job job) throws IOException {
- 
+
         }
-        
+
 	}
-	
+
 	@Test
 	public void testByteArrayToOther() throws IOException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1616,11 +1626,11 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.BYTEARRAY);
-		
+
 		TupleFactory tf = TupleFactory.getInstance();
-		
+
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1660,7 +1670,7 @@ public class TestPOCast extends TestCase
 				//System.out.println(res.result + " : " + i);
 				assertEquals(i, res.result);
 			}
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(i);
 			if(res.returnStatus == POStatus.STATUS_OK)
@@ -1683,7 +1693,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray((new Long(r.nextLong())).toString().getBytes()));
@@ -1699,7 +1709,7 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(i, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray((new Double(r.nextDouble())).toString().getBytes()));
@@ -1747,22 +1757,22 @@ public class TestPOCast extends TestCase
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(str, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
-			
+
 			plan.attachInput(t);
 			DataByteArray dba = (DataByteArray) t.get(0);
 			Result res = op.getNext(dba);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(dba);
 			if(res.returnStatus == POStatus.STATUS_OK)
 				assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
@@ -1772,13 +1782,13 @@ public class TestPOCast extends TestCase
 			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(map);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
@@ -1787,13 +1797,13 @@ public class TestPOCast extends TestCase
 			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(t);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
@@ -1803,14 +1813,14 @@ public class TestPOCast extends TestCase
 			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(b);
 			assertEquals(POStatus.STATUS_OK, res.returnStatus);
 			assertEquals(null, res.result);
 		}
 	}
-	
+
 	private PhysicalPlan constructPlan(POCast op) throws IOException {
         LoadFunc load = new TestLoader();
         op.setFuncSpec(new FuncSpec(load.getClass().getName()));
@@ -1822,17 +1832,17 @@ public class TestPOCast extends TestCase
         prj.setResultType(DataType.BYTEARRAY);
         return plan;
 	}
-	
-	/* 
-     * Test that if the input type is actually same 
+
+	/*
+     * Test that if the input type is actually same
      * as output type and we think that the input type is a
      * bytearray we still can handle it. This can happen in the
      * following situation:
-     * If a map in pig (say returned from a UDF) has a key with 
+     * If a map in pig (say returned from a UDF) has a key with
      * the value being a string, then a lookup of that key being used
      * in a context which expects a string will cause an implicit cast
-     * to a string. This is because the Pig frontend (logical layer) 
-     * thinks of all map "values" as bytearrays and hence introduces 
+     * to a string. This is because the Pig frontend (logical layer)
+     * thinks of all map "values" as bytearrays and hence introduces
      * a Cast to convert the bytearray to string. Though in reality
      * the input to the cast is already a string
      */
@@ -1841,9 +1851,9 @@ public class TestPOCast extends TestCase
         POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
         PhysicalPlan plan = constructPlan(op);
         TupleFactory tf = TupleFactory.getInstance();
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1857,7 +1867,7 @@ public class TestPOCast extends TestCase
                 //System.out.println(res.result + " : " + i);
                 assertEquals(input, res.result);
             }
-            
+
             t = tf.newTuple();
             t.append("neither true nor false");
             plan.attachInput(t);
@@ -1867,10 +1877,10 @@ public class TestPOCast extends TestCase
                 assertEquals(null, res.result);
             }
         }
-        
+
         {
             Tuple t = tf.newTuple();
-            Integer input = new Integer(r.nextInt()); 
+            Integer input = new Integer(r.nextInt());
             t.append(input);
             plan.attachInput(t);
             Result res = op.getNext(new Integer(0));
@@ -1879,9 +1889,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1896,9 +1906,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1913,9 +1923,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1932,7 +1942,7 @@ public class TestPOCast extends TestCase
         }
 
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1949,7 +1959,7 @@ public class TestPOCast extends TestCase
         }
 
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1964,9 +1974,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -1981,9 +1991,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
         {
-            // create a new POCast each time since we 
+            // create a new POCast each time since we
             // maintain a state variable per POCast object
             // indicating if cast is really required
             POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -2000,9 +2010,9 @@ public class TestPOCast extends TestCase
                 assertEquals(input, res.result);
             }
         }
-        
+
 	}
-	
+
 	@Test
 	public void testTupleToOther() throws IOException, ParserException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -2012,16 +2022,16 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.TUPLE);
-		
+
 		TupleFactory tf = TupleFactory.getInstance();
-		
+
 		//Plan to test when result type is ByteArray and casting is requested
 		//for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
 		PhysicalPlan planToTestBACasts = constructPlan(opWithInputTypeAsBA);
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2032,7 +2042,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2042,12 +2052,12 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(t);
 			//System.out.println(res.result + " : " + t);
 			assertEquals(t, res.result);
-			
+
 			planToTestBACasts.attachInput(tNew);
 			res = opWithInputTypeAsBA.getNext(t);
 			assertEquals(t, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2058,7 +2068,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(b);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2069,7 +2079,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2080,7 +2090,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2091,7 +2101,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2113,7 +2123,7 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(dt);
             assertEquals(POStatus.STATUS_ERR, res.returnStatus);
         }
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2124,7 +2134,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandString(r));
@@ -2134,13 +2144,13 @@ public class TestPOCast extends TestCase
 			DataByteArray i = null;
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-			
+
 			op.setFuncSpec(new FuncSpec(BinStorage.class.getName()));
 			plan.attachInput(tNew);
 			res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
         {
             Tuple t = tf.newTuple();
             Tuple wrappedTuple = tf.newTuple();
@@ -2152,10 +2162,10 @@ public class TestPOCast extends TestCase
             plan.attachInput(t);
             Tuple tup = null;
             Result res = op.getNext(tup);
-            
+
             assertTrue(res.result==null);
         }
-        
+
         {
             //positive test case
             Tuple t = tf.newTuple();
@@ -2185,13 +2195,13 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(tup);
             verifyResult(res, POStatus.STATUS_OK, wrappedTuple);
         }
-	
-	
+
+
 	}
-	
+
 	private void verifyResult(Result res, byte status, Object result) {
         assertEquals("result status", status, res.returnStatus);
-        assertEquals("result value", result, res.result);        
+        assertEquals("result value", result, res.result);
     }
 
     @Test
@@ -2203,11 +2213,11 @@ public class TestPOCast extends TestCase
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
 		prj.setResultType(DataType.BAG);
-		
+
 		TupleFactory tf = TupleFactory.getInstance();
-		
+
 		//Plan to test when result type is ByteArray and casting is requested
 		//for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -2220,7 +2230,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2228,7 +2238,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2237,12 +2247,12 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(b);
 			//System.out.println(res.result + " : " + t);
 			assertEquals(b, res.result);
-			
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(b);
 			assertEquals(b, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2251,7 +2261,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2260,16 +2270,16 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
-			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));       
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
 			plan.attachInput(t);
 			Float i = null;
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2296,7 +2306,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
@@ -2304,13 +2314,13 @@ public class TestPOCast extends TestCase
 			DataByteArray i = null;
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-			
+
 			op.setFuncSpec(new FuncSpec(BinStorage.class.getName()));
 			plan.attachInput(t);
 			res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
         {
             Tuple t = tf.newTuple();
             t.append(GenRandomData.genRandSmallTupDataBagWithNulls(r, 20, 100));
@@ -2321,7 +2331,7 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(db);
             Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
             Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
-            
+
             while(expectedBagIterator.hasNext()) {
                 Tuple expectedBagTuple = expectedBagIterator.next();
                 Tuple convertedBagTuple = convertedBagIterator.next();
@@ -2333,11 +2343,11 @@ public class TestPOCast extends TestCase
                     assertTrue(convertedBagTuple.get(1) instanceof Float);
                     assertTrue(((Float)(expectedBagTuple.get(1))).floatValue()==(Float)(convertedBagTuple.get(1)));
                 }
-                
+
 
             }
         }
-        
+
         {
             Tuple t = tf.newTuple();
             t.append(GenRandomData.genRandSmallTupDataBagWithNulls(r, 20, 100));
@@ -2348,16 +2358,16 @@ public class TestPOCast extends TestCase
             Result res = op.getNext(db);
             Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
             Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
-            
+
             while(expectedBagIterator.hasNext()) {
                 Tuple expectedBagTuple = expectedBagIterator.next();
                 Tuple convertedBagTuple = convertedBagIterator.next();
-                
+
                 if(expectedBagTuple.get(0) != null){
                     assertTrue(convertedBagTuple.get(0) instanceof String);
                     assertTrue(expectedBagTuple.get(0).equals(convertedBagTuple.get(0)));
                 }
-                
+
                 if(expectedBagTuple.get(1) != null){
                     assertTrue(convertedBagTuple.get(1) instanceof Integer);
                     assertTrue(((Integer)(expectedBagTuple.get(1)))==(Integer)(convertedBagTuple.get(1)));
@@ -2366,7 +2376,7 @@ public class TestPOCast extends TestCase
             }
         }
 	}
-	
+
 	@Test
 	public void testMapToOther() throws IOException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -2377,7 +2387,7 @@ public class TestPOCast extends TestCase
 		plan.add(op);
 		plan.connect(prj, op);
 		prj.setResultType(DataType.MAP);
-		
+
 		// Plan to test when result type is ByteArray and casting is requested
 		// for example casting of values coming out of map lookup.
 		POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", r.nextLong()), -1);
@@ -2391,21 +2401,21 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(map);
 			//System.out.println(res.result + " : " + t);
 			assertEquals(map, res.result);
-		     
+
 			planToTestBACasts.attachInput(t);
 			res = opWithInputTypeAsBA.getNext(map);
 			assertEquals(map, res.result);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
 			plan.attachInput(t);
 			Result res = op.getNext(t);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-			
+
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
@@ -2413,9 +2423,9 @@ public class TestPOCast extends TestCase
 			DataBag b = null;
 			Result res = op.getNext(b);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-			
+
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
@@ -2423,15 +2433,15 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
-		{     
+
+		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
 			Long i = null;
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
@@ -2439,7 +2449,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
@@ -2448,7 +2458,7 @@ public class TestPOCast extends TestCase
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
 
-        {     
+        {
             Tuple t = tf.newTuple();
             t.append(GenRandomData.genRandMap(r, 10));
             DateTime dt = null;
@@ -2463,7 +2473,7 @@ public class TestPOCast extends TestCase
 			Result res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(GenRandomData.genRandMap(r, 10));
@@ -2478,7 +2488,7 @@ public class TestPOCast extends TestCase
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
 	}
-	
+
 	@Test
 	public void testNullToOther() throws PlanException, ExecException {
 		//Create data
@@ -2492,16 +2502,16 @@ public class TestPOCast extends TestCase
 				t.append(null);
 				bag.add(t);
 			}
-			
+
 		}
-		
+
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		PhysicalPlan plan = new PhysicalPlan();       
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
-		
+
         prj.setResultType(DataType.BOOLEAN);
 
         for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
@@ -2516,94 +2526,94 @@ public class TestPOCast extends TestCase
 
         }
 
-        prj.setResultType(DataType.INTEGER); 
-		
+        prj.setResultType(DataType.INTEGER);
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-				
+
 				Integer result  = (Integer)op.getNext((Integer)null).result;
 				assertEquals( null, result);
 
-			} 
-			
+			}
+
 		}
-		
+
 		prj.setResultType(DataType.FLOAT);
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-				
+
 				Integer result  = (Integer)op.getNext((Integer)null).result;
 				assertEquals( null, result);
 
-			} 
-			
+			}
+
 		}
-		
+
 		prj.setResultType(DataType.DOUBLE);
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-				
+
 				Double result = (Double) op.getNext((Double) null).result;
 			assertEquals(null, result);
 
 			}
 		}
 
-        prj.setResultType(DataType.DATETIME); 
-        
+        prj.setResultType(DataType.DATETIME);
+
         for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
             plan.attachInput(t);
             if(t.get(0) == null) {
-                
+
                 DateTime result  = (DateTime)op.getNext((DateTime)null).result;
                 assertEquals( null, result);
 
-            } 
-            
-        }	
+            }
+
+        }
 
 		prj.setResultType(DataType.CHARARRAY);
-		
+
 		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
 			Tuple t = it.next();
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-				
+
 				String result  = (String)op.getNext((String)null).result;
 				assertEquals( null, result);
 
-			} 
-			
+			}
+
 		}
-		
+
 		prj.setResultType(DataType.BYTEARRAY);
-		
+
 		TupleFactory tf = TupleFactory.getInstance();
-		
+
 		{
 			Tuple t = tf.newTuple();
 			t.append(new DataByteArray((new Integer(r.nextInt())).toString().getBytes()));
 			plan.attachInput(t);
 			if(t.get(0) == null) {
-				
+
 				DataByteArray result = (DataByteArray) op.getNext((String) null).result;
 				assertEquals(null, result);
-				
+
 			}
-			
+
 		}
-		
+
 	}
-	
+
 	@Test
 	public void testValueTypesChanged() throws IOException {
 

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java Thu Aug 30 05:10:34 2012
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -201,16 +202,6 @@ public class TestPOPartialAgg {
     }
 
     @Test
-    public void testPartialMultiInput1HashMemEmpty() throws Exception {
-        // input tuple has key, and bag containing SUM.Init output
-        // gby keys in consecutive row, they get aggregated even when
-        // hashmap is not given any memory
-        String[] inputTups = { "(1,(1L))", "(1,(2L))", "(2,(1L))" };
-        String[] outputTups = { "(1,(3L))", "(2,(1L))" };
-        checkInputAndOutput(inputTups, outputTups, true);
-    }
-
-    @Test
     public void testMultiInput1HashMemEmpty() throws Exception {
         // input tuple has key, and bag containing SUM.Init output
         String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
@@ -244,7 +235,7 @@ public class TestPOPartialAgg {
 
         PigMapReduce.sJobConfInternal.set(new Configuration());
         if (isMapMemEmpty) {
-            PigMapReduce.sJobConfInternal.get().set("pig.cachedbag.memusage",
+            PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE,
                     "0");
         }
 

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1378801&r1=1378800&r2=1378801&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Thu Aug 30 05:10:34 2012
@@ -25,8 +25,8 @@ import java.util.Iterator;
 import java.util.Properties;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -62,7 +62,7 @@ public class TestPOPartialAggPlan  {
     public void testMapAggPropFalse() throws Exception{
         //test with pig.exec.mapPartAgg set to false
         String query = getGByQuery();
-        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "false");
+        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -73,7 +73,7 @@ public class TestPOPartialAggPlan  {
     public void testMapAggPropTrue() throws Exception{
         //test with pig.exec.mapPartAgg to true
         String query = getGByQuery();
-        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -100,7 +100,7 @@ public class TestPOPartialAggPlan  {
         String query = "l = load 'x' as (a,b,c);" +
                 "g = group l by a;" +
                 "f = foreach g generate group;";
-        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -113,19 +113,19 @@ public class TestPOPartialAggPlan  {
         String query = "l = load 'x' as (a,b,c);" +
                 "g = group l by a;" +
                 "f = foreach g generate group, COUNT(l.b), l.b;";
-        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, "true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
         assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
     }
-    
+
     private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) {
         Iterator<PhysicalOperator> it = mapPlan.iterator();
         while(it.hasNext()){
-            PhysicalOperator op = (PhysicalOperator) it.next();
+            PhysicalOperator op = it.next();
             if(op instanceof POPartialAgg){
-                return (POPartialAgg)op;
+                return op;
             }
         }
         return null;