You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text rendering.)
Posted to commits@pig.apache.org by da...@apache.org on 2010/12/06 07:48:44 UTC

svn commit: r1042526 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/BinStorage.java test/org/apache/pig/test/TestEvalPipeline2.java

Author: daijy
Date: Mon Dec  6 06:48:44 2010
New Revision: 1042526

URL: http://svn.apache.org/viewvc?rev=1042526&view=rev
Log:
PIG-1745: Disable converting bytes loading from BinStorage

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Dec  6 06:48:44 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1745: Disable converting bytes loading from BinStorage (daijy)
+
 PIG-1747: pattern match classes for matching patterns in physical plan (thejas)
 
 PIG-1707: Allow pig build to pull from alternate maven repo to enable building

Modified: pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/BinStorage.java Mon Dec  6 06:48:44 2010
@@ -61,6 +61,7 @@ import org.apache.pig.impl.io.BinStorage
 import org.apache.pig.impl.io.BinStorageRecordReader;
 import org.apache.pig.impl.io.BinStorageRecordWriter;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.Utils;
 
@@ -70,21 +71,74 @@ import org.apache.pig.impl.util.Utils;
  * supported.
  */
 public class BinStorage extends FileInputLoadFunc 
-implements LoadCaster, StoreFuncInterface, LoadMetadata {
+implements StoreFuncInterface, LoadMetadata {
 
+    static class UnImplementedLoadCaster implements LoadCaster {
+
+        @Override
+        public DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema)
+                throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public String bytesToCharArray(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Double bytesToDouble(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Float bytesToFloat(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Integer bytesToInteger(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Long bytesToLong(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+
+        @Override
+        public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema)
+                throws IOException {
+            throw new ExecException("Cannot convert bytes load from BinStorage", 1118);
+        }
+    }
 
     Iterator<Tuple>     i              = null;
     private static final Log mLog = LogFactory.getLog(BinStorage.class);
     protected long                end            = Long.MAX_VALUE;
     
+    static String casterString = null;
+    static LoadCaster caster = null;
+    
     private BinStorageRecordReader recReader = null;
     private BinStorageRecordWriter recWriter = null;
     
-    /**
-     * Simple binary nested reader format
-     */
     public BinStorage() {
     }
+    
+    // If the user knows how to cast the bytes for BinStorage, provide
+    // the class name of the caster. When we later want to convert
+    // bytes to other types, BinStorage then knows how. This gives
+    // users a way to store intermediate data without having to explicitly
+    // list all the fields and figure out their types.
+    public BinStorage(String casterString) {
+        this.casterString = casterString;
+    }
 
     @Override
     public Tuple getNext() throws IOException {
@@ -104,126 +158,6 @@ implements LoadCaster, StoreFuncInterfac
         }
     }
 
-    @Override
-    public DataBag bytesToBag(byte[] b, ResourceFieldSchema schema){
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return DataReaderWriter.bytesToBag(dis);
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to bag, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }        
-    }
-
-    @Override
-    public String bytesToCharArray(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return DataReaderWriter.bytesToCharArray(dis);
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to chararray, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
-    @Override
-    public Double bytesToDouble(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return new Double(dis.readDouble());
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to double, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
-    @Override
-    public Float bytesToFloat(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return new Float(dis.readFloat());
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to float, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-            
-            return null;
-        }
-    }
-
-    @Override
-    public Integer bytesToInteger(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return Integer.valueOf(dis.readInt());
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to integer, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
-    @Override
-    public Long bytesToLong(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return Long.valueOf(dis.readLong());
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to long, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
-    @Override
-    public Map<String, Object> bytesToMap(byte[] b) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return DataReaderWriter.bytesToMap(dis);
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to map, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
-    @Override
-    public Tuple bytesToTuple(byte[] b, ResourceFieldSchema schema) {
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
-        try {
-            return DataReaderWriter.bytesToTuple(dis);
-        } catch (IOException e) {
-            LogUtils.warn(this, "Unable to convert bytearray to tuple, " +
-                    "caught IOException <" + e.getMessage() + ">",
-                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
-                    mLog);
-        
-            return null;
-        }
-    }
-
     public byte[] toBytes(DataBag bag) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
@@ -338,9 +272,37 @@ implements LoadCaster, StoreFuncInterfac
         return 42; 
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public LoadCaster getLoadCaster() {
-        return this;
+    public LoadCaster getLoadCaster() throws IOException {
+        if (caster == null) {
+            Class<LoadCaster> casterClass = null;
+            if (casterString!=null) {
+                ClassLoader cl = Thread.currentThread().getContextClassLoader();
+                try {
+                    // Try casterString as a fully qualified name
+                    casterClass = (Class<LoadCaster>)cl.loadClass(casterString);
+                } catch (ClassNotFoundException e) {
+                }
+                if (casterClass==null) {
+                    try {
+                        // Try casterString as in builtin
+                        casterClass = (Class<LoadCaster>)cl.loadClass("org.apache.pig.builtin." + casterString);
+                    } catch (ClassNotFoundException e) {
+                        throw new FrontendException("Cannot find LoadCaster class " + casterString, 1119, e); 
+                    }
+                }
+                try {
+                    caster = casterClass.newInstance();
+                } catch (Exception e) {
+                    throw new FrontendException("Cannot instantiate class " + casterString, 2259, e);
+                }
+            }
+            else {
+                caster = new UnImplementedLoadCaster();
+            }
+        }
+        return caster;
     }
 
     @Override

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1042526&r1=1042525&r2=1042526&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Dec  6 06:48:44 2010
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -165,7 +166,7 @@ public class TestEvalPipeline2 extends T
         pigServer.deleteFile(output);
         pigServer.store("a", output, BinStorage.class.getName());
 
-        pigServer.registerQuery("b = load '" + output + "' using BinStorage() "
+        pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') "
                 + "as (name: int, age: int, gpa: float, lage: long, dgpa: double);");
         
         Iterator<Tuple> it = pigServer.openIterator("b");
@@ -180,35 +181,27 @@ public class TestEvalPipeline2 extends T
         //tuple 1 
         tup = it.next();
 
-        
-        //1634952294 is integer whose  binary represtation is same as that of "asdf"
-        // other columns are returning null because they have less than num of bytes
-        //expected for the corresponding numeric type's binary respresentation.
-        assertTrue( (Integer)tup.get(0) == 1634952294); 
-        assertTrue(tup.get(1) == null);
-        assertTrue(tup.get(2) == null);
-        assertTrue(tup.get(3) == null);
-        assertTrue(tup.get(4) == null);
+        assertTrue((Integer)tup.get(0) == null); 
+        assertTrue((Integer)tup.get(1) == 12);
+        assertTrue((Float)tup.get(2) == 1.1F);
+        assertTrue((Long)tup.get(3) == 231L);
+        assertTrue((Double)tup.get(4) == 234.0);
         
         //tuple 2 
         tup = it.next();
         assertTrue(tup.get(0) == null);
-        assertTrue( (Integer)tup.get(1) == 825373489);
-        assertTrue( (Float)tup.get(2) == 2.5931501E-9F);
-        assertTrue( (Long)tup.get(3) == 3544952156018063160L);
-        assertTrue( (Double)tup.get(4) == 1.030084341992388E-71);
+        assertTrue((Integer)tup.get(1) == 1231);
+        assertTrue((Float)tup.get(2) == 123.4F);
+        assertTrue((Long)tup.get(3) == 12345678L);
+        assertTrue((Double)tup.get(4) == 1234.567);
         
         //tuple 3
         tup = it.next();
-        // when byte array is larger than required num of bytes for given number type
-        // it uses the required bytes from beginging of byte array for conversion
-        // for example 1634952294 corresponds to first 4 byptes of binary string correspnding to
-        // asdff
-        assertTrue((Integer)tup.get(0) == 1634952294);
-        assertTrue( (Integer)tup.get(1) == 825373490);
-        assertTrue( (Float)tup.get(2) == 2.5350009E-9F);
-        assertTrue( (Long)tup.get(3) == 3544952156018063160L);
-        assertTrue( (Double)tup.get(4) == 1.0300843656201408E-71);
+        assertTrue(tup.get(0) == null);
+        assertTrue((Integer)tup.get(1) == 1232123);
+        assertTrue((Float)tup.get(2) == 1.45345F);
+        assertTrue((Long)tup.get(3) == 123456789L);
+        assertTrue((Double)tup.get(4) == 1.234567899E8);
         
         Util.deleteFile(cluster, "table");
     }
@@ -948,4 +941,29 @@ public class TestEvalPipeline2 extends T
         
         assertFalse(iter.hasNext());
     }
+    
+    // See PIG-1732
+    @Test
+    public void testBinStorageByteCast() throws Exception{
+        String[] input1 = {
+                "1\t2\t3",
+        };
+        
+        Util.createInputFile(cluster, "table_testBinStorageByteCast", input1);
+        pigServer.registerQuery("a = load 'table_testBinStorageByteCast' as (a0, a1, a2);");
+        pigServer.store("a", "table_testBinStorageByteCast.temp", BinStorage.class.getName());
+        
+        pigServer.registerQuery("a = load 'table_testBinStorageByteCast.temp' using BinStorage() as (a0, a1, a2);");
+        pigServer.registerQuery("b = foreach a generate (long)a0;");
+        
+        try {
+            pigServer.openIterator("b");
+        } catch (Exception e) {
+            PigException pe = LogUtils.getPigException(e);
+            assertTrue(pe.getErrorCode()==1118);
+            return;
+        }
+        
+        fail();
+    }
 }