You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/06 07:48:44 UTC
svn commit: r1042526 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/BinStorage.java
test/org/apache/pig/test/TestEvalPipeline2.java
Author: daijy
Date: Mon Dec 6 06:48:44 2010
New Revision: 1042526
URL: http://svn.apache.org/viewvc?rev=1042526&view=rev
Log:
PIG-1745: Disable converting bytes loading from BinStorage
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/BinStorage.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Dec 6 06:48:44 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1745: Disable converting bytes loading from BinStorage (daijy)
+
PIG-1747: pattern match classes for matching patterns in physical plan (thejas)
PIG-1707: Allow pig build to pull from alternate maven repo to enable building
Modified: pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/BinStorage.java Mon Dec 6 06:48:44 2010
@@ -61,6 +61,7 @@ import org.apache.pig.impl.io.BinStorage
import org.apache.pig.impl.io.BinStorageRecordReader;
import org.apache.pig.impl.io.BinStorageRecordWriter;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.Utils;
@@ -70,21 +71,74 @@ import org.apache.pig.impl.util.Utils;
* supported.
*/
public class BinStorage extends FileInputLoadFunc
-implements LoadCaster, StoreFuncInterface, LoadMetadata {
+implements StoreFuncInterface, LoadMetadata {
+    /**
+     * LoadCaster returned by BinStorage when no caster class name was given to
+     * the constructor. BinStorage cannot tell how the bytes of an untyped field
+     * should be interpreted, so every conversion fails with error 1118 instead
+     * of producing a guessed value.
+     */
+    static class UnImplementedLoadCaster implements LoadCaster {
+
+        private static final String MESSAGE = "Cannot convert bytes load from BinStorage";
+        private static final int ERROR_CODE = 1118;
+
+        /** Builds the exception every conversion method throws. */
+        private static ExecException unsupported() {
+            return new ExecException(MESSAGE, ERROR_CODE);
+        }
+
+        @Override
+        public DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema)
+                throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public String bytesToCharArray(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Double bytesToDouble(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Float bytesToFloat(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Integer bytesToInteger(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Long bytesToLong(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+            throw unsupported();
+        }
+
+        @Override
+        public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema)
+                throws IOException {
+            throw unsupported();
+        }
+    }
Iterator<Tuple> i = null;
private static final Log mLog = LogFactory.getLog(BinStorage.class);
protected long end = Long.MAX_VALUE;
+    // Name of the LoadCaster class passed to BinStorage(String); null when the
+    // no-argument constructor was used.
+    private String casterString = null;
+    // Created lazily by getLoadCaster(). Kept per instance (not static) so that
+    // BinStorage() and BinStorage('SomeCaster') used in the same JVM do not
+    // overwrite or reuse each other's caster.
+    private LoadCaster caster = null;
+
private BinStorageRecordReader recReader = null;
private BinStorageRecordWriter recWriter = null;
- /**
- * Simple binary nested reader format
- */
+    /**
+     * Creates a BinStorage without a caster: bytearray fields it loads cannot
+     * be cast to other types.
+     */
public BinStorage() {
}
+
+    /**
+     * Creates a BinStorage that converts loaded bytes with the given LoadCaster.
+     * If the user knows how the stored bytes should be cast, supplying the
+     * caster lets intermediate data be stored and reloaded without explicitly
+     * listing every field and figuring out its type.
+     *
+     * @param casterString fully qualified LoadCaster class name, or the simple
+     *        name of a class in org.apache.pig.builtin
+     */
+    public BinStorage(String casterString) {
+        this.casterString = casterString;
+    }
@Override
public Tuple getNext() throws IOException {
@@ -104,126 +158,6 @@ implements LoadCaster, StoreFuncInterfac
}
}
- @Override
- public DataBag bytesToBag(byte[] b, ResourceFieldSchema schema){
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return DataReaderWriter.bytesToBag(dis);
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to bag, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public String bytesToCharArray(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return DataReaderWriter.bytesToCharArray(dis);
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to chararray, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Double bytesToDouble(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return new Double(dis.readDouble());
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to double, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Float bytesToFloat(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return new Float(dis.readFloat());
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to float, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Integer bytesToInteger(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return Integer.valueOf(dis.readInt());
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to integer, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Long bytesToLong(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return Long.valueOf(dis.readLong());
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to long, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Map<String, Object> bytesToMap(byte[] b) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return DataReaderWriter.bytesToMap(dis);
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to map, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
- @Override
- public Tuple bytesToTuple(byte[] b, ResourceFieldSchema schema) {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
- try {
- return DataReaderWriter.bytesToTuple(dis);
- } catch (IOException e) {
- LogUtils.warn(this, "Unable to convert bytearray to tuple, " +
- "caught IOException <" + e.getMessage() + ">",
- PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
- mLog);
-
- return null;
- }
- }
-
public byte[] toBytes(DataBag bag) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -338,9 +272,37 @@ implements LoadCaster, StoreFuncInterfac
return 42;
}
+    /**
+     * Returns the LoadCaster used to convert bytearray fields loaded by this
+     * BinStorage. Without a caster name an {@link UnImplementedLoadCaster} is
+     * used, so every conversion fails with error 1118. Otherwise the named class
+     * is loaded and instantiated on the first call, and the instance is reused.
+     *
+     * @throws FrontendException error 1119 if the caster class cannot be found,
+     *         or error 2259 if it cannot be instantiated or is not a LoadCaster
+     */
@Override
- public LoadCaster getLoadCaster() {
- return this;
+    public LoadCaster getLoadCaster() throws IOException {
+        if (caster == null) {
+            caster = (casterString == null)
+                    ? new UnImplementedLoadCaster()
+                    : instantiateCaster(casterString);
+        }
+        return caster;
}
+
+    /**
+     * Loads and instantiates the LoadCaster named {@code name}, trying it first
+     * as a fully qualified class name and then as a class in
+     * org.apache.pig.builtin.
+     */
+    private static LoadCaster instantiateCaster(String name) throws FrontendException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        if (cl == null) {
+            // No context loader on this thread; fall back to the loader of Pig itself.
+            cl = BinStorage.class.getClassLoader();
+        }
+        Class<?> casterClass;
+        try {
+            casterClass = cl.loadClass(name);
+        } catch (ClassNotFoundException ignored) {
+            // Not a fully qualified name; retry relative to the builtin package.
+            try {
+                casterClass = cl.loadClass("org.apache.pig.builtin." + name);
+            } catch (ClassNotFoundException e) {
+                throw new FrontendException("Cannot find LoadCaster class " + name, 1119, e);
+            }
+        }
+        try {
+            // asSubclass throws ClassCastException for a non-LoadCaster class,
+            // which is reported the same way as any other instantiation failure.
+            return casterClass.asSubclass(LoadCaster.class).newInstance();
+        } catch (Exception e) {
+            throw new FrontendException("Cannot instantiate class " + name, 2259, e);
+        }
+    }
@Override
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Dec 6 06:48:44 2010
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -165,7 +166,7 @@ public class TestEvalPipeline2 extends T
pigServer.deleteFile(output);
pigServer.store("a", output, BinStorage.class.getName());
- pigServer.registerQuery("b = load '" + output + "' using BinStorage() "
+ pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') "
+ "as (name: int, age: int, gpa: float, lage: long, dgpa: double);");
Iterator<Tuple> it = pigServer.openIterator("b");
@@ -180,35 +181,27 @@ public class TestEvalPipeline2 extends T
//tuple 1
tup = it.next();
-
- //1634952294 is integer whose binary represtation is same as that of "asdf"
- // other columns are returning null because they have less than num of bytes
- //expected for the corresponding numeric type's binary respresentation.
- assertTrue( (Integer)tup.get(0) == 1634952294);
- assertTrue(tup.get(1) == null);
- assertTrue(tup.get(2) == null);
- assertTrue(tup.get(3) == null);
- assertTrue(tup.get(4) == null);
+ assertTrue((Integer)tup.get(0) == null);
+ assertTrue((Integer)tup.get(1) == 12);
+ assertTrue((Float)tup.get(2) == 1.1F);
+ assertTrue((Long)tup.get(3) == 231L);
+ assertTrue((Double)tup.get(4) == 234.0);
//tuple 2
tup = it.next();
assertTrue(tup.get(0) == null);
- assertTrue( (Integer)tup.get(1) == 825373489);
- assertTrue( (Float)tup.get(2) == 2.5931501E-9F);
- assertTrue( (Long)tup.get(3) == 3544952156018063160L);
- assertTrue( (Double)tup.get(4) == 1.030084341992388E-71);
+ assertTrue((Integer)tup.get(1) == 1231);
+ assertTrue((Float)tup.get(2) == 123.4F);
+ assertTrue((Long)tup.get(3) == 12345678L);
+ assertTrue((Double)tup.get(4) == 1234.567);
//tuple 3
tup = it.next();
- // when byte array is larger than required num of bytes for given number type
- // it uses the required bytes from beginging of byte array for conversion
- // for example 1634952294 corresponds to first 4 byptes of binary string correspnding to
- // asdff
- assertTrue((Integer)tup.get(0) == 1634952294);
- assertTrue( (Integer)tup.get(1) == 825373490);
- assertTrue( (Float)tup.get(2) == 2.5350009E-9F);
- assertTrue( (Long)tup.get(3) == 3544952156018063160L);
- assertTrue( (Double)tup.get(4) == 1.0300843656201408E-71);
+ assertTrue(tup.get(0) == null);
+ assertTrue((Integer)tup.get(1) == 1232123);
+ assertTrue((Float)tup.get(2) == 1.45345F);
+ assertTrue((Long)tup.get(3) == 123456789L);
+ assertTrue((Double)tup.get(4) == 1.234567899E8);
Util.deleteFile(cluster, "table");
}
@@ -948,4 +941,29 @@ public class TestEvalPipeline2 extends T
assertFalse(iter.hasNext());
}
+
+    // See PIG-1732
+    // Casting a bytearray field loaded by BinStorage() (no caster argument)
+    // must fail with error 1118 instead of silently misreading the bytes.
+    @Test
+    public void testBinStorageByteCast() throws Exception {
+        String input = "table_testBinStorageByteCast";
+        String stored = "table_testBinStorageByteCast.temp";
+        String[] input1 = {
+            "1\t2\t3",
+        };
+
+        Util.createInputFile(cluster, input, input1);
+        try {
+            pigServer.registerQuery("a = load '" + input + "' as (a0, a1, a2);");
+            pigServer.store("a", stored, BinStorage.class.getName());
+
+            pigServer.registerQuery("a = load '" + stored + "' using BinStorage() as (a0, a1, a2);");
+            pigServer.registerQuery("b = foreach a generate (long)a0;");
+
+            try {
+                pigServer.openIterator("b");
+                fail("Expected casting a BinStorage bytearray to long to fail");
+            } catch (Exception e) {
+                PigException pe = LogUtils.getPigException(e);
+                assertNotNull("Expected a PigException in the cause chain of " + e, pe);
+                assertEquals(1118, pe.getErrorCode());
+            }
+        } finally {
+            Util.deleteFile(cluster, input);
+            pigServer.deleteFile(stored);
+        }
+    }
}