You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/10 02:39:03 UTC

svn commit: r1044192 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java test/org/apache/pig/test/TestPOCast.java

Author: daijy
Date: Fri Dec 10 01:39:02 2010
New Revision: 1044192

URL: http://svn.apache.org/viewvc?rev=1044192&view=rev
Log:
PIG-1758: Deep cast of complex type

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    pig/trunk/test/org/apache/pig/test/TestPOCast.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec 10 01:39:02 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1758: Deep cast of complex type (daijy)
+
 PIG-1728: doc updates (chandec via olgan)
 
 PIG-1752: Enable UDFs to indicate files to load into the Distributed Cache

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Dec 10 01:39:02 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -29,6 +31,8 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -45,6 +49,7 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.StreamToPig;
 import org.apache.pig.impl.util.CastUtils;
+import org.apache.pig.impl.util.LogUtils;
 
 /**
  * This is just a cast that converts DataByteArray into either String or
@@ -732,6 +737,16 @@ public class POCast extends ExpressionOp
 
         case DataType.TUPLE: {
             Result res = in.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                try {
+                    res.result = convertWithSchema(res.result, fieldSchema);
+                } catch (IOException e) {
+                    LogUtils.warn(this, "Unable to interpret value " + res.result + " in field being " +
+                            "converted to type tuple, caught ParseException <" +
+                            e.getMessage() + "> field discarded", 
+                            PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, log);
+                }
+            }
             return res;
         }
 
@@ -811,6 +826,268 @@ public class POCast extends ExpressionOp
         return res;
     }
 
+    /**
+     * Recursively converts a value to the type described by a field schema
+     * ("deep cast"), descending into bags and tuples so that nested fields are
+     * converted as well.
+     * <p>
+     * Bags and tuples are converted in place: the tuples they contain are
+     * updated through {@code Tuple.set} and the same container instance is
+     * returned. If a nested conversion fails part-way, fields converted before
+     * the failure stay converted.
+     *
+     * @param obj the value to convert; {@code null} is returned unchanged
+     * @param fs  the target schema; when {@code null} no conversion is
+     *            attempted and {@code obj} is returned as is
+     * @return the converted value (for bags, tuples and maps that already have
+     *         the right container type, the same instance as {@code obj})
+     * @throws IOException if the value cannot be converted to the requested
+     *         type, if a bytearray has to be converted but no caster is
+     *         available, or if the caster itself fails
+     */
+    private Object convertWithSchema(Object obj, ResourceFieldSchema fs) throws IOException {
+        // Without a target schema there is nothing to convert to.
+        if (fs == null) {
+            return obj;
+        }
+        // A null value stays null whatever the target type is. Without this
+        // guard a single null field would make the whole enclosing tuple
+        // fail to convert.
+        if (obj == null) {
+            return null;
+        }
+
+        switch (fs.getType()) {
+        case DataType.BAG:
+            return deepCastToBag(obj, fs);
+        case DataType.TUPLE:
+            return deepCastToTuple(obj, fs);
+        case DataType.MAP:
+            return deepCastToMap(obj);
+        case DataType.INTEGER:
+            return deepCastToInteger(obj, fs);
+        case DataType.DOUBLE:
+            return deepCastToDouble(obj, fs);
+        case DataType.LONG:
+            return deepCastToLong(obj, fs);
+        case DataType.FLOAT:
+            return deepCastToFloat(obj, fs);
+        case DataType.CHARARRAY:
+            return deepCastToCharArray(obj, fs);
+        default:
+            throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+        }
+    }
+
+    /**
+     * Returns the caster used to interpret bytearrays, or fails with error
+     * 1075 when none is set.
+     *
+     * @param typeName name of the target type, used only in the error message
+     */
+    private LoadCaster casterForBytearray(String typeName) throws ExecException {
+        if (caster == null) {
+            String msg = "Received a bytearray from the UDF. Cannot determine how to convert the bytearray to "
+                    + typeName + ".";
+            throw new ExecException(msg, 1075, PigException.INPUT);
+        }
+        return caster;
+    }
+
+    /** Builds the error raised when a scalar has no conversion to the target type. */
+    private static ExecException cannotConvert(Object obj, ResourceFieldSchema fs) {
+        return new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+    }
+
+    /** Converts every tuple of a bag in place, or parses a bytearray into a bag. */
+    private Object deepCastToBag(Object obj, ResourceFieldSchema fs) throws IOException {
+        if (obj instanceof DataBag) {
+            DataBag db = (DataBag) obj;
+            // A bag without an inner schema (e.g. "bag{}") is left untouched.
+            if (fs.getSchema() != null) {
+                // The first field of the bag schema describes its tuples.
+                ResourceFieldSchema tupleFs = fs.getSchema().getFields()[0];
+                Iterator<Tuple> iter = db.iterator();
+                while (iter.hasNext()) {
+                    // Tuples are converted in place, so the return value is not needed.
+                    convertWithSchema(iter.next(), tupleFs);
+                }
+            }
+            return db;
+        }
+        if (obj instanceof DataByteArray) {
+            return casterForBytearray("bag").bytesToBag(((DataByteArray) obj).get(), fs);
+        }
+        throw new ExecException("Cannot cast " + obj + " to bag.", 1120, PigException.INPUT);
+    }
+
+    /** Converts the fields of a tuple in place, or parses a bytearray into a tuple. */
+    private Object deepCastToTuple(Object obj, ResourceFieldSchema fs) throws IOException {
+        if (obj instanceof Tuple) {
+            Tuple t = (Tuple) obj;
+            ResourceSchema innerSchema = fs.getSchema();
+            // A tuple without an inner schema has no field types to cast to.
+            if (innerSchema == null) {
+                return t;
+            }
+            try {
+                int i = 0;
+                for (ResourceFieldSchema innerFs : innerSchema.getFields()) {
+                    t.set(i, convertWithSchema(t.get(i), innerFs));
+                    i++;
+                }
+            } catch (Exception e) {
+                // Keep the underlying failure as the cause so it can be diagnosed.
+                throw new ExecException("Cannot convert "+ obj + " to " + fs, e);
+            }
+            return t;
+        }
+        if (obj instanceof DataByteArray) {
+            return casterForBytearray("tuple").bytesToTuple(((DataByteArray) obj).get(), fs);
+        }
+        throw new ExecException("Cannot cast " + obj + " to tuple.", 1120, PigException.INPUT);
+    }
+
+    /** Returns a map unchanged (values are not converted), or parses a bytearray into a map. */
+    private Object deepCastToMap(Object obj) throws IOException {
+        if (obj instanceof Map) {
+            return obj;
+        }
+        if (obj instanceof DataByteArray) {
+            return casterForBytearray("map").bytesToMap(((DataByteArray) obj).get());
+        }
+        throw new ExecException("Cannot cast " + obj + " to map.", 1120, PigException.INPUT);
+    }
+
+    /** Converts a scalar to {@code Integer}; booleans map to 1/0, wider numbers are narrowed. */
+    private Object deepCastToInteger(Object obj, ResourceFieldSchema fs) throws IOException {
+        switch (DataType.findType(obj)) {
+        case DataType.BYTEARRAY:
+            return casterForBytearray("int").bytesToInteger(((DataByteArray) obj).get());
+        case DataType.BOOLEAN:
+            return Integer.valueOf(((Boolean) obj).booleanValue() ? 1 : 0);
+        case DataType.INTEGER:
+            return obj;
+        case DataType.DOUBLE:
+        case DataType.LONG:
+        case DataType.FLOAT:
+            return Integer.valueOf(((Number) obj).intValue());
+        case DataType.CHARARRAY:
+            return CastUtils.stringToInteger((String) obj);
+        default:
+            throw cannotConvert(obj, fs);
+        }
+    }
+
+    /** Converts a scalar to {@code Double}; booleans map to 1.0/0.0. */
+    private Object deepCastToDouble(Object obj, ResourceFieldSchema fs) throws IOException {
+        switch (DataType.findType(obj)) {
+        case DataType.BYTEARRAY:
+            return casterForBytearray("double").bytesToDouble(((DataByteArray) obj).get());
+        case DataType.BOOLEAN:
+            // false must become 0.0, consistent with the int/long/float conversions.
+            return Double.valueOf(((Boolean) obj).booleanValue() ? 1 : 0);
+        case DataType.DOUBLE:
+            return obj;
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.FLOAT:
+            return Double.valueOf(((Number) obj).doubleValue());
+        case DataType.CHARARRAY:
+            return CastUtils.stringToDouble((String) obj);
+        default:
+            throw cannotConvert(obj, fs);
+        }
+    }
+
+    /** Converts a scalar to {@code Long}; booleans map to 1/0. */
+    private Object deepCastToLong(Object obj, ResourceFieldSchema fs) throws IOException {
+        switch (DataType.findType(obj)) {
+        case DataType.BYTEARRAY:
+            return casterForBytearray("long").bytesToLong(((DataByteArray) obj).get());
+        case DataType.BOOLEAN:
+            return Long.valueOf(((Boolean) obj).booleanValue() ? 1 : 0);
+        case DataType.LONG:
+            return obj;
+        case DataType.INTEGER:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+            return Long.valueOf(((Number) obj).longValue());
+        case DataType.CHARARRAY:
+            return CastUtils.stringToLong((String) obj);
+        default:
+            throw cannotConvert(obj, fs);
+        }
+    }
+
+    /** Converts a scalar to {@code Float}; booleans map to 1.0f/0.0f. */
+    private Object deepCastToFloat(Object obj, ResourceFieldSchema fs) throws IOException {
+        switch (DataType.findType(obj)) {
+        case DataType.BYTEARRAY:
+            return casterForBytearray("float").bytesToFloat(((DataByteArray) obj).get());
+        case DataType.BOOLEAN:
+            return Float.valueOf(((Boolean) obj).booleanValue() ? 1 : 0);
+        case DataType.FLOAT:
+            return obj;
+        case DataType.INTEGER:
+        case DataType.DOUBLE:
+        case DataType.LONG:
+            return Float.valueOf(((Number) obj).floatValue());
+        case DataType.CHARARRAY:
+            return CastUtils.stringToFloat((String) obj);
+        default:
+            throw cannotConvert(obj, fs);
+        }
+    }
+
+    /** Converts a scalar to {@code String}; booleans map to "1"/"0", numbers use {@code toString()}. */
+    private Object deepCastToCharArray(Object obj, ResourceFieldSchema fs) throws IOException {
+        switch (DataType.findType(obj)) {
+        case DataType.BYTEARRAY:
+            return casterForBytearray("chararray").bytesToCharArray(((DataByteArray) obj).get());
+        case DataType.BOOLEAN:
+            return ((Boolean) obj).booleanValue() ? "1" : "0";
+        case DataType.INTEGER:
+        case DataType.DOUBLE:
+        case DataType.LONG:
+        case DataType.FLOAT:
+            return obj.toString();
+        case DataType.CHARARRAY:
+            return obj;
+        default:
+            throw cannotConvert(obj, fs);
+        }
+    }
+    
     @Override
     public Result getNext(DataBag bag) throws ExecException {
         PhysicalOperator in = inputs.get(0);
@@ -819,7 +1096,17 @@ public class POCast extends ExpressionOp
         switch (resultType) {
 
         case DataType.BAG: {
-            Result res = in.getNext(bag);
+            res = in.getNext(bag);
+            if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+                try {
+                    res.result = convertWithSchema(res.result, fieldSchema);
+                } catch (IOException e) {
+                    LogUtils.warn(this, "Unable to interpret value " + res.result + " in field being " +
+                            "converted to type bag, caught ParseException <" +
+                            e.getMessage() + "> field discarded", 
+                            PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, log);
+                }
+            }
             return res;
         }
 
@@ -987,6 +1274,13 @@ public class POCast extends ExpressionOp
         return res;
     }
 
+    /**
+     * Always reports failure: the argument is ignored and no input operator is
+     * consulted.
+     * NOTE(review): presumably casting to bytearray is not supported by this
+     * operator - confirm against the type checker.
+     *
+     * @param dba unused
+     * @return a fresh {@code Result} whose status is {@code POStatus.STATUS_ERR}
+     */
+    @Override
+    public Result getNext(DataByteArray dba) throws ExecException {
+        final Result failure = new Result();
+        failure.returnStatus = POStatus.STATUS_ERR;
+        return failure;
+    }
+    
     private void readObject(ObjectInputStream is) throws IOException,
             ClassNotFoundException {
         is.defaultReadObject();

Modified: pig/trunk/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOCast.java?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOCast.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOCast.java Fri Dec 10 01:39:02 2010
@@ -32,6 +32,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -45,6 +46,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.BinStorage;
@@ -1464,7 +1467,7 @@ public class TestPOCast extends TestCase
 	}
 	
 	@Test
-	public void testBagToOther() throws IOException {
+	public void testBagToOther() throws IOException, ParseException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		op.setFuncSpec(new FuncSpec(PigStorage.class.getName()));
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
@@ -1570,6 +1573,48 @@ public class TestPOCast extends TestCase
 			res = op.getNext(i);
 			assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 		}
+		
+        {
+            Tuple t = tf.newTuple();
+            t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+            Schema s = Utils.getSchemaFromString("b:bag{t:tuple(a:chararray, b:float)}");
+            op.setFieldSchema(new ResourceSchema.ResourceFieldSchema(s.getField(0)));
+            plan.attachInput(t);
+            DataBag db = null;
+            Result res = op.getNext(db);
+            Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
+            Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
+            
+            while(expectedBagIterator.hasNext()) {
+                Tuple expectedBagTuple = expectedBagIterator.next();
+                Tuple convertedBagTuple = convertedBagIterator.next();
+                assertTrue(convertedBagTuple.get(0) instanceof String);
+                assertTrue(convertedBagTuple.get(1) instanceof Float);
+                assertTrue(expectedBagTuple.get(0).equals(convertedBagTuple.get(0)));
+                assertTrue(((Float)(expectedBagTuple.get(1))).floatValue()==(Float)(convertedBagTuple.get(1)));
+            }
+        }
+        
+        {
+            Tuple t = tf.newTuple();
+            t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+            Schema s = Utils.getSchemaFromString("b:bag{}");
+            op.setFieldSchema(new ResourceSchema.ResourceFieldSchema(s.getField(0)));
+            plan.attachInput(t);
+            DataBag db = null;
+            Result res = op.getNext(db);
+            Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
+            Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
+            
+            while(expectedBagIterator.hasNext()) {
+                Tuple expectedBagTuple = expectedBagIterator.next();
+                Tuple convertedBagTuple = convertedBagIterator.next();
+                assertTrue(convertedBagTuple.get(0) instanceof String);
+                assertTrue(convertedBagTuple.get(1) instanceof Integer);
+                assertTrue(expectedBagTuple.get(0).equals(convertedBagTuple.get(0)));
+                assertTrue(((Integer)(expectedBagTuple.get(1)))==(Integer)(convertedBagTuple.get(1)));
+            }
+        }
 	}
 	
 	@Test