You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/01/24 19:00:54 UTC

svn commit: r737407 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java

Author: sms
Date: Sat Jan 24 18:00:54 2009
New Revision: 737407

URL: http://svn.apache.org/viewvc?rev=737407&view=rev
Log:
PIG-635: POCast.java has incorrect formatting

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=737407&r1=737406&r2=737407&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sat Jan 24 18:00:54 2009
@@ -376,3 +376,5 @@
     PIG-589: error handling, phase 1-2 (sms via olgan)
 
     PIG-615: Wrong number of jobs with limit (shravanmn via sms)
+
+    PIG-635: POCast.java has incorrect formatting (sms)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=737407&r1=737406&r2=737407&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Sat Jan 24 18:00:54 2009
@@ -45,996 +45,997 @@
  * implementation.
  */
 public class POCast extends ExpressionOperator {
-	private FuncSpec loadFSpec = null;
-	transient private LoadFunc load;
-	private Log log = LogFactory.getLog(getClass());
-	private boolean castNotNeeded = false;
-
-	private static final long serialVersionUID = 1L;
-
-	public POCast(OperatorKey k) {
-		super(k);
-		// TODO Auto-generated constructor stub
-	}
-
-	public POCast(OperatorKey k, int rp) {
-		super(k, rp);
-		// TODO Auto-generated constructor stub
-	}
-
-	private void instantiateFunc() {
-		if (load != null)
-			return;
-		if (this.loadFSpec != null) {
-			this.load = (LoadFunc) PigContext
-					.instantiateFuncFromSpec(this.loadFSpec);
-		}
-	}
-
-	public void setLoadFSpec(FuncSpec lf) {
-		this.loadFSpec = lf;
-		instantiateFunc();
-	}
-
-	@Override
-	public void visit(PhyPlanVisitor v) throws VisitorException {
-		v.visitCast(this);
-
-	}
-
-	@Override
-	public String name() {
-		return "Cast" + "[" + DataType.findTypeName(resultType) + "]" + " - "
-				+ mKey.toString();
-	}
-
-	@Override
-	public boolean supportsMultipleInputs() {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public Result getNext(Integer i) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.INTEGER;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-		case DataType.BAG: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.TUPLE: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToInteger(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to int."
-								+ " castToType: "
-								+ castToType
-								+ " name: "
-								+ DataType.findTypeName(castToType);
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Integer");
-				}
-			}
-			return res;
-		}
-
-		case DataType.MAP: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BOOLEAN: {
-			Boolean b = null;
-			Result res = in.getNext(b);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (((Boolean) res.result) == true)
-					res.result = new Integer(1);
-				else
-					res.result = new Integer(0);
-			}
-			return res;
-		}
-		case DataType.INTEGER: {
-
-			Result res = in.getNext(i);
-			return res;
-		}
-
-		case DataType.DOUBLE: {
-			Double d = null;
-			Result res = in.getNext(d);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = DataType.toInteger(res.result);
-				res.result = new Integer(((Double) res.result).intValue());
-			}
-			return res;
-		}
-
-		case DataType.LONG: {
-			Long l = null;
-			Result res = in.getNext(l);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Integer(((Long) res.result).intValue());
-			}
-			return res;
-		}
-
-		case DataType.FLOAT: {
-			Float f = null;
-			Result res = in.getNext(f);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Integer(((Float) res.result).intValue());
-			}
-			return res;
-		}
-
-		case DataType.CHARARRAY: {
-			String str = null;
-			Result res = in.getNext(str);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Integer(Integer.valueOf((String) res.result));
-			}
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(Long l) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.LONG;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-		case DataType.BAG: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.TUPLE: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.MAP: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				// res.result = new
-				// Long(Long.valueOf((((DataByteArray)res.result).toString())));
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToLong(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to long.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Long");
-				}
-			}
-			return res;
-		}
-
-		case DataType.BOOLEAN: {
-			Boolean b = null;
-			Result res = in.getNext(b);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (((Boolean) res.result) == true)
-					res.result = new Long(1);
-				else
-					res.result = new Long(0);
-			}
-			return res;
-		}
-		case DataType.INTEGER: {
-			Integer dummyI = null;
-			Result res = in.getNext(dummyI);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Long(((Integer) res.result).longValue());
-			}
-			return res;
-		}
-
-		case DataType.DOUBLE: {
-			Double d = null;
-			Result res = in.getNext(d);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = DataType.toInteger(res.result);
-				res.result = new Long(((Double) res.result).longValue());
-			}
-			return res;
-		}
-
-		case DataType.LONG: {
-
-			Result res = in.getNext(l);
-
-			return res;
-		}
-
-		case DataType.FLOAT: {
-			Float f = null;
-			Result res = in.getNext(f);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Long(((Float) res.result).longValue());
-			}
-			return res;
-		}
-
-		case DataType.CHARARRAY: {
-			String str = null;
-			Result res = in.getNext(str);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Long(Long.valueOf((String) res.result));
-			}
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(Double d) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.DOUBLE;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-		case DataType.BAG: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.TUPLE: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.MAP: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// Double(Double.valueOf((((DataByteArray)res.result).toString())));
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToDouble(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to double.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Double");
-				}
-			}
-			return res;
-		}
-
-		case DataType.BOOLEAN: {
-			Boolean b = null;
-			Result res = in.getNext(b);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (((Boolean) res.result) == true)
-					res.result = new Double(1);
-				else
-					res.result = new Double(0);
-			}
-			return res;
-		}
-		case DataType.INTEGER: {
-			Integer dummyI = null;
-			Result res = in.getNext(dummyI);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Double(((Integer) res.result).doubleValue());
-			}
-			return res;
-		}
-
-		case DataType.DOUBLE: {
-
-			Result res = in.getNext(d);
-
-			return res;
-		}
-
-		case DataType.LONG: {
-			Long l = null;
-			Result res = in.getNext(l);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Double(((Long) res.result).doubleValue());
-			}
-			return res;
-		}
-
-		case DataType.FLOAT: {
-			Float f = null;
-			Result res = in.getNext(f);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Double(((Float) res.result).doubleValue());
-			}
-			return res;
-		}
-
-		case DataType.CHARARRAY: {
-			String str = null;
-			Result res = in.getNext(str);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Double(Double.valueOf((String) res.result));
-			}
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(Float f) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.FLOAT;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-		case DataType.BAG: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.TUPLE: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.MAP: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// Float(Float.valueOf((((DataByteArray)res.result).toString())));
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToFloat(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to float.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Float");
-				}
-			}
-			return res;
-		}
-
-		case DataType.BOOLEAN: {
-			Boolean b = null;
-			Result res = in.getNext(b);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (((Boolean) res.result) == true)
-					res.result = new Float(1);
-				else
-					res.result = new Float(0);
-			}
-			return res;
-		}
-		case DataType.INTEGER: {
-			Integer dummyI = null;
-			Result res = in.getNext(dummyI);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Float(((Integer) res.result).floatValue());
-			}
-			return res;
-		}
-
-		case DataType.DOUBLE: {
-			Double d = null;
-			Result res = in.getNext(d);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = DataType.toInteger(res.result);
-				res.result = new Float(((Double) res.result).floatValue());
-			}
-			return res;
-		}
-
-		case DataType.LONG: {
-
-			Long l = null;
-			Result res = in.getNext(l);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Float(((Long) res.result).floatValue());
-			}
-			return res;
-		}
-
-		case DataType.FLOAT: {
-
-			Result res = in.getNext(f);
-
-			return res;
-		}
-
-		case DataType.CHARARRAY: {
-			String str = null;
-			Result res = in.getNext(str);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new Float(Float.valueOf((String) res.result));
-			}
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(String str) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.CHARARRAY;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-		case DataType.BAG: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.TUPLE: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.MAP: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// String(((DataByteArray)res.result).toString());
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToCharArray(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to string.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to CharArray");
-				}
-			}
-			return res;
-		}
-
-		case DataType.BOOLEAN: {
-			Boolean b = null;
-			Result res = in.getNext(b);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				if (((Boolean) res.result) == true)
-					res.result = new String("1");
-				else
-					res.result = new String("1");
-			}
-			return res;
-		}
-		case DataType.INTEGER: {
-			Integer dummyI = null;
-			Result res = in.getNext(dummyI);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new String(((Integer) res.result).toString());
-			}
-			return res;
-		}
-
-		case DataType.DOUBLE: {
-			Double d = null;
-			Result res = in.getNext(d);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = DataType.toInteger(res.result);
-				res.result = new String(((Double) res.result).toString());
-			}
-			return res;
-		}
-
-		case DataType.LONG: {
-
-			Long l = null;
-			Result res = in.getNext(l);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new String(((Long) res.result).toString());
-			}
-			return res;
-		}
-
-		case DataType.FLOAT: {
-			Float f = null;
-			Result res = in.getNext(f);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				res.result = new String(((Float) res.result).toString());
-			}
-			return res;
-		}
-
-		case DataType.CHARARRAY: {
-
-			Result res = in.getNext(str);
-
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(Tuple t) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.TUPLE;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-
-		case DataType.TUPLE: {
-			Result res = in.getNext(t);
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// String(((DataByteArray)res.result).toString());
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToTuple(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to tuple.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Tuple");
-				}
-			}
-			return res;
-		}
-
-		case DataType.BAG:
-
-		case DataType.MAP:
-
-		case DataType.INTEGER:
-
-		case DataType.DOUBLE:
-
-		case DataType.LONG:
-
-		case DataType.FLOAT:
-
-		case DataType.CHARARRAY:
-
-		case DataType.BOOLEAN: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(DataBag bag) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.BAG;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-
-		case DataType.BAG: {
-			Result res = in.getNext(bag);
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// String(((DataByteArray)res.result).toString());
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToBag(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to bag.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-				} catch (ExecException ee) {
-					throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to DataBag");
-				}
-			}
-			return res;
-		}
-
-		case DataType.TUPLE:
-
-		case DataType.MAP:
-
-		case DataType.INTEGER:
-
-		case DataType.DOUBLE:
-
-		case DataType.LONG:
-
-		case DataType.FLOAT:
-
-		case DataType.CHARARRAY:
-
-		case DataType.BOOLEAN: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	@Override
-	public Result getNext(Map m) throws ExecException {
-		PhysicalOperator in = inputs.get(0);
-		Byte castToType = DataType.MAP;
-		Byte resultType = in.getResultType();
-		switch (resultType) {
-
-		case DataType.MAP: {
-			Result res = in.getNext(m);
-			return res;
-		}
-
-		case DataType.BYTEARRAY: {
-			DataByteArray dba = null;
-			Result res = in.getNext(dba);
-			if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-				// res.result = new
-				// String(((DataByteArray)res.result).toString());
-				if (castNotNeeded) {
-					// we examined the data once before and
-					// determined that the input is the same
-					// type as the type we are casting to
-					// so just send the input out as output
-					return res;
-				}
-				try {
-					dba = (DataByteArray) res.result;
-				} catch (ClassCastException e) {
-					// check if the type of res.result is
-					// same as the type we are trying to cast to
-					if (DataType.findType(res.result) == castToType) {
-						// remember this for future calls
-						castNotNeeded = true;
-						// just return the output
-						return res;
-					} else {
-						// the input is a differen type
-						// rethrow the exception
-						throw e;
-					}
-
-				}
-				try {
-					if (null != load) {
-						res.result = load.bytesToMap(dba.get());
-					} else {
-						String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to map.";
-						log.error(msg);
-						throw new ExecException(msg);
-					}
-        		} catch (ExecException ee) {
-        			throw ee;
-				} catch (IOException e) {
-					log.error("Error while casting from ByteArray to Map");
-				}
-			}
-			return res;
-		}
-
-		case DataType.TUPLE:
-
-		case DataType.BAG:
-
-		case DataType.INTEGER:
-
-		case DataType.DOUBLE:
-
-		case DataType.LONG:
-
-		case DataType.FLOAT:
-
-		case DataType.CHARARRAY:
-
-		case DataType.BOOLEAN: {
-			Result res = new Result();
-			res.returnStatus = POStatus.STATUS_ERR;
-			return res;
-		}
-
-		}
-
-		Result res = new Result();
-		res.returnStatus = POStatus.STATUS_ERR;
-		return res;
-	}
-
-	private void readObject(ObjectInputStream is) throws IOException,
-			ClassNotFoundException {
-		is.defaultReadObject();
-		instantiateFunc();
-	}
-
-	@Override
-	public POCast clone() throws CloneNotSupportedException {
-		POCast clone = new POCast(new OperatorKey(mKey.scope, NodeIdGenerator
-				.getGenerator().getNextNodeId(mKey.scope)));
-		clone.cloneHelper(this);
-		clone.loadFSpec = loadFSpec;
-		clone.instantiateFunc();
-		return clone;
-	}
+    private FuncSpec loadFSpec = null;
+    transient private LoadFunc load;
+    private Log log = LogFactory.getLog(getClass());
+    private boolean castNotNeeded = false;
+
+    private static final long serialVersionUID = 1L;
+
+    public POCast(OperatorKey k) {
+        super(k);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POCast(OperatorKey k, int rp) {
+        super(k, rp);
+        // TODO Auto-generated constructor stub
+    }
+
+    private void instantiateFunc() {
+        if (load != null)
+            return;
+        if (this.loadFSpec != null) {
+            this.load = (LoadFunc) PigContext
+                    .instantiateFuncFromSpec(this.loadFSpec);
+        }
+    }
+
+    public void setLoadFSpec(FuncSpec lf) {
+        this.loadFSpec = lf;
+        instantiateFunc();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCast(this);
+
+    }
+
+    @Override
+    public String name() {
+        return "Cast" + "[" + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.INTEGER;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.TUPLE: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = new
+                // Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
+                if (castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if (DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToInteger(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to int."
+                                + " castToType: "
+                                + castToType
+                                + " name: "
+                                + DataType.findTypeName(castToType);
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    log.error("Error while casting from ByteArray to Integer");
+                }
+            }
+            return res;
+        }
+
+        case DataType.MAP: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BOOLEAN: {
+            Boolean b = null;
+            Result res = in.getNext(b);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (((Boolean) res.result) == true)
+                    res.result = new Integer(1);
+                else
+                    res.result = new Integer(0);
+            }
+            return res;
+        }
+        case DataType.INTEGER: {
+
+            Result res = in.getNext(i);
+            return res;
+        }
+
+        case DataType.DOUBLE: {
+            Double d = null;
+            Result res = in.getNext(d);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = DataType.toInteger(res.result);
+                res.result = new Integer(((Double) res.result).intValue());
+            }
+            return res;
+        }
+
+        case DataType.LONG: {
+            Long l = null;
+            Result res = in.getNext(l);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Integer(((Long) res.result).intValue());
+            }
+            return res;
+        }
+
+        case DataType.FLOAT: {
+            Float f = null;
+            Result res = in.getNext(f);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Integer(((Float) res.result).intValue());
+            }
+            return res;
+        }
+
+        case DataType.CHARARRAY: {
+            String str = null;
+            Result res = in.getNext(str);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Integer(Integer.valueOf((String) res.result));
+            }
+            return res;
+        }
+
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    @Override
+    public Result getNext(Long l) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.LONG;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.TUPLE: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.MAP: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+                // res.result = new
+                // Long(Long.valueOf((((DataByteArray)res.result).toString())));
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if (DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a differen type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToLong(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to long.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    log.error("Error while casting from ByteArray to Long");
+                }
+            }
+            return res;
+        }
+
+        case DataType.BOOLEAN: {
+            Boolean b = null;
+            Result res = in.getNext(b);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (((Boolean) res.result) == true)
+                    res.result = new Long(1);
+                else
+                    res.result = new Long(0);
+            }
+            return res;
+        }
+        case DataType.INTEGER: {
+            Integer dummyI = null;
+            Result res = in.getNext(dummyI);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Long(((Integer) res.result).longValue());
+            }
+            return res;
+        }
+
+        case DataType.DOUBLE: {
+            Double d = null;
+            Result res = in.getNext(d);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = DataType.toInteger(res.result);
+                res.result = new Long(((Double) res.result).longValue());
+            }
+            return res;
+        }
+
+        case DataType.LONG: {
+
+            Result res = in.getNext(l);
+
+            return res;
+        }
+
+        case DataType.FLOAT: {
+            Float f = null;
+            Result res = in.getNext(f);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Long(((Float) res.result).longValue());
+            }
+            return res;
+        }
+
+        case DataType.CHARARRAY: {
+            String str = null;
+            Result res = in.getNext(str);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Long(Long.valueOf((String) res.result));
+            }
+            return res;
+        }
+
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.DOUBLE;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.TUPLE: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.MAP: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = new
+                // Double(Double.valueOf((((DataByteArray)res.result).toString())));
+                if (castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if (DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a differen type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToDouble(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to double.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    log.error("Error while casting from ByteArray to Double");
+                }
+            }
+            return res;
+        }
+
+        case DataType.BOOLEAN: {
+            Boolean b = null;
+            Result res = in.getNext(b);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (((Boolean) res.result) == true)
+                    res.result = new Double(1);
+                else
+                    res.result = new Double(0);
+            }
+            return res;
+        }
+        case DataType.INTEGER: {
+            Integer dummyI = null;
+            Result res = in.getNext(dummyI);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Double(((Integer) res.result).doubleValue());
+            }
+            return res;
+        }
+
+        case DataType.DOUBLE: {
+
+            Result res = in.getNext(d);
+
+            return res;
+        }
+
+        case DataType.LONG: {
+            Long l = null;
+            Result res = in.getNext(l);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Double(((Long) res.result).doubleValue());
+            }
+            return res;
+        }
+
+        case DataType.FLOAT: {
+            Float f = null;
+            Result res = in.getNext(f);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Double(((Float) res.result).doubleValue());
+            }
+            return res;
+        }
+
+        case DataType.CHARARRAY: {
+            String str = null;
+            Result res = in.getNext(str);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Double(Double.valueOf((String) res.result));
+            }
+            return res;
+        }
+
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    @Override
+    public Result getNext(Float f) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.FLOAT;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.TUPLE: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.MAP: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = new
+                // Float(Float.valueOf((((DataByteArray)res.result).toString())));
+                if (castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if (DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a differen type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToFloat(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to float.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    log.error("Error while casting from ByteArray to Float");
+                }
+            }
+            return res;
+        }
+
+        case DataType.BOOLEAN: {
+            Boolean b = null;
+            Result res = in.getNext(b);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (((Boolean) res.result) == true)
+                    res.result = new Float(1);
+                else
+                    res.result = new Float(0);
+            }
+            return res;
+        }
+        case DataType.INTEGER: {
+            Integer dummyI = null;
+            Result res = in.getNext(dummyI);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Float(((Integer) res.result).floatValue());
+            }
+            return res;
+        }
+
+        case DataType.DOUBLE: {
+            Double d = null;
+            Result res = in.getNext(d);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = DataType.toInteger(res.result);
+                res.result = new Float(((Double) res.result).floatValue());
+            }
+            return res;
+        }
+
+        case DataType.LONG: {
+
+            Long l = null;
+            Result res = in.getNext(l);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Float(((Long) res.result).floatValue());
+            }
+            return res;
+        }
+
+        case DataType.FLOAT: {
+
+            Result res = in.getNext(f);
+
+            return res;
+        }
+
+        case DataType.CHARARRAY: {
+            String str = null;
+            Result res = in.getNext(str);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new Float(Float.valueOf((String) res.result));
+            }
+            return res;
+        }
+
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    @Override
+    public Result getNext(String str) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.CHARARRAY;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.TUPLE: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.MAP: {
+            Result res = new Result();
+            res.returnStatus = POStatus.STATUS_ERR;
+            return res;
+        }
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = new
+                // String(((DataByteArray)res.result).toString());
+                if (castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if (DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a differen type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToCharArray(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to string.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    log
+                            .error("Error while casting from ByteArray to CharArray");
+                }
+            }
+            return res;
+        }
+
+        case DataType.BOOLEAN: {
+            Boolean b = null;
+            Result res = in.getNext(b);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (((Boolean) res.result) == true)
+                    res.result = new String("1");
+                else
+                    res.result = new String("1");
+            }
+            return res;
+        }
+        case DataType.INTEGER: {
+            Integer dummyI = null;
+            Result res = in.getNext(dummyI);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new String(((Integer) res.result).toString());
+            }
+            return res;
+        }
+
+        case DataType.DOUBLE: {
+            Double d = null;
+            Result res = in.getNext(d);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                // res.result = DataType.toInteger(res.result);
+                res.result = new String(((Double) res.result).toString());
+            }
+            return res;
+        }
+
+        case DataType.LONG: {
+
+            Long l = null;
+            Result res = in.getNext(l);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new String(((Long) res.result).toString());
+            }
+            return res;
+        }
+
+        case DataType.FLOAT: {
+            Float f = null;
+            Result res = in.getNext(f);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                res.result = new String(((Float) res.result).toString());
+            }
+            return res;
+        }
+
+        case DataType.CHARARRAY: {
+
+            Result res = in.getNext(str);
+
+            return res;
+        }
+
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    /**
+     * Produces the next value of the input operator, cast to a Tuple.
+     * <p>
+     * Only two input types are supported: a tuple input is passed through,
+     * and a bytearray input is converted by the load function. If the value
+     * delivered for a declared bytearray is in fact already a tuple, it is
+     * passed through unchanged and that finding is remembered in
+     * castNotNeeded for later calls. Every other input type (bag, map,
+     * integer, double, long, float, chararray, boolean, or anything
+     * unrecognized) produces a Result with status STATUS_ERR.
+     * <p>
+     * When a bytearray input's status is not STATUS_OK, or its result is
+     * null, the input's Result is returned untouched.
+     *
+     * @param t type marker selecting this overload; it is only forwarded to
+     *            the input when the input already produces tuples
+     * @return the input's Result, converted where necessary, or a new Result
+     *         with status STATUS_ERR for unsupported types
+     * @throws ExecException if a bytearray must be converted but no load
+     *             function is available
+     * @throws ClassCastException if a declared bytearray input delivers a
+     *             value that is neither a DataByteArray nor a tuple
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.TUPLE;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.TUPLE:
+            // already the target type: nothing to convert
+            return in.getNext(t);
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (castNotNeeded) {
+                    // an earlier call established that the input already
+                    // delivers the target type, so pass the value through
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    if (DataType.findType(res.result) == castToType) {
+                        // the value already has the target type; remember
+                        // this for future calls and return it as is
+                        castNotNeeded = true;
+                        return res;
+                    }
+                    // the input is a different type: rethrow
+                    throw e;
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToTuple(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to tuple.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    // NOTE: best effort - the unconverted bytearray stays in
+                    // res; the cause is logged so the failure can be diagnosed
+                    log.error("Error while casting from ByteArray to Tuple", e);
+                }
+            }
+            return res;
+        }
+
+        default:
+            // BAG, MAP, INTEGER, DOUBLE, LONG, FLOAT, CHARARRAY, BOOLEAN and
+            // unrecognized types cannot be cast to a tuple
+            break;
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    /**
+     * Produces the next value of the input operator, cast to a DataBag.
+     * <p>
+     * Only two input types are supported: a bag input is passed through, and
+     * a bytearray input is converted by the load function. If the value
+     * delivered for a declared bytearray is in fact already a bag, it is
+     * passed through unchanged and that finding is remembered in
+     * castNotNeeded for later calls. Every other input type (tuple, map,
+     * integer, double, long, float, chararray, boolean, or anything
+     * unrecognized) produces a Result with status STATUS_ERR.
+     * <p>
+     * When a bytearray input's status is not STATUS_OK, or its result is
+     * null, the input's Result is returned untouched.
+     *
+     * @param bag type marker selecting this overload; it is only forwarded to
+     *            the input when the input already produces bags
+     * @return the input's Result, converted where necessary, or a new Result
+     *         with status STATUS_ERR for unsupported types
+     * @throws ExecException if a bytearray must be converted but no load
+     *             function is available
+     * @throws ClassCastException if a declared bytearray input delivers a
+     *             value that is neither a DataByteArray nor a bag
+     */
+    @Override
+    public Result getNext(DataBag bag) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.BAG;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.BAG:
+            // already the target type: nothing to convert
+            return in.getNext(bag);
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (castNotNeeded) {
+                    // an earlier call established that the input already
+                    // delivers the target type, so pass the value through
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    if (DataType.findType(res.result) == castToType) {
+                        // the value already has the target type; remember
+                        // this for future calls and return it as is
+                        castNotNeeded = true;
+                        return res;
+                    }
+                    // the input is a different type: rethrow
+                    throw e;
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToBag(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to bag.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    // NOTE: best effort - the unconverted bytearray stays in
+                    // res; the cause is logged so the failure can be diagnosed
+                    log.error("Error while casting from ByteArray to DataBag", e);
+                }
+            }
+            return res;
+        }
+
+        default:
+            // TUPLE, MAP, INTEGER, DOUBLE, LONG, FLOAT, CHARARRAY, BOOLEAN
+            // and unrecognized types cannot be cast to a bag
+            break;
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    /**
+     * Produces the next value of the input operator, cast to a Map.
+     * <p>
+     * Only two input types are supported: a map input is passed through, and
+     * a bytearray input is converted by the load function. If the value
+     * delivered for a declared bytearray is in fact already a map, it is
+     * passed through unchanged and that finding is remembered in
+     * castNotNeeded for later calls. Every other input type (tuple, bag,
+     * integer, double, long, float, chararray, boolean, or anything
+     * unrecognized) produces a Result with status STATUS_ERR.
+     * <p>
+     * When a bytearray input's status is not STATUS_OK, or its result is
+     * null, the input's Result is returned untouched.
+     *
+     * @param m type marker selecting this overload; it is only forwarded to
+     *            the input when the input already produces maps
+     * @return the input's Result, converted where necessary, or a new Result
+     *         with status STATUS_ERR for unsupported types
+     * @throws ExecException if a bytearray must be converted but no load
+     *             function is available
+     * @throws ClassCastException if a declared bytearray input delivers a
+     *             value that is neither a DataByteArray nor a map
+     */
+    @Override
+    public Result getNext(Map m) throws ExecException {
+        PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.MAP;
+        Byte resultType = in.getResultType();
+        switch (resultType) {
+        case DataType.MAP:
+            // already the target type: nothing to convert
+            return in.getNext(m);
+
+        case DataType.BYTEARRAY: {
+            DataByteArray dba = null;
+            Result res = in.getNext(dba);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                if (castNotNeeded) {
+                    // an earlier call established that the input already
+                    // delivers the target type, so pass the value through
+                    return res;
+                }
+                try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    if (DataType.findType(res.result) == castToType) {
+                        // the value already has the target type; remember
+                        // this for future calls and return it as is
+                        castNotNeeded = true;
+                        return res;
+                    }
+                    // the input is a different type: rethrow
+                    throw e;
+                }
+                try {
+                    if (null != load) {
+                        res.result = load.bytesToMap(dba.get());
+                    } else {
+                        String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to map.";
+                        log.error(msg);
+                        throw new ExecException(msg);
+                    }
+                } catch (ExecException ee) {
+                    throw ee;
+                } catch (IOException e) {
+                    // NOTE: best effort - the unconverted bytearray stays in
+                    // res; the cause is logged so the failure can be diagnosed
+                    log.error("Error while casting from ByteArray to Map", e);
+                }
+            }
+            return res;
+        }
+
+        default:
+            // TUPLE, BAG, INTEGER, DOUBLE, LONG, FLOAT, CHARARRAY, BOOLEAN
+            // and unrecognized types cannot be cast to a map
+            break;
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_ERR;
+        return res;
+    }
+
+    /**
+     * Deserialization hook: restores the default serialized state of this
+     * operator and then calls instantiateFunc().
+     * <p>
+     * NOTE(review): instantiateFunc() presumably rebuilds the load function,
+     * which is not restored by default deserialization - confirm against its
+     * definition.
+     *
+     * @param is stream the operator is being read from
+     * @throws IOException if reading the default state fails
+     * @throws ClassNotFoundException if a class of a serialized field cannot
+     *             be found
+     */
+    private void readObject(ObjectInputStream is)
+            throws IOException, ClassNotFoundException {
+        // restore the regular fields first, then re-create derived state
+        is.defaultReadObject();
+        instantiateFunc();
+    }
+
+    /**
+     * Creates a copy of this cast operator.
+     * <p>
+     * The copy is registered under a newly generated node id in the same
+     * scope as this operator, takes over this operator's state through
+     * cloneHelper, shares its loadFSpec and finally calls instantiateFunc()
+     * on itself.
+     *
+     * @return the newly created POCast
+     * @throws CloneNotSupportedException declared by the overridden method
+     */
+    @Override
+    public POCast clone() throws CloneNotSupportedException {
+        // a fresh key in the same scope keeps the copy distinct from this
+        // operator
+        OperatorKey copyKey = new OperatorKey(mKey.scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
+        POCast copy = new POCast(copyKey);
+        copy.cloneHelper(this);
+        copy.loadFSpec = loadFSpec;
+        copy.instantiateFunc();
+        return copy;
+    }
 
 }