You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/10 02:39:03 UTC
svn commit: r1044192 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
test/org/apache/pig/test/TestPOCast.java
Author: daijy
Date: Fri Dec 10 01:39:02 2010
New Revision: 1044192
URL: http://svn.apache.org/viewvc?rev=1044192&view=rev
Log:
PIG-1758: Deep cast of complex type
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
pig/trunk/test/org/apache/pig/test/TestPOCast.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Dec 10 01:39:02 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1758: Deep cast of complex type (daijy)
+
PIG-1728: doc updates (chandec via olgan)
PIG-1752: Enable UDFs to indicate files to load into the Distributed Cache
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Dec 10 01:39:02 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -29,6 +31,8 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -45,6 +49,7 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.StreamToPig;
import org.apache.pig.impl.util.CastUtils;
+import org.apache.pig.impl.util.LogUtils;
/**
* This is just a cast that converts DataByteArray into either String or
@@ -732,6 +737,16 @@ public class POCast extends ExpressionOp
case DataType.TUPLE: {
Result res = in.getNext(t);
+ if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+ try {
+ res.result = convertWithSchema(res.result, fieldSchema);
+ } catch (IOException e) {
+ LogUtils.warn(this, "Unable to interpret value " + res.result + " in field being " +
+ "converted to type tuple, caught ParseException <" +
+ e.getMessage() + "> field discarded",
+ PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, log);
+ }
+ }
return res;
}
@@ -811,6 +826,268 @@ public class POCast extends ExpressionOp
return res;
}
+    /**
+     * Converts {@code obj} to the type described by {@code fs}. Bags and tuples
+     * are walked recursively so that their nested fields are converted as well
+     * (deep cast); they are converted in place and the same instance is returned.
+     * Scalars are converted with Java numeric conversions, {@link CastUtils}
+     * (chararray input) or the load caster (bytearray input).
+     *
+     * @param obj the value to convert; {@code null} is returned unchanged
+     * @param fs  the target field schema; if {@code null}, {@code obj} is returned unchanged
+     * @return the converted value
+     * @throws IOException (an {@link ExecException}) if {@code obj} cannot be converted
+     *         to the target type, or if it is a bytearray and no load caster is set
+     */
+    private Object convertWithSchema(Object obj, ResourceFieldSchema fs) throws IOException {
+        if (fs == null) {
+            return obj;
+        }
+        // null is a legal value for every type. Without this check a single null
+        // field would reach the "Cannot convert" branches below and make the
+        // conversion of its whole enclosing tuple or bag fail.
+        if (obj == null) {
+            return null;
+        }
+
+        switch (fs.getType()) {
+            case DataType.BAG:
+                return deepCastBag(obj, fs);
+            case DataType.TUPLE:
+                return deepCastTuple(obj, fs);
+            case DataType.BYTEARRAY:
+                // Untyped (bytearray) fields are left exactly as they are, so that a
+                // schema containing untyped fields does not fail the whole tuple.
+                return obj;
+            case DataType.MAP:
+                if (obj instanceof Map) {
+                    return obj;
+                }
+                if (obj instanceof DataByteArray) {
+                    if (caster == null) {
+                        throw missingCasterException("map");
+                    }
+                    return caster.bytesToMap(((DataByteArray) obj).get());
+                }
+                throw new ExecException("Cannot cast " + obj + " to map.", 1120, PigException.INPUT);
+            case DataType.INTEGER:
+                switch (DataType.findType(obj)) {
+                    case DataType.BYTEARRAY:
+                        if (caster == null) {
+                            throw missingCasterException("int");
+                        }
+                        return caster.bytesToInteger(((DataByteArray) obj).get());
+                    case DataType.BOOLEAN:
+                        return Integer.valueOf((Boolean) obj ? 1 : 0);
+                    case DataType.INTEGER:
+                        return obj;
+                    case DataType.DOUBLE:
+                    case DataType.LONG:
+                    case DataType.FLOAT:
+                        return Integer.valueOf(((Number) obj).intValue());
+                    case DataType.CHARARRAY:
+                        return CastUtils.stringToInteger((String) obj);
+                    default:
+                        throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+                }
+            case DataType.DOUBLE:
+                switch (DataType.findType(obj)) {
+                    case DataType.BYTEARRAY:
+                        if (caster == null) {
+                            throw missingCasterException("double");
+                        }
+                        return caster.bytesToDouble(((DataByteArray) obj).get());
+                    case DataType.BOOLEAN:
+                        // false maps to 0, consistent with the int/long/float conversions.
+                        return Double.valueOf((Boolean) obj ? 1.0 : 0.0);
+                    case DataType.DOUBLE:
+                        return obj;
+                    case DataType.INTEGER:
+                    case DataType.LONG:
+                    case DataType.FLOAT:
+                        return Double.valueOf(((Number) obj).doubleValue());
+                    case DataType.CHARARRAY:
+                        return CastUtils.stringToDouble((String) obj);
+                    default:
+                        throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+                }
+            case DataType.LONG:
+                switch (DataType.findType(obj)) {
+                    case DataType.BYTEARRAY:
+                        if (caster == null) {
+                            throw missingCasterException("long");
+                        }
+                        return caster.bytesToLong(((DataByteArray) obj).get());
+                    case DataType.BOOLEAN:
+                        return Long.valueOf((Boolean) obj ? 1L : 0L);
+                    case DataType.LONG:
+                        return obj;
+                    case DataType.INTEGER:
+                    case DataType.DOUBLE:
+                    case DataType.FLOAT:
+                        return Long.valueOf(((Number) obj).longValue());
+                    case DataType.CHARARRAY:
+                        return CastUtils.stringToLong((String) obj);
+                    default:
+                        throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+                }
+            case DataType.FLOAT:
+                switch (DataType.findType(obj)) {
+                    case DataType.BYTEARRAY:
+                        if (caster == null) {
+                            throw missingCasterException("float");
+                        }
+                        return caster.bytesToFloat(((DataByteArray) obj).get());
+                    case DataType.BOOLEAN:
+                        return Float.valueOf((Boolean) obj ? 1f : 0f);
+                    case DataType.FLOAT:
+                        return obj;
+                    case DataType.INTEGER:
+                    case DataType.DOUBLE:
+                    case DataType.LONG:
+                        return Float.valueOf(((Number) obj).floatValue());
+                    case DataType.CHARARRAY:
+                        return CastUtils.stringToFloat((String) obj);
+                    default:
+                        throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+                }
+            case DataType.CHARARRAY:
+                switch (DataType.findType(obj)) {
+                    case DataType.BYTEARRAY:
+                        if (caster == null) {
+                            throw missingCasterException("chararray");
+                        }
+                        return caster.bytesToCharArray(((DataByteArray) obj).get());
+                    case DataType.BOOLEAN:
+                        return (Boolean) obj ? "1" : "0";
+                    case DataType.CHARARRAY:
+                        return obj;
+                    case DataType.INTEGER:
+                    case DataType.DOUBLE:
+                    case DataType.LONG:
+                    case DataType.FLOAT:
+                        return obj.toString();
+                    default:
+                        throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+                }
+            default:
+                throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
+        }
+    }
+
+    /**
+     * Deep-casts a bag value: every tuple of a {@link DataBag} is converted in place
+     * against the bag's element schema; a bytearray is decoded with the load caster.
+     */
+    private Object deepCastBag(Object obj, ResourceFieldSchema fs) throws IOException {
+        if (obj instanceof DataBag) {
+            DataBag db = (DataBag) obj;
+            ResourceSchema bagSchema = fs.getSchema();
+            ResourceFieldSchema[] bagFields = (bagSchema == null) ? null : bagSchema.getFields();
+            // The element (tuple) schema is the first field of the bag's inner schema;
+            // with no inner schema (e.g. bag{}) the bag is returned untouched.
+            if (bagFields != null && bagFields.length > 0) {
+                ResourceFieldSchema tupleFs = bagFields[0];
+                Iterator<Tuple> iter = db.iterator();
+                while (iter.hasNext()) {
+                    // Tuples are converted in place, so the return value is not needed.
+                    convertWithSchema(iter.next(), tupleFs);
+                }
+            }
+            return db;
+        }
+        if (obj instanceof DataByteArray) {
+            if (caster == null) {
+                throw missingCasterException("bag");
+            }
+            return caster.bytesToBag(((DataByteArray) obj).get(), fs);
+        }
+        throw new ExecException("Cannot cast " + obj + " to bag.", 1120, PigException.INPUT);
+    }
+
+    /**
+     * Deep-casts a tuple value: each field of a {@link Tuple} is converted in place
+     * against the matching field of the inner schema; a bytearray is decoded with
+     * the load caster.
+     */
+    private Object deepCastTuple(Object obj, ResourceFieldSchema fs) throws IOException {
+        if (obj instanceof Tuple) {
+            Tuple t = (Tuple) obj;
+            ResourceSchema innerSchema = fs.getSchema();
+            // Without an inner schema there is nothing to convert the fields to.
+            if (innerSchema == null || innerSchema.getFields() == null) {
+                return t;
+            }
+            ResourceFieldSchema[] fieldSchemas = innerSchema.getFields();
+            try {
+                // NOTE: fields are replaced one by one, so if a later field fails,
+                // the earlier ones have already been converted.
+                for (int i = 0; i < fieldSchemas.length; i++) {
+                    t.set(i, convertWithSchema(t.get(i), fieldSchemas[i]));
+                }
+            } catch (Exception e) {
+                // Keep the underlying failure as the cause instead of discarding it.
+                throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT, e);
+            }
+            return t;
+        }
+        if (obj instanceof DataByteArray) {
+            if (caster == null) {
+                throw missingCasterException("tuple");
+            }
+            return caster.bytesToTuple(((DataByteArray) obj).get(), fs);
+        }
+        throw new ExecException("Cannot cast " + obj + " to tuple.", 1120, PigException.INPUT);
+    }
+
+    /** Builds the error raised when a bytearray must be converted but no load caster is set. */
+    private static ExecException missingCasterException(String typeName) {
+        return new ExecException("Received a bytearray from the UDF. Cannot determine how to convert the bytearray to "
+                + typeName + ".", 1075, PigException.INPUT);
+    }
+
@Override
public Result getNext(DataBag bag) throws ExecException {
PhysicalOperator in = inputs.get(0);
@@ -819,7 +1096,17 @@ public class POCast extends ExpressionOp
switch (resultType) {
case DataType.BAG: {
- Result res = in.getNext(bag);
+ res = in.getNext(bag);
+ if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+ try {
+ res.result = convertWithSchema(res.result, fieldSchema);
+ } catch (IOException e) {
+ LogUtils.warn(this, "Unable to interpret value " + res.result + " in field being " +
+ "converted to type bag, caught ParseException <" +
+ e.getMessage() + "> field discarded",
+ PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, log);
+ }
+ }
return res;
}
@@ -987,6 +1274,13 @@ public class POCast extends ExpressionOp
return res;
}
+    /**
+     * Always reports {@link POStatus#STATUS_ERR}; the argument and the input
+     * operator are never consulted. Presumably casting to bytearray is not
+     * supported by this operator.
+     */
+    @Override
+    public Result getNext(DataByteArray dba) throws ExecException {
+        Result unsupported = new Result();
+        unsupported.returnStatus = POStatus.STATUS_ERR;
+        return unsupported;
+    }
+
private void readObject(ObjectInputStream is) throws IOException,
ClassNotFoundException {
is.defaultReadObject();
Modified: pig/trunk/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOCast.java?rev=1044192&r1=1044191&r2=1044192&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOCast.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOCast.java Fri Dec 10 01:39:02 2010
@@ -32,6 +32,7 @@ import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
@@ -45,6 +46,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.BinStorage;
@@ -1464,7 +1467,7 @@ public class TestPOCast extends TestCase
}
@Test
- public void testBagToOther() throws IOException {
+ public void testBagToOther() throws IOException, ParseException {
POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
op.setFuncSpec(new FuncSpec(PigStorage.class.getName()));
POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
@@ -1570,6 +1573,48 @@ public class TestPOCast extends TestCase
res = op.getNext(i);
assertEquals(POStatus.STATUS_ERR, res.returnStatus);
}
+
+ {
+ Tuple t = tf.newTuple();
+ t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+ Schema s = Utils.getSchemaFromString("b:bag{t:tuple(a:chararray, b:float)}");
+ op.setFieldSchema(new ResourceSchema.ResourceFieldSchema(s.getField(0)));
+ plan.attachInput(t);
+ DataBag db = null;
+ Result res = op.getNext(db);
+ Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
+ Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
+
+ while(expectedBagIterator.hasNext()) {
+ Tuple expectedBagTuple = expectedBagIterator.next();
+ Tuple convertedBagTuple = convertedBagIterator.next();
+ assertTrue(convertedBagTuple.get(0) instanceof String);
+ assertTrue(convertedBagTuple.get(1) instanceof Float);
+ assertTrue(expectedBagTuple.get(0).equals(convertedBagTuple.get(0)));
+ assertTrue(((Float)(expectedBagTuple.get(1))).floatValue()==(Float)(convertedBagTuple.get(1)));
+ }
+ }
+
+ {
+ Tuple t = tf.newTuple();
+ t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 100));
+ Schema s = Utils.getSchemaFromString("b:bag{}");
+ op.setFieldSchema(new ResourceSchema.ResourceFieldSchema(s.getField(0)));
+ plan.attachInput(t);
+ DataBag db = null;
+ Result res = op.getNext(db);
+ Iterator<Tuple> expectedBagIterator = ((DataBag)(t.get(0))).iterator();
+ Iterator<Tuple> convertedBagIterator = ((DataBag)(res.result)).iterator();
+
+ while(expectedBagIterator.hasNext()) {
+ Tuple expectedBagTuple = expectedBagIterator.next();
+ Tuple convertedBagTuple = convertedBagIterator.next();
+ assertTrue(convertedBagTuple.get(0) instanceof String);
+ assertTrue(convertedBagTuple.get(1) instanceof Integer);
+ assertTrue(expectedBagTuple.get(0).equals(convertedBagTuple.get(0)));
+ assertTrue(((Integer)(expectedBagTuple.get(1)))==(Integer)(convertedBagTuple.get(1)));
+ }
+ }
}
@Test