You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/27 21:43:47 UTC

svn commit: r699726 - in /incubator/pig/branches/types: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java test/org/apache/pig/test/TestPOCast.java

Author: olga
Date: Sat Sep 27 12:43:46 2008
New Revision: 699726

URL: http://svn.apache.org/viewvc?rev=699726&view=rev
Log:
PIG-463: POCast changes

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Sat Sep 27 12:43:46 2008
@@ -257,3 +257,5 @@
     PIG-443:  Illustrate for the Types branch (shubham via olgan)
     
     PIG-376: set job name (olgan)
+
+    PIG-463: POCast changes (pradeepk via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Sat Sep 27 12:43:46 2008
@@ -47,6 +47,7 @@
     private String loadFSpec;
 	transient private LoadFunc load;
 	private Log log = LogFactory.getLog(getClass());
+    private boolean castNotNeeded = false;
 	
     private static final long serialVersionUID = 1L;
 
@@ -90,6 +91,7 @@
     @Override
     public Result getNext(Integer i) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.INTEGER;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         case DataType.BAG : {
@@ -109,7 +111,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToInteger(dba.get());
 				} catch (IOException e) {
@@ -187,6 +212,7 @@
     @Override
     public Result getNext(Long l) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.LONG;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         case DataType.BAG : {
@@ -211,8 +237,31 @@
         	DataByteArray dba = null;
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
         		//res.result = new Long(Long.valueOf((((DataByteArray)res.result).toString())));
-        		dba = (DataByteArray) res.result;
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToLong(dba.get());
 				} catch (IOException e) {
@@ -285,6 +334,7 @@
     @Override
     public Result getNext(Double d) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.DOUBLE;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         case DataType.BAG : {
@@ -310,7 +360,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new Double(Double.valueOf((((DataByteArray)res.result).toString())));
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToDouble(dba.get());
 				} catch (IOException e) {
@@ -382,6 +455,7 @@
     @Override
     public Result getNext(Float f) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.FLOAT;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         case DataType.BAG : {
@@ -407,7 +481,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new Float(Float.valueOf((((DataByteArray)res.result).toString())));
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToFloat(dba.get());
 				} catch (IOException e) {
@@ -481,6 +578,7 @@
     @Override
     public Result getNext(String str) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.CHARARRAY;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         case DataType.BAG : {
@@ -506,7 +604,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new String(((DataByteArray)res.result).toString());
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToCharArray(dba.get());
 				} catch (IOException e) {
@@ -580,6 +701,7 @@
     @Override
     public Result getNext(Tuple t) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.TUPLE;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         
@@ -593,7 +715,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new String(((DataByteArray)res.result).toString());
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToTuple(dba.get());
 				} catch (IOException e) {
@@ -634,6 +779,7 @@
     @Override
     public Result getNext(DataBag bag) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.BAG;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         
@@ -647,7 +793,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new String(((DataByteArray)res.result).toString());
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToBag(dba.get());
 				} catch (IOException e) {
@@ -688,6 +857,7 @@
     @Override
     public Result getNext(Map m) throws ExecException {
     	PhysicalOperator in = inputs.get(0);
+        Byte castToType = DataType.MAP;
     	Byte resultType = in.getResultType();
         switch(resultType) {
         
@@ -701,7 +871,30 @@
         	Result res = in.getNext(dba);
         	if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
         		//res.result = new String(((DataByteArray)res.result).toString());
-        		dba = (DataByteArray) res.result;
+        	    if(castNotNeeded) {
+                    // we examined the data once before and
+                    // determined that the input is the same
+                    // type as the type we are casting to
+                    // so just send the input out as output
+                    return res;
+                }
+        		try {
+                    dba = (DataByteArray) res.result;
+                } catch (ClassCastException e) {
+                    // check if the type of res.result is
+                    // same as the type we are trying to cast to
+                    if(DataType.findType(res.result) == castToType) {
+                        // remember this for future calls
+                        castNotNeeded  = true;
+                        // just return the output
+                        return res;
+                    } else {
+                        // the input is a different type
+                        // rethrow the exception
+                        throw e;
+                    }
+
+                }
         		try {
 					res.result = load.bytesToMap(dba.get());
 				} catch (IOException e) {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Sat Sep 27 12:43:46 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -29,6 +30,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -732,6 +734,7 @@
 				//System.out.println(res.result + " : " + i);
 				assertEquals(i, res.result);
 			}
+						
 		}
 		
 		{
@@ -825,6 +828,155 @@
 		}
 	}
 	
+	private PhysicalPlan constructPlan(POCast op) throws PlanException {
+	    LoadFunc load = new TestLoader();
+        op.setLoadFSpec(load.getClass().getName());
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        PhysicalPlan plan = new PhysicalPlan();
+        plan.add(prj);
+        plan.add(op);
+        plan.connect(prj, op);
+        prj.setResultType(DataType.BYTEARRAY);
+        return plan;
+	}
+	
+	/* 
+	 * Test that if the input type is actually the same 
+     * as the output type, while we think that the input type is a
+     * bytearray, we can still handle it. This can happen in the
+     * following situation:
+     * If a map in Pig (say, returned from a UDF) has a key whose 
+     * value is a string, then a lookup of that key used
+     * in a context which expects a string will cause an implicit cast
+     * to a string. This is because the Pig frontend (logical layer) 
+     * thinks of all map "values" as bytearrays and hence introduces 
+     * a Cast to convert the bytearray to a string, though in reality
+     * the input to the cast is already a string.
+     */
+	@Test
+    public void testByteArrayToOtherNoCast() throws PlanException, ExecException {
+        POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+        PhysicalPlan plan = constructPlan(op);
+        TupleFactory tf = TupleFactory.getInstance();
+        
+        {
+            Tuple t = tf.newTuple();
+            Integer input = new Integer(r.nextInt()); 
+            t.append(input);
+            plan.attachInput(t);
+            Result res = op.getNext(new Integer(0));
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + i);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            Float input = new Float(r.nextFloat());
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(new Float(0));
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + i);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            Long input = new Long(r.nextLong());
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(new Long(0));
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + i);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            Double input = new Double(r.nextDouble());
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(new Double(0));
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + i);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            Tuple input = GenRandomData.genRandSmallTuple("test", 1);
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(tf.newTuple());
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + str);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            DataBag input = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(DefaultBagFactory.getInstance().newDefaultBag());
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + str);
+                assertEquals(input, res.result);
+            }
+        }
+        
+        {
+            // create a new POCast each time since we 
+            // maintain a state variable per POCast object
+            // indicating if cast is really required
+            POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+            plan = constructPlan(newOp);
+            Tuple t = tf.newTuple();
+            Map<Object, Object> input = new HashMap<Object, Object>();
+            input.put("key1", "value1");
+            input.put("key2", "value2");
+            t.append(input);
+            plan.attachInput(t);
+            Result res = newOp.getNext(new HashMap<Object, Object>());
+            if(res.returnStatus == POStatus.STATUS_OK) {
+                //System.out.println(res.result + " : " + str);
+                assertEquals(input, res.result);
+            }
+        }
+        
+    }
+	
 	@Test
 	public void testTupleToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);