You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/01/27 03:45:07 UTC

svn commit: r1063964 - in /pig/branches/branch-0.8: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/

Author: daijy
Date: Thu Jan 27 02:45:07 2011
New Revision: 1063964

URL: http://svn.apache.org/viewvc?rev=1063964&view=rev
Log:
PIG-1813: Pig 0.8 throws ERROR 1075 while trying to refer a map in the result of eval udf. Works with 0.7

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1063964&r1=1063963&r2=1063964&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Thu Jan 27 02:45:07 2011
@@ -213,6 +213,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1813: Pig 0.8 throws ERROR 1075 while trying to refer a map in the result of eval udf. Works with 0.7 (daijy)
+
 PIG-1776: changing statement corresponding to alias after explain , then
   doing dump gives incorrect result (thejas)
 

Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1063964&r1=1063963&r2=1063964&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Jan 27 02:45:07 2011
@@ -229,13 +229,6 @@ public class POUserFunc extends Expressi
                     result.result = func.exec((Tuple) result.result);
                     }
                 }
-                if(resultType == DataType.BYTEARRAY) {
-                    // This is needed if some EvalFunc has default datatype as bytearray and returns arbitrary objects
-                    // We see such behavior in case of script EvalFunc, which is used to run udfs in scripting langs
-                    if(result.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
-                        result.result = new DataByteArray(result.result.toString().getBytes());
-                    }
-                }
                 return result;
             }
                         

Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1063964&r1=1063963&r2=1063964&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Thu Jan 27 02:45:07 2011
@@ -74,6 +74,7 @@ public class LOGenerate extends LogicalR
             
             LogicalFieldSchema fieldSchema = null;
             
+            // schema of expression, or inner schema if expression is flattened
             LogicalSchema expSchema = null;
             
             if (exp.getFieldSchema()!=null) {
@@ -81,7 +82,7 @@ public class LOGenerate extends LogicalR
                 fieldSchema = exp.getFieldSchema().deepCopy();
                 
                 expSchema = new LogicalSchema();
-                if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) {
+                if (!flattenFlags[i] || (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)) {
                     // if type is primitive, just add to schema
                     if (fieldSchema!=null)
                         expSchema.addField(fieldSchema);
@@ -93,34 +94,30 @@ public class LOGenerate extends LogicalR
                         expSchema = null;
                     }
                     else {
-                        // if flatten is set, set schema of tuple field to this schema
+                        // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
                         List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
-                        if (flattenFlags[i]) {
-                            if (fieldSchema.type == DataType.BAG) {
-                                // if it is bag of tuples, get the schema of tuples
-                                if (fieldSchema.schema!=null) {
-                                    if (fieldSchema.schema.isTwoLevelAccessRequired()) {
-                                        //  assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
-                                        innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
-                                    } else {
-                                        innerFieldSchemas = fieldSchema.schema.getFields();
-                                    }
-                                    for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
-                                        fs.alias = fieldSchema.alias + "::" + fs.alias;
-                                    }
+                        if (fieldSchema.type == DataType.BAG) {
+                            // if it is bag of tuples, get the schema of tuples
+                            if (fieldSchema.schema!=null) {
+                                if (fieldSchema.schema.isTwoLevelAccessRequired()) {
+                                    //  assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
+                                    innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
+                                } else {
+                                    innerFieldSchemas = fieldSchema.schema.getFields();
                                 }
-                            } else { // DataType.TUPLE
-                                innerFieldSchemas = fieldSchema.schema.getFields();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fieldSchema.alias + "::" + fs.alias;
                                 }
                             }
-                            
-                            for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
-                                expSchema.addField(fs);
+                        } else { // DataType.TUPLE
+                            innerFieldSchemas = fieldSchema.schema.getFields();
+                            for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
+                                fs.alias = fieldSchema.alias + "::" + fs.alias;
+                            }
                         }
-                        else
-                            expSchema.addField(fieldSchema);
+                        
+                        for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
+                            expSchema.addField(fs);
                     }
                 }
             }

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1063964&r1=1063963&r2=1063964&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java Thu Jan 27 02:45:07 2011
@@ -34,10 +34,12 @@ import junit.framework.TestCase;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -1064,4 +1066,61 @@ public class TestEvalPipeline2 extends T
         assertTrue(t.toString().contains("(2,3,1,2)"));
         assertFalse(iter.hasNext());
     }
+    
+    public static class BagGenerateNoSchema extends EvalFunc<DataBag> {
+        @Override
+        public DataBag exec(Tuple input) throws IOException {
+            DataBag bg = DefaultBagFactory.getInstance().newDefaultBag();
+            bg.add(input);
+            return bg;
+        }
+    }
+    
+    // See PIG-1813
+    @Test
+    public void testUDFNoSchemaPropagate1() throws Exception{
+        String[] input1 = {
+                "[key#1,key2#2]",
+                "[key#2,key2#3]",
+        };
+        
+        Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate1", input1);
+
+        pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate1' as (a0:map[]);");
+        pigServer.registerQuery("b = foreach a generate " + BagGenerateNoSchema.class.getName() + "(*) as b0;");
+        pigServer.registerQuery("c = foreach b generate flatten(IdentityColumn(b0));");
+        pigServer.registerQuery("d = foreach c generate $0#'key';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        
+        Tuple t = iter.next();
+        assertTrue(t.toString().contains("(1)"));
+        t = iter.next();
+        assertTrue(t.toString().contains("(2)"));
+        assertFalse(iter.hasNext());
+    }
+    
+    // See PIG-1813
+    @Test
+    public void testUDFNoSchemaPropagate2() throws Exception{
+        String[] input1 = {
+                "[key#1,key2#2]",
+                "[key#2,key2#3]",
+        };
+        
+        Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate2", input1);
+
+        pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate2' as (a0:map[]);");
+        pigServer.registerQuery("b = foreach a generate flatten(" + BagGenerateNoSchema.class.getName() + "(*)) as b0;");
+        pigServer.registerQuery("c = foreach b generate IdentityColumn(b0);");
+        pigServer.registerQuery("d = foreach c generate $0#'key';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        
+        Tuple t = iter.next();
+        assertTrue(t.toString().contains("(1)"));
+        t = iter.next();
+        assertTrue(t.toString().contains("(2)"));
+        assertFalse(iter.hasNext());
+    }
 }