You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/19 21:33:42 UTC

svn commit: r657934 - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: gates
Date: Mon May 19 12:33:42 2008
New Revision: 657934

URL: http://svn.apache.org/viewvc?rev=657934&view=rev
Log:
PIG-161 Shubham's implementation of POCast.


Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=657934&r1=657933&r2=657934&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon May 19 12:33:42 2008
@@ -144,7 +144,7 @@
                 **/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
                 **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
         		**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
-        		**/test/TestPODistinct.java, **/test/TestPOSort.java,
+        		**/test/TestPODistinct.java, **/test/TestPOSort.java, **/test/TestPOCast.java,
         		**/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,**/test/TestUnion.java, **/test/TestMRCompiler.java,
                 **/test/FakeFSInputStream.java, **/test/Util.java, **/test/TestJobSubmission.java,
         		**/test/TestLocalJobSubmission.java, **/test/TestPOMapLookUp.java, 
@@ -283,6 +283,7 @@
                 	<include name="**/TestPOBinCond.java" />
                 	<include name="**/TestPONegative.java" />
                 	<include name="**/TestGrunt.java" />
+                	<include name="**/TestPOCast.java" />
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java?rev=657934&r1=657933&r2=657934&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java Mon May 19 12:33:42 2008
@@ -17,12 +17,22 @@
  */
 package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -32,9 +42,9 @@
  */
 public class POCast extends ExpressionOperator {
 
-    /**
-     * 
-     */
+	LoadFunc load;
+	private Log log = LogFactory.getLog(getClass());
+	
     private static final long serialVersionUID = 1L;
 
     public POCast(OperatorKey k) {
@@ -46,10 +56,14 @@
         super(k, rp);
         // TODO Auto-generated constructor stub
     }
+    
+    /**
+     * Sets the load function whose {@code bytesTo*} converters this operator
+     * calls when it has to cast bytearray input to a concrete type.
+     *
+     * @param load the load function to use for bytearray conversions
+     */
+    public void setLoad(LoadFunc load) {
+    	this.load = load;
+    }
 
     @Override
     public void visit(ExprPlanVisitor v) throws VisitorException {
-        // TODO Auto-generated method stub
+        v.visitCast(this);
 
     }
 
@@ -66,34 +80,720 @@
 
     @Override
     public Result getNext(Integer i) throws ExecException {
-        Result res = inputs.get(0).getNext(i);
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        case DataType.BAG : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+        
+        case DataType.TUPLE : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToInteger(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Integer");
+				}
+        	}
+        	return res;
+        }
+        
+        case DataType.MAP : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;        	
+        }
+        
+        case DataType.BOOLEAN : {
+        	Boolean b = null;
+        	Result res = in.getNext(b);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if (((Boolean)res.result) == true) res.result = new Integer(1);
+                else res.result = new Integer(0);
+        	}
+        	return res;
+        }
+        case DataType.INTEGER : {
+        	
+        	Result res = in.getNext(i);
+        	return res;
+        }
+
+        case DataType.DOUBLE : {
+        	Double d = null;
+        	Result res = in.getNext(d);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = DataType.toInteger(res.result);
+        		res.result = new Integer(((Double)res.result).intValue());
+        	}
+        	return res;
+        }
+
+        case DataType.LONG : {
+        	Long l = null;
+        	Result res = in.getNext(l);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Integer(((Long)res.result).intValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.FLOAT : {
+        	Float f = null;
+        	Result res = in.getNext(f);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Integer(((Float)res.result).intValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.CHARARRAY : {
+        	String str = null;
+        	Result res = in.getNext(str);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Integer(Integer.valueOf((String)res.result));
+        	}
+        	return res;
+        }
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    @Override
+    public Result getNext(Long l) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        case DataType.BAG : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+        
+        case DataType.TUPLE : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+
+        case DataType.MAP : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;        	
+        }
+                
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new Long(Long.valueOf((((DataByteArray)res.result).toString())));
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToLong(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Long");
+				}
+        	}
+        	return res;
+        }
+        
+        case DataType.BOOLEAN : {
+        	Boolean b = null;
+        	Result res = in.getNext(b);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if (((Boolean)res.result) == true) res.result = new Long(1);
+                else res.result = new Long(0);
+        	}
+        	return res;
+        }
+        case DataType.INTEGER : {
+        	Integer dummyI = null;
+        	Result res = in.getNext(dummyI);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Long(((Integer)res.result).longValue());
+        	}
+        	return res;
+        }
+
+        case DataType.DOUBLE : {
+        	Double d = null;
+        	Result res = in.getNext(d);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = DataType.toInteger(res.result);
+        		res.result = new Long(((Double)res.result).longValue());
+        	}
+        	return res;
+        }
+
+        case DataType.LONG : {
+        	
+        	Result res = in.getNext(l);
+        	
+        	return res;
+        }
+        
+        case DataType.FLOAT : {
+        	Float f = null;
+        	Result res = in.getNext(f);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Long(((Float)res.result).longValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.CHARARRAY : {
+        	String str = null;
+        	Result res = in.getNext(str);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Long(Long.valueOf((String)res.result));
+        	}
+        	return res;
+        }
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    @Override
+    public Result getNext(Double d) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        case DataType.BAG : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+        
+        case DataType.TUPLE : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+
+        case DataType.MAP : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;        	
+        }
+                
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new Double(Double.valueOf((((DataByteArray)res.result).toString())));
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToDouble(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Double");
+				}
+        	}
+        	return res;
+        }
+        
+        case DataType.BOOLEAN : {
+        	Boolean b = null;
+        	Result res = in.getNext(b);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if (((Boolean)res.result) == true) res.result = new Double(1);
+                else res.result = new Double(0);
+        	}
+        	return res;
+        }
+        case DataType.INTEGER : {
+        	Integer dummyI = null;
+        	Result res = in.getNext(dummyI);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Double(((Integer)res.result).doubleValue());
+        	}
+        	return res;
+        }
+
+        case DataType.DOUBLE : {
+        	
+        	Result res = in.getNext(d);
+        	
+        	return res;
+        }
+
+        case DataType.LONG : {
+        	Long l = null;
+        	Result res = in.getNext(l);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Double(((Long)res.result).doubleValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.FLOAT : {
+        	Float f = null;
+        	Result res = in.getNext(f);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Double(((Float)res.result).doubleValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.CHARARRAY : {
+        	String str = null;
+        	Result res = in.getNext(str);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Double(Double.valueOf((String)res.result));
+        	}
+        	return res;
+        }
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    @Override
+    public Result getNext(Float f) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        case DataType.BAG : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+        
+        case DataType.TUPLE : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+
+        case DataType.MAP : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;        	
+        }
+                
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new Float(Float.valueOf((((DataByteArray)res.result).toString())));
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToFloat(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Float");
+				}
+        	}
+        	return res;
+        }
+        
+        case DataType.BOOLEAN : {
+        	Boolean b = null;
+        	Result res = in.getNext(b);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if (((Boolean)res.result) == true) res.result = new Float(1);
+                else res.result = new Float(0);
+        	}
+        	return res;
+        }
+        case DataType.INTEGER : {
+        	Integer dummyI = null;
+        	Result res = in.getNext(dummyI);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Float(((Integer)res.result).floatValue());
+        	}
+        	return res;
+        }
+
+        case DataType.DOUBLE : {
+        	Double d = null;
+        	Result res = in.getNext(d);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = DataType.toInteger(res.result);
+        		res.result = new Float(((Double)res.result).floatValue());
+        	}
+        	return res;
+        }
+
+        case DataType.LONG : {
+        	
+        	Long l = null;
+        	Result res = in.getNext(l);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Float(((Long)res.result).floatValue());
+        	}
+        	return res;
+        }
+        
+        case DataType.FLOAT : {
+        
+        	Result res = in.getNext(f);
+        	
+        	return res;
+        }
+        
+        case DataType.CHARARRAY : {
+        	String str = null;
+        	Result res = in.getNext(str);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new Float(Float.valueOf((String)res.result));
+        	}
+        	return res;
+        }
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    @Override
+    public Result getNext(String str) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        case DataType.BAG : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+        
+        case DataType.TUPLE : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;
+        }
+
+        case DataType.MAP : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res;        	
+        }
+                
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new String(((DataByteArray)res.result).toString());
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToCharArray(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to CharArray");
+				}
+        	}
+        	return res;
+        }
+        
+        case DataType.BOOLEAN : {
+        	Boolean b = null;
+        	Result res = in.getNext(b);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if (((Boolean)res.result) == true) res.result = new String("1");
+                else res.result = new String("1");
+        	}
+        	return res;
+        }
+        case DataType.INTEGER : {
+        	Integer dummyI = null;
+        	Result res = in.getNext(dummyI);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new String(((Integer)res.result).toString());
+        	}
+        	return res;
+        }
+
+        case DataType.DOUBLE : {
+        	Double d = null;
+        	Result res = in.getNext(d);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = DataType.toInteger(res.result);
+        		res.result = new String(((Double)res.result).toString());
+        	}
+        	return res;
+        }
+
+        case DataType.LONG : {
+        	
+        	Long l = null;
+        	Result res = in.getNext(l);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new String(((Long)res.result).toString());
+        	}
+        	return res;
+        }
+        
+        case DataType.FLOAT : {
+        	Float f = null;
+        	Result res = in.getNext(f);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		res.result = new String(((Float)res.result).toString());
+        	}
+        	return res;
+        }
+        
+        case DataType.CHARARRAY : {
+        	
+        	Result res = in.getNext(str);
+        	
+        	return res;
+        }
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    /**
+     * Casts the next input value to a {@link DataByteArray} by first casting it
+     * to a chararray through {@link #getNext(String)} and then wrapping that
+     * string's bytes.
+     * <p>
+     * The bytes come from {@code String.getBytes()} with no charset argument,
+     * so they are encoded in the platform default charset.
+     *
+     * @param dba type marker selecting this overload; its value is not read
+     * @return the chararray cast result with {@code result} replaced by a
+     *         bytearray, or that result unchanged when its status is not OK
+     * @throws ExecException propagated from the chararray cast
+     */
+    @Override
+    public Result getNext(DataByteArray dba) throws ExecException {
+        // The typed null only selects the getNext(String) overload.
+        Result asChars = getNext((String) null);
+        if (asChars.returnStatus != POStatus.STATUS_OK) {
+            return asChars;
+        }
+        String text = (String) asChars.result;
+        asChars.result = new DataByteArray(text.getBytes());
+        return asChars;
+    }
+    
+    /**
+     * Casts the next value of the single input operator to a {@link Tuple}.
+     * <p>
+     * Only two input types are convertible: a tuple is passed through as is,
+     * and a bytearray is decoded with {@code load.bytesToTuple}. Every other
+     * input type produces a result whose status is
+     * {@link POStatus#STATUS_ERR}.
+     *
+     * @param t type marker selecting this overload; forwarded to the input when
+     *          the input already produces tuples
+     * @return the input's result, with {@code result} replaced by the decoded
+     *         tuple for bytearray input; a non-OK status from the input is
+     *         passed through unchanged
+     * @throws ExecException propagated from the input operator
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        switch (in.getResultType()) {
+
+        case DataType.TUPLE :
+            // Already the requested type: pass the input's result through.
+            return in.getNext(t);
+
+        case DataType.BYTEARRAY : {
+            // The null local only selects the getNext(DataByteArray) overload.
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK) {
+                dba = (DataByteArray) res.result;
+                try {
+                    res.result = load.bytesToTuple(dba.get());
+                } catch (IOException e) {
+                    // Report the failure instead of returning STATUS_OK with
+                    // the unconverted bytearray still in res.result.
+                    log.error("Error while casting from ByteArray to Tuple", e);
+                    res.returnStatus = POStatus.STATUS_ERR;
+                }
+            }
+            return res;
+        }
 
-        if(res.returnStatus != POStatus.STATUS_OK){
-            return res;
+        // None of the remaining types can be cast to a tuple.
+        case DataType.BAG :
+        case DataType.MAP :
+        case DataType.INTEGER :
+        case DataType.DOUBLE :
+        case DataType.LONG :
+        case DataType.FLOAT :
+        case DataType.CHARARRAY :
+        case DataType.BOOLEAN : {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
         }
         
-        if(res.result instanceof DataByteArray){
-            String rslt = ((DataByteArray)res.result).toString();
-            res.result = Integer.parseInt(rslt.trim());
-            return res;
+
         }
-        return new Result();
+
+        // Unlisted result type: treat as an unsupported cast.
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
     }
+    
+    /**
+     * Casts the next value of the single input operator to a {@link DataBag}.
+     * <p>
+     * Only two input types are convertible: a bag is passed through as is, and
+     * a bytearray is decoded with {@code load.bytesToBag}. Every other input
+     * type produces a result whose status is {@link POStatus#STATUS_ERR}.
+     *
+     * @param bag type marker selecting this overload; forwarded to the input
+     *            when the input already produces bags
+     * @return the input's result, with {@code result} replaced by the decoded
+     *         bag for bytearray input; a non-OK status from the input is
+     *         passed through unchanged
+     * @throws ExecException propagated from the input operator
+     */
+    @Override
+    public Result getNext(DataBag bag) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        switch (in.getResultType()) {
+
+        case DataType.BAG :
+            // Already the requested type: pass the input's result through.
+            return in.getNext(bag);
+
+        case DataType.BYTEARRAY : {
+            // The null local only selects the getNext(DataByteArray) overload.
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK) {
+                dba = (DataByteArray) res.result;
+                try {
+                    res.result = load.bytesToBag(dba.get());
+                } catch (IOException e) {
+                    // Report the failure instead of returning STATUS_OK with
+                    // the unconverted bytearray still in res.result.
+                    log.error("Error while casting from ByteArray to DataBag", e);
+                    res.returnStatus = POStatus.STATUS_ERR;
+                }
+            }
+            return res;
+        }
 
+        // None of the remaining types can be cast to a bag.
+        case DataType.TUPLE :
+        case DataType.MAP :
+        case DataType.INTEGER :
+        case DataType.DOUBLE :
+        case DataType.LONG :
+        case DataType.FLOAT :
+        case DataType.CHARARRAY :
+        case DataType.BOOLEAN : {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        }
+
+        // Unlisted result type: treat as an unsupported cast.
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
     @Override
-    public Result getNext(String s) throws ExecException {
-        Result res = inputs.get(0).getNext(s);
+    public Result getNext(Map m) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        
+        case DataType.MAP : {
+        	Result res = in.getNext(m);
+        	return res;
+        }
+        
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new String(((DataByteArray)res.result).toString());
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToMap(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Map");
+				}
+        	}
+        	return res;
+        }
 
-        if(res.returnStatus != POStatus.STATUS_OK){
-            return res;
+        case DataType.TUPLE :
+        
+        case DataType.BAG : 
+        	
+        case DataType.INTEGER :
+        	
+        case DataType.DOUBLE :
+        	
+        case DataType.LONG :
+        	
+        case DataType.FLOAT :
+        	
+        case DataType.CHARARRAY :
+        
+        case DataType.BOOLEAN : {
+        	Result res = new Result();
+        	res.returnStatus = POStatus.STATUS_ERR;
+        	return res; 
+        }
+        
+        
+        }
+        
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+    
+    public Result getNext(Boolean b) throws ExecException {
+    	PhysicalOperator in = inputs.get(0);
+    	Byte resultType = in.getResultType();
+        switch(resultType) {
+        
+        case DataType.MAP :
+        
+        case DataType.TUPLE :
+            
+        case DataType.BAG :
+        	
+        case DataType.BYTEARRAY : {
+        	DataByteArray dba = null;
+        	Result res = in.getNext(dba);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		//res.result = new String(((DataByteArray)res.result).toString());
+        		dba = (DataByteArray) res.result;
+        		try {
+					res.result = load.bytesToBoolean(dba.get());
+				} catch (IOException e) {
+					log.error("Error while casting from ByteArray to Boolean");
+				}
+        	}
+        	return res;
+        }
+        	
+        case DataType.INTEGER :
+        	
+        case DataType.DOUBLE :
+        	
+        case DataType.LONG :
+        	
+        case DataType.FLOAT :
+        	
+        case DataType.CHARARRAY : {
+        	String str = null;
+        	Result res = in.getNext(str);
+        	if(res.returnStatus == POStatus.STATUS_OK) {
+        		if(((String)res.result).length() > 0)
+        			res.result = new Boolean(true);
+        		else res.result = new Boolean(false);
+        	}
+        	return res;
+   
         }
         
-        if(res.result instanceof DataByteArray){
-            String rslt = ((DataByteArray)res.result).toString();
-            res.result = rslt;
-            return res;
+        case DataType.BOOLEAN : {
+        	Result res = in.getNext(b);
+        	return res;
+        }
         }
-        return new Result();
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
     }
     
     

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java?rev=657934&r1=657933&r2=657934&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java Mon May 19 12:33:42 2008
@@ -69,7 +69,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCastDummy;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
 import org.apache.pig.impl.plan.VisitorException;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=657934&r1=657933&r2=657934&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java Mon May 19 12:33:42 2008
@@ -70,7 +70,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCastDummy;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
 import org.apache.pig.impl.plan.VisitorException;

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=657934&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Mon May 19 12:33:42 2008
@@ -0,0 +1,1078 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestPOCast extends TestCase {
+
+	Random r = new Random();
+	final int MAX = 10;
+	
+	@Test
+	public void testIntegerToOther() throws PlanException, ExecException {
+		// Casts INTEGER input to every target type. Data: MAX one-field tuples of random ints.
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		for(int i = 0; i < MAX; i++) {
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(r.nextInt());
+			bag.add(t);
+		}
+		
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.INTEGER); // declare the projected field as INTEGER
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Integer i = (Integer) t.get(0);
+			Result res = op.getNext(i); // the argument's static type picks the getNext call; its value is also the expected result
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently here and in the loops below
+				assertEquals(i, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Float f = ((Integer)t.get(0)).floatValue(); // expected value from Java's int -> float conversion
+			Result res = op.getNext(f);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+//				System.out.println(res.result + " : " + f);
+				assertEquals(f, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Long l = ((Integer)t.get(0)).longValue(); // expected value from Java's int -> long widening
+			Result res = op.getNext(l);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + l);
+				assertEquals(l, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Double d = ((Integer)t.get(0)).doubleValue(); // expected value from Java's int -> double widening
+			Result res = op.getNext(d);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + d);
+				assertEquals(d, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			String str = ((Integer)t.get(0)).toString(); // expected value is the decimal text of the int
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataByteArray dba = new DataByteArray(((Integer)t.get(0)).toString().getBytes()); // getBytes() uses the platform default charset
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // INTEGER -> map must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // INTEGER -> tuple must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // INTEGER -> bag must be rejected
+		}
+	}
+	
+	@Test
+	public void testLongToOther() throws PlanException, ExecException {
+		// Casts LONG input to every target type. Data: MAX one-field tuples of random longs.
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		for(int i = 0; i < MAX; i++) {
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(r.nextLong());
+			bag.add(t);
+		}
+		
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.LONG); // declare the projected field as LONG
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Integer i = ((Long) t.get(0)).intValue(); // expected value uses Java narrowing (high-order bits dropped)
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently here and in the loops below
+				assertEquals(i, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Float f = ((Long)t.get(0)).floatValue(); // expected value from Java's long -> float conversion (may round)
+			Result res = op.getNext(f);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+//				System.out.println(res.result + " : " + f);
+				assertEquals(f, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Long l = ((Long)t.get(0)).longValue(); // identity case: long -> long
+			Result res = op.getNext(l);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + l);
+				assertEquals(l, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Double d = ((Long)t.get(0)).doubleValue(); // expected value from Java's long -> double conversion (may round)
+			Result res = op.getNext(d);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + d);
+				assertEquals(d, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			String str = ((Long)t.get(0)).toString(); // expected value is the decimal text of the long
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataByteArray dba = new DataByteArray(((Long)t.get(0)).toString().getBytes()); // getBytes() uses the platform default charset
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // LONG -> map must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // LONG -> tuple must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // LONG -> bag must be rejected
+		}
+	}
+	
+	@Test
+	public void testFloatToOther() throws PlanException, ExecException {
+		// Casts FLOAT input to every target type. Data: MAX one-field tuples of random floats.
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		for(int i = 0; i < MAX; i++) {
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(r.nextFloat()); // nextFloat() is in [0.0, 1.0), so the int/long expectations below are always 0
+			bag.add(t);
+		}
+		
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.FLOAT); // declare the projected field as FLOAT
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Integer i = ((Float) t.get(0)).intValue(); // expected value uses Java truncation toward zero
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently here and in the loops below
+				assertEquals(i, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Float f = ((Float)t.get(0)).floatValue(); // identity case: float -> float
+			Result res = op.getNext(f);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+//				System.out.println(res.result + " : " + f);
+				assertEquals(f, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Long l = ((Float)t.get(0)).longValue(); // expected value uses Java truncation toward zero
+			Result res = op.getNext(l);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + l);
+				assertEquals(l, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Double d = ((Float)t.get(0)).doubleValue(); // expected value from Java's float -> double widening
+			Result res = op.getNext(d);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + d);
+				assertEquals(d, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			String str = ((Float)t.get(0)).toString(); // expected value is Float.toString() of the input
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataByteArray dba = new DataByteArray(((Float)t.get(0)).toString().getBytes()); // getBytes() uses the platform default charset
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // FLOAT -> map must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // FLOAT -> tuple must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // FLOAT -> bag must be rejected
+		}
+	}
+	
+	@Test
+	public void testDoubleToOther() throws PlanException, ExecException {
+		// Casts DOUBLE input to every target type. Data: MAX one-field tuples of random doubles.
+		DataBag bag = BagFactory.getInstance().newDefaultBag();
+		for(int i = 0; i < MAX; i++) {
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(r.nextDouble()); // nextDouble() is in [0.0, 1.0), so the int/long expectations below are always 0
+			bag.add(t);
+		}
+		
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.DOUBLE); // declare the projected field as DOUBLE
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Integer i = ((Double) t.get(0)).intValue(); // expected value uses Java truncation toward zero
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently here and in the loops below
+				assertEquals(i, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Float f = ((Double)t.get(0)).floatValue(); // expected value from Java's double -> float narrowing (may round)
+			Result res = op.getNext(f);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+//				System.out.println(res.result + " : " + f);
+				assertEquals(f, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Long l = ((Double)t.get(0)).longValue(); // expected value uses Java truncation toward zero
+			Result res = op.getNext(l);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + l);
+				assertEquals(l, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Double d = ((Double)t.get(0)).doubleValue(); // identity case: double -> double
+			Result res = op.getNext(d);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + d);
+				assertEquals(d, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			String str = ((Double)t.get(0)).toString(); // expected value is Double.toString() of the input
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataByteArray dba = new DataByteArray(((Double)t.get(0)).toString().getBytes()); // getBytes() uses the platform default charset
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // DOUBLE -> map must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // DOUBLE -> tuple must be rejected
+		}
+		
+		for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+			Tuple t = it.next();
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // DOUBLE -> bag must be rejected
+		}
+	}
+	
+	@Test
+	public void testStringToOther() throws PlanException, ExecException { // CHARARRAY input cast to every target type, one tuple per case
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.CHARARRAY); // declare the projected field as CHARARRAY
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append((new Integer(r.nextInt())).toString()); // input is the decimal text of a random int
+			plan.attachInput(t);
+			Integer i = Integer.valueOf(((String) t.get(0))); // expected value: parse the same text back
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently here and in the blocks below
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append((new Float(r.nextFloat())).toString()); // input is Float.toString() of a random float
+			plan.attachInput(t);
+			Float i = Float.valueOf(((String) t.get(0)));
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append((new Long(r.nextLong())).toString()); // input is the decimal text of a random long
+			plan.attachInput(t);
+			Long i = Long.valueOf(((String) t.get(0)));
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append((new Double(r.nextDouble())).toString()); // input is Double.toString() of a random double
+			plan.attachInput(t);
+			Double i = Double.valueOf(((String) t.get(0)));
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			plan.attachInput(t);
+			String str = (String) t.get(0); // identity case: chararray -> chararray
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+		
+			plan.attachInput(t);
+			DataByteArray dba = new DataByteArray(((String)t.get(0)).getBytes()); // getBytes() uses the platform default charset
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // CHARARRAY -> map must be rejected
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // CHARARRAY -> tuple must be rejected
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus); // CHARARRAY -> bag must be rejected
+		}
+	}
+	
+	public static class TestLoader implements LoadFunc{
+        public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException {
+            
+        }
+        
+        public Tuple getNext() throws IOException {
+            return null;
+        }
+        
+        public Schema determineSchema(URL filename) {
+            return null;
+        }
+        
+        public void fieldsToRead(Schema schema) {
+            
+        }
+        
+        public DataBag bytesToBag(byte[] b) throws IOException {
+            return null;
+        }
+
+        public Boolean bytesToBoolean(byte[] b) throws IOException {
+        	DataByteArray dba = new DataByteArray(b);
+        	String str = dba.toString();
+        	if(str.length() == 0)
+        		return new Boolean(false);
+        	else return new Boolean(true);
+        }
+        
+        public String bytesToCharArray(byte[] b) throws IOException {
+        	DataByteArray dba = new DataByteArray(b);
+            return dba.toString();
+        }
+        
+        public Double bytesToDouble(byte[] b) throws IOException {
+            return new Double(Double.valueOf(new DataByteArray(b).toString()));
+        }
+        
+        public Float bytesToFloat(byte[] b) throws IOException {
+            return new Float(Float.valueOf(new DataByteArray(b).toString()));
+        }
+        
+        public Integer bytesToInteger(byte[] b) throws IOException {
+            return new Integer(Integer.valueOf(new DataByteArray(b).toString()));
+        }
+
+        public Long bytesToLong(byte[] b) throws IOException {
+            return new Long(Long.valueOf(new DataByteArray(b).toString()));
+        }
+
+        public Map<Object, Object> bytesToMap(byte[] b) throws IOException {
+            return null;
+        }
+
+        public Tuple bytesToTuple(byte[] b) throws IOException {
+            return null;
+        }        
+    }
+	
+	@Test
+	public void testByteArrayToOther() throws PlanException, ExecException { // BYTEARRAY input cast to every target type, one tuple per case
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		LoadFunc load = new TestLoader();
+		op.setLoad(load); // presumably POCast delegates bytearray conversions to this LoadFunc — confirm in POCast
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.BYTEARRAY); // declare the projected field as BYTEARRAY
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray((new Integer(r.nextInt())).toString().getBytes())); // bytes of the decimal text of a random int
+			plan.attachInput(t);
+			Integer i = Integer.valueOf(((DataByteArray) t.get(0)).toString()); // expected value: parse the same text back
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				// NOTE(review): compared only on STATUS_OK, so a non-OK status passes silently in the first six blocks
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray((new Float(r.nextFloat())).toString().getBytes()));
+			plan.attachInput(t);
+			Float i = Float.valueOf(((DataByteArray) t.get(0)).toString());
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray((new Long(r.nextLong())).toString().getBytes()));
+			plan.attachInput(t);
+			Long i = Long.valueOf(((DataByteArray) t.get(0)).toString());
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray((new Double(r.nextDouble())).toString().getBytes()));
+			plan.attachInput(t);
+			Double i = Double.valueOf(((DataByteArray) t.get(0)).toString());
+			Result res = op.getNext(i);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + i);
+				assertEquals(i, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
+			plan.attachInput(t);
+			String str = ((DataByteArray) t.get(0)).toString(); // expected value is the toString() form of the bytes
+			Result res = op.getNext(str);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + str);
+				assertEquals(str, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
+		
+			plan.attachInput(t);
+			DataByteArray dba = (DataByteArray) t.get(0); // identity case: bytearray -> bytearray
+			Result res = op.getNext(dba);
+			if(res.returnStatus == POStatus.STATUS_OK) {
+				//System.out.println(res.result + " : " + dba);
+				assertEquals(dba, res.result);
+			}
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+			assertEquals(POStatus.STATUS_OK, res.returnStatus); // unlike the other source types, bytearray -> map is expected to succeed
+			assertEquals(null, res.result); // matches TestLoader.bytesToMap returning null (presumably the source of this value)
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+			assertEquals(POStatus.STATUS_OK, res.returnStatus);
+			assertEquals(null, res.result); // matches TestLoader.bytesToTuple returning null (presumably the source of this value)
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(new DataByteArray(GenRandomData.genRandString(r).getBytes()));
+			plan.attachInput(t);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			//assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+			assertEquals(POStatus.STATUS_OK, res.returnStatus);
+			assertEquals(null, res.result); // matches TestLoader.bytesToBag returning null (presumably the source of this value)
+		}
+	}
+	
+	@Test
+	public void testTupleToOther() throws PlanException, ExecException { // TUPLE input: only tuple -> tuple may succeed, every other target must give STATUS_ERR
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.TUPLE); // declare the projected field as TUPLE
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t); // wrap t so that field 0 of the attached input is itself a tuple
+			plan.attachInput(tNew);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			Result res = op.getNext(t);
+			// Identity case: asserted unconditionally (no STATUS_OK guard), the inner tuple must come back.
+			assertEquals(t, res.result);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			DataBag b = null; // typed null: only there to select the DataBag-typed getNext call
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			Integer i = null; // typed nulls from here on each select one scalar-typed getNext call
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			Long i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			Float i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			Double i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			String i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandString(r));
+			Tuple tNew = tf.newTuple();
+			tNew.append(t);
+			plan.attachInput(tNew);
+			DataByteArray i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+	}
+	
+	@Test
+	public void testBagToOther() throws PlanException, ExecException { // BAG input: only bag -> bag may succeed, every other target must give STATUS_ERR
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op); // the projection feeds the cast
+		
+		prj.setResultType(DataType.BAG); // declare the projected field as BAG
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		BagFactory bf = BagFactory.getInstance(); // NOTE(review): bf is never used in this method
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Map map = null; // typed null: only there to select the Map-typed getNext call
+			Result res = op.getNext(map);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Result res = op.getNext(t); // the input tuple doubles as the Tuple-typed argument
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			DataBag b = (DataBag) t.get(0);
+			Result res = op.getNext(b);
+			// Identity case: asserted unconditionally (no STATUS_OK guard), the same bag must come back.
+			assertEquals(b, res.result);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Integer i = null; // typed nulls from here on each select one scalar-typed getNext call
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Long i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Float i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			Double i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			String i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+			plan.attachInput(t);
+			DataByteArray i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+	}
+	
+	@Test
+	public void testMapToOther() throws PlanException, ExecException {
+		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+		ExprPlan plan = new ExprPlan();
+		plan.add(prj);
+		plan.add(op);
+		plan.connect(prj, op);
+		
+		prj.setResultType(DataType.MAP);
+		
+		TupleFactory tf = TupleFactory.getInstance();
+		BagFactory bf = BagFactory.getInstance();
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			plan.attachInput(t);
+			Map map = (Map) t.get(0);
+			Result res = op.getNext(map);
+			//System.out.println(res.result + " : " + t);
+			assertEquals(map, res.result);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			plan.attachInput(t);
+			Result res = op.getNext(t);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			plan.attachInput(t);
+			DataBag b = null;
+			Result res = op.getNext(b);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			Integer i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			Long i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			Float i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			Double i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			String i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+		
+		{
+			Tuple t = tf.newTuple();
+			t.append(GenRandomData.genRandMap(r, 10));
+			DataByteArray i = null;
+			Result res = op.getNext(i);
+			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+		}
+	}
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=657934&r1=657933&r2=657934&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Mon May 19 12:33:42 2008
@@ -48,7 +48,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCastDummy;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add;
@@ -220,7 +220,7 @@
         prj1.setResultType(sample.getType(grpCol));
         prj1.setOverloaded(false);
         
-        POCast cst = new POCast(new OperatorKey("",r.nextLong()));
+        POCastDummy cst = new POCastDummy(new OperatorKey("",r.nextLong()));
         cst.setResultType(sample.getType(grpCol));
 
         List<Boolean> toBeFlattened = new LinkedList<Boolean>();
@@ -236,14 +236,14 @@
         inputs.add(plan1);
         
         POProject rest[] = new POProject[sample.size()];
-        POCast csts[] = new POCast[sample.size()];
+        POCastDummy csts[] = new POCastDummy[sample.size()];
         int i=-1;
         for (POProject project : rest) {
             project = new POProject(new OperatorKey("", r.nextLong()), -1, ++i);
             project.setResultType(sample.getType(i));
             project.setOverloaded(false);
             
-            csts[i] = new POCast(new OperatorKey("",r.nextLong()));
+            csts[i] = new POCastDummy(new OperatorKey("",r.nextLong()));
             csts[i].setResultType(sample.getType(i));
             
             ExprPlan pl = new ExprPlan();
@@ -347,7 +347,7 @@
         }
         
         
-        POCast[] cst = new POCast[fields.length];
+        POCastDummy[] cst = new POCastDummy[fields.length];
 
         List<Boolean> toBeFlattened = new LinkedList<Boolean>();
         for (POProject project : prj)
@@ -360,7 +360,7 @@
         for (int i=0;i<plans.length;i++) {
             plans[i] = new ExprPlan();
             plans[i].add(prj[i]);
-            cst[i] = new POCast(new OperatorKey("",r.nextLong()));
+            cst[i] = new POCastDummy(new OperatorKey("",r.nextLong()));
             cst[i].setResultType(sample.getType(fields[i]));
             plans[i].add(cst[i]);
             plans[i].connect(prj[i], cst[i]);
@@ -395,7 +395,7 @@
         }
         
         
-        POCast[] cst = new POCast[fields.length];
+        POCastDummy[] cst = new POCastDummy[fields.length];
 
         /*List<Boolean> toBeFlattened = new LinkedList<Boolean>();
         for (POProject project : prj)
@@ -406,7 +406,7 @@
         for (int i=0;i<plans.length;i++) {
             plans[i] = new ExprPlan();
             plans[i].add(prj[i]);
-            cst[i] = new POCast(new OperatorKey("",r.nextLong()));
+            cst[i] = new POCastDummy(new OperatorKey("",r.nextLong()));
             cst[i].setResultType(sample.getType(fields[i]));
             plans[i].add(cst[i]);
             plans[i].connect(prj[i], cst[i]);
@@ -431,7 +431,7 @@
         POLocalRearrange lr = topLocalRearrangeOPWithPlanPlain(index, grpCol, sample);
         List<ExprPlan> plans = lr.getPlans(); 
         ExprPlan ep = plans.get(0);
-        POCast cst = new POCast(new OperatorKey("", r.nextLong()));
+        POCastDummy cst = new POCastDummy(new OperatorKey("", r.nextLong()));
         cst.setResultType(sample.getType(grpCol));
         ep.addAsLeaf(cst);
         lr.setPlans(plans);
@@ -684,7 +684,7 @@
             break;
         }
         
-        POCast cst = new POCast(new OperatorKey("",r.nextLong()));
+        POCastDummy cst = new POCastDummy(new OperatorKey("",r.nextLong()));
         
         cop.setLhs(cst);
         cop.setRhs(ce2);

Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java?rev=657934&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java Mon May 19 12:33:42 2008
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Test-only stand-in for a cast operator: converts a DataByteArray
+ * input into either an Integer or a String. It was added for testing
+ * POUnion and is not a full cast operator implementation.
+ */
+public class POCastDummy extends ExpressionOperator {
+
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public POCastDummy(OperatorKey k) {
+        super(k);
+        // Nothing to set up beyond what the superclass constructor does.
+    }
+
+    public POCastDummy(OperatorKey k, int rp) {
+        super(k, rp);
+        // Nothing to set up beyond what the superclass constructor does.
+    }
+
+    @Override
+    public void visit(ExprPlanVisitor v) throws VisitorException {
+        // No-op: the body is empty, so the visitor passed in is never
+        // called back by this dummy operator.
+    }
+
+    @Override
+    public String name() {
+        return "Cast - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        // Always reports false.
+        return false;
+    }
+
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        Result res = inputs.get(0).getNext(i);
+
+        if(res.returnStatus != POStatus.STATUS_OK){
+            return res;
+        }
+        // DataByteArray -> Integer via toString().trim() and parseInt (a NumberFormatException is not caught); any other type yields a fresh Result.
+        if(res.result instanceof DataByteArray){
+            String rslt = ((DataByteArray)res.result).toString();
+            res.result = Integer.parseInt(rslt.trim());
+            return res;
+        }
+        return new Result();
+    }
+
+    @Override
+    public Result getNext(String s) throws ExecException {
+        Result res = inputs.get(0).getNext(s);
+
+        if(res.returnStatus != POStatus.STATUS_OK){
+            return res;
+        }
+        // DataByteArray -> String via toString() (no trimming); any other type yields a fresh Result.
+        if(res.result instanceof DataByteArray){
+            String rslt = ((DataByteArray)res.result).toString();
+            res.result = rslt;
+            return res;
+        }
+        return new Result();
+    }
+    
+    
+
+}