You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/27 21:43:47 UTC
svn commit: r699726 - in /incubator/pig/branches/types: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
test/org/apache/pig/test/TestPOCast.java
Author: olga
Date: Sat Sep 27 12:43:46 2008
New Revision: 699726
URL: http://svn.apache.org/viewvc?rev=699726&view=rev
Log:
PIG-463: POCast changes
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Sat Sep 27 12:43:46 2008
@@ -257,3 +257,5 @@
PIG-443: Illustrate for the Types branch (shubham via olgan)
PIG-376: set job name (olgan)
+
+ PIG-463: POCast changes (pradeepk via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Sat Sep 27 12:43:46 2008
@@ -47,6 +47,7 @@
private String loadFSpec;
transient private LoadFunc load;
private Log log = LogFactory.getLog(getClass());
+ private boolean castNotNeeded = false;
private static final long serialVersionUID = 1L;
@@ -90,6 +91,7 @@
@Override
public Result getNext(Integer i) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.INTEGER;
Byte resultType = in.getResultType();
switch(resultType) {
case DataType.BAG : {
@@ -109,7 +111,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a different type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToInteger(dba.get());
} catch (IOException e) {
@@ -187,6 +212,7 @@
@Override
public Result getNext(Long l) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.LONG;
Byte resultType = in.getResultType();
switch(resultType) {
case DataType.BAG : {
@@ -211,8 +237,31 @@
DataByteArray dba = null;
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
//res.result = new Long(Long.valueOf((((DataByteArray)res.result).toString())));
- dba = (DataByteArray) res.result;
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a different type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToLong(dba.get());
} catch (IOException e) {
@@ -285,6 +334,7 @@
@Override
public Result getNext(Double d) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.DOUBLE;
Byte resultType = in.getResultType();
switch(resultType) {
case DataType.BAG : {
@@ -310,7 +360,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new Double(Double.valueOf((((DataByteArray)res.result).toString())));
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a different type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToDouble(dba.get());
} catch (IOException e) {
@@ -382,6 +455,7 @@
@Override
public Result getNext(Float f) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.FLOAT;
Byte resultType = in.getResultType();
switch(resultType) {
case DataType.BAG : {
@@ -407,7 +481,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new Float(Float.valueOf((((DataByteArray)res.result).toString())));
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a different type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToFloat(dba.get());
} catch (IOException e) {
@@ -481,6 +578,7 @@
@Override
public Result getNext(String str) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.CHARARRAY;
Byte resultType = in.getResultType();
switch(resultType) {
case DataType.BAG : {
@@ -506,7 +604,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new String(((DataByteArray)res.result).toString());
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a different type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToCharArray(dba.get());
} catch (IOException e) {
@@ -580,6 +701,7 @@
@Override
public Result getNext(Tuple t) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.TUPLE;
Byte resultType = in.getResultType();
switch(resultType) {
@@ -593,7 +715,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new String(((DataByteArray)res.result).toString());
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a differen type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToTuple(dba.get());
} catch (IOException e) {
@@ -634,6 +779,7 @@
@Override
public Result getNext(DataBag bag) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.BAG;
Byte resultType = in.getResultType();
switch(resultType) {
@@ -647,7 +793,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new String(((DataByteArray)res.result).toString());
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a differen type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToBag(dba.get());
} catch (IOException e) {
@@ -688,6 +857,7 @@
@Override
public Result getNext(Map m) throws ExecException {
PhysicalOperator in = inputs.get(0);
+ Byte castToType = DataType.MAP;
Byte resultType = in.getResultType();
switch(resultType) {
@@ -701,7 +871,30 @@
Result res = in.getNext(dba);
if(res.returnStatus == POStatus.STATUS_OK && res.result != null) {
//res.result = new String(((DataByteArray)res.result).toString());
- dba = (DataByteArray) res.result;
+ if(castNotNeeded) {
+ // we examined the data once before and
+ // determined that the input is the same
+ // type as the type we are casting to
+ // so just send the input out as output
+ return res;
+ }
+ try {
+ dba = (DataByteArray) res.result;
+ } catch (ClassCastException e) {
+ // check if the type of res.result is
+ // same as the type we are trying to cast to
+ if(DataType.findType(res.result) == castToType) {
+ // remember this for future calls
+ castNotNeeded = true;
+ // just return the output
+ return res;
+ } else {
+ // the input is a differen type
+ // rethrow the exception
+ throw e;
+ }
+
+ }
try {
res.result = load.bytesToMap(dba.get());
} catch (IOException e) {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=699726&r1=699725&r2=699726&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Sat Sep 27 12:43:46 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URL;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
@@ -29,6 +30,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -732,6 +734,7 @@
//System.out.println(res.result + " : " + i);
assertEquals(i, res.result);
}
+
}
{
@@ -825,6 +828,155 @@
}
}
+ /*
+ * Builds a two-operator test plan: a POProject connected as the
+ * input of the given POCast. The cast's load func spec is set to
+ * TestLoader's class name. The projection (presumably of column 0,
+ * per the constructor's last argument; verify against POProject)
+ * declares BYTEARRAY as its result type.
+ */
+ private PhysicalPlan constructPlan(POCast op) throws PlanException {
+ LoadFunc load = new TestLoader();
+ op.setLoadFSpec(load.getClass().getName());
+ POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ PhysicalPlan plan = new PhysicalPlan();
+ plan.add(prj);
+ plan.add(op);
+ plan.connect(prj, op);
+ // Declare a bytearray result type regardless of what the attached
+ // tuple actually holds, so the cast believes its input is a bytearray.
+ prj.setResultType(DataType.BYTEARRAY);
+ return plan;
+ }
+
+ /*
+ * Tests that POCast returns its input unchanged when the input is
+ * declared as a bytearray but actually already holds a value of the
+ * target type. This can happen in the following situation:
+ * if a map in Pig (say, returned from a UDF) has a key whose value
+ * is a string, then a lookup of that key used in a context that
+ * expects a string causes an implicit cast to string. The Pig
+ * frontend (logical layer) treats all map "values" as bytearrays and
+ * therefore inserts a cast from bytearray to string, even though the
+ * input to the cast is already a string.
+ *
+ * Cases covered: Integer, Float, Long, Double, Tuple, DataBag, Map.
+ * String (chararray) is not exercised here.
+ *
+ * NOTE(review): each case calls assertEquals only when returnStatus
+ * is STATUS_OK; any other status makes that case pass silently.
+ * Consider asserting the status explicitly.
+ */
+ @Test
+ public void testByteArrayToOtherNoCast() throws PlanException, ExecException {
+ POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
+ PhysicalPlan plan = constructPlan(op);
+ TupleFactory tf = TupleFactory.getInstance();
+
+ {
+ // Integer case: uses the POCast created above (fresh, so its
+ // castNotNeeded state has not been set yet).
+ Tuple t = tf.newTuple();
+ Integer input = new Integer(r.nextInt());
+ t.append(input);
+ plan.attachInput(t);
+ Result res = op.getNext(new Integer(0));
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + i);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ Float input = new Float(r.nextFloat());
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(new Float(0));
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + i);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ Long input = new Long(r.nextLong());
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(new Long(0));
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + i);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ Double input = new Double(r.nextDouble());
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(new Double(0));
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + i);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ Tuple input = GenRandomData.genRandSmallTuple("test", 1);
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(tf.newTuple());
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + str);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ DataBag input = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(DefaultBagFactory.getInstance().newDefaultBag());
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + str);
+ assertEquals(input, res.result);
+ }
+ }
+
+ {
+ // create a new POCast each time since we
+ // maintain a state variable per POCast object
+ // indicating if cast is really required
+ POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1);
+ plan = constructPlan(newOp);
+ Tuple t = tf.newTuple();
+ Map<Object, Object> input = new HashMap<Object, Object>();
+ input.put("key1", "value1");
+ input.put("key2", "value2");
+ t.append(input);
+ plan.attachInput(t);
+ Result res = newOp.getNext(new HashMap<Object, Object>());
+ if(res.returnStatus == POStatus.STATUS_OK) {
+ //System.out.println(res.result + " : " + str);
+ assertEquals(input, res.result);
+ }
+ }
+
+ }
+
@Test
public void testTupleToOther() throws PlanException, ExecException {
POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);