You are viewing a plain-text version of this content; the link to the canonical (HTML) version is not preserved in this rendering.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/28 02:48:15 UTC

svn commit: r1028153 - in /pig/trunk: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/schema/ test/org/apache/pig/test/

Author: thejas
Date: Thu Oct 28 00:48:15 2010
New Revision: 1028153

URL: http://svn.apache.org/viewvc?rev=1028153&view=rev
Log:
PIG-1694: union-onschema projects null schema at parsing stage for some queries

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
    pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 28 00:48:15 2010
@@ -213,6 +213,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1694: union-onschema projects null schema at parsing stage for some queries (thejas)
+
 PIG-1685: Pig is unable to handle counters for glob paths ? (daijy)
 
 PIG-1683: New logical plan: Nested foreach plan fail if one inner alias is refered more than once (daijy)

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu Oct 28 00:48:15 2010
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
@@ -61,30 +62,42 @@ public class LOUnion extends RelationalO
             log.debug("Number of predecessors in the graph: " + s.size());
             try {
                 Iterator<LogicalOperator> iter = s.iterator();
-                LogicalOperator op = iter.next();
-                if (null == op) {
-                    int errCode = 1006;
-                    String msg = "Could not find operator in plan";
-                    throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                }
-                if (op.getSchema()!=null)
-                    mSchema = new Schema(op.getSchema());
-                else
-                    mSchema = null;
-                while(iter.hasNext()) {
-                    op = iter.next();
-                    if(null != mSchema) {
-                        mSchema = mSchema.merge(op.getSchema(), false);
-                    } else {
+                //create merged schema
+                if(isOnSchema){
+                    // this function can be called in parser, before
+                    // the foreach statements to project previous columns as 
+                    //per merged schema are setup. So can't rely just on 
+                    //regular union's schema merge logic
+                    mSchema = createMergedSchemaOnAlias(iter);
+                }else{
+                    //schema for regular union
+                    LogicalOperator op = iter.next();
+                    if (null == op) {
+                        int errCode = 1006;
+                        String msg = "Could not find operator in plan";
+                        throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
+                    }
+                    if (op.getSchema()!=null)
+                        mSchema = new Schema(op.getSchema());
+                    else
                         mSchema = null;
-                        break;
+                    while(iter.hasNext()) {
+                        op = iter.next();
+                        if(null != mSchema) {
+                            mSchema = mSchema.merge(op.getSchema(), false);
+                        } else {
+                            mSchema = null;
+                            break;
+                        }
                     }
                 }
+                
+                // set fieldschema parents
                 if(null != mSchema) {
                     for(Schema.FieldSchema fs: mSchema.getFields()) {
                         iter = s.iterator();
                         while(iter.hasNext()) {
-                            op = iter.next();
+                            LogicalOperator op = iter.next();
                             Schema opSchema = op.getSchema();
                             if(null != opSchema) {
                                 for(Schema.FieldSchema opFs: opSchema.getFields()) {
@@ -106,6 +119,49 @@ public class LOUnion extends RelationalO
         return mSchema;
     }
 
+    /**
+     * create schema for union-onschema
+     * @param iter
+     * @return
+     * @throws FrontendException
+     */
+    private Schema createMergedSchemaOnAlias(Iterator<LogicalOperator> iter)
+    throws FrontendException {
+        ArrayList<Schema> schemas = new ArrayList<Schema>();
+        while(iter.hasNext()){
+            LogicalOperator lop = iter.next();
+            Schema sch;
+            sch = lop.getSchema();
+            if(sch == null)                     
+            {                         
+                String msg = "Schema of relation " + lop.getAlias()
+                + " is null." 
+                + " UNION ONSCHEMA cannot be used with relations that"
+                + " have null schema.";
+                throw new FrontendException(msg, 1116, PigException.INPUT);
+            }
+            for(Schema.FieldSchema fs : sch.getFields()){
+                if(fs.alias == null){
+                    String msg = "Schema of relation " + lop.getAlias()
+                    + " has a null fieldschema for column(s). Schema :" +
+                    sch;
+                    throw new FrontendException(msg, 1116, PigException.INPUT);
+                }
+            }
+            schemas.add(sch);
+        }
+        //create the merged schema
+        Schema mergedSchema ;
+        try {
+            mergedSchema = Schema.mergeSchemasByAlias(schemas);   
+        }catch(SchemaMergeException e)                 {
+            String msg = "Error merging schemas for union operator : "
+                + e.getMessage();
+            throw new FrontendException(msg, 1116, PigException.INPUT, e);
+        }
+        return mergedSchema;
+    }
+
     @Override
     public String name() {
         return getAliasString() + "Union " + mKey.scope + "-" + mKey.id;

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java Thu Oct 28 00:48:15 2010
@@ -70,40 +70,12 @@ public class UnionOnSchemaSetter extends
         validator.validate(mPlan, collector);
         List<LogicalOperator> preds = mPlan.getPredecessors(loUnion);
 
-        //validate each input schema, and collect them in the ArrayList
-        ArrayList<Schema> schemas = new ArrayList<Schema>(preds.size());
-        for(LogicalOperator lop : preds){
-            Schema sch;
-            try {
-                sch = lop.getSchema();
-            } catch (FrontendException e) {
-                throw new UnionOnSchemaSetException("Error getting schema from logical operator");
-            }
-            if(sch == null)                     
-            {                         
-                String msg = "Schema of relation " + lop.getAlias()
-                + " is null." 
-                + " UNION ONSCHEMA cannot be used with relations that"
-                + " have null schema.";
-                throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
-            }
-            for(Schema.FieldSchema fs : sch.getFields()){
-                if(fs.alias == null){
-                    String msg = "Schema of relation " + lop.getAlias()
-                    + " has a null fieldschema for column(s). Schema :" +
-                    sch;
-                    throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
-                }
-            }
-            schemas.add(sch);
-        }
-        
         //create the merged schema
         Schema mergedSchema ;
         try {
-            mergedSchema = Schema.mergeSchemasByAlias(schemas);   
-        }catch(SchemaMergeException e)                 {
-            String msg = "Error merging schemas for union operator : "
+            mergedSchema = loUnion.getSchema();
+        }catch(FrontendException e)                 {
+            String msg = "Error creating merged schemas for union-onschema operator : "
                 + e.getMessage();
             throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT, e);
         }

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct 28 00:48:15 2010
@@ -871,6 +871,8 @@ public class Schema implements Serializa
         return fs;
     }
     
+    
+    
     /**
      * Given a field number, find the associated FieldSchema.
      *
@@ -1676,6 +1678,12 @@ public class Schema implements Serializa
             checkNullAlias(fs1, schema1);
             FieldSchema fs2 = getFieldSubNameMatchThrowSchemaMergeException(schema2,fs1.alias);
             if(fs2 != null){
+                if(schema2colsAdded.contains(fs2)){
+                    // alias corresponds to multiple fields in schema1,
+                    // just do a lookup on
+                    // schema1 , that will throw the appropriate error.
+                    getFieldSubNameMatchThrowSchemaMergeException(schema1, fs2.alias);
+                }
                 schema2colsAdded.add(fs2);
             }
             FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);

Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1028153&r1=1028152&r2=1028153&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Thu Oct 28 00:48:15 2010
@@ -120,6 +120,9 @@ public class TestUnionOnSchema  {
             + "u = union onschema l1, l2;"
         ; 
         Util.registerMultiLineQuery(pig, query);
+        Schema expectedSch = Util.getSchemaFromString("i: int, j: int");
+        Schema sch = pig.dumpSchema("u");
+        assertEquals("Checking expected schema",sch, expectedSch);
         Iterator<Tuple> it = pig.openIterator("u");
         
         List<Tuple> expectedRes = 
@@ -136,6 +139,72 @@ public class TestUnionOnSchema  {
     
     
     /**
+     * Test UNION ONSCHEMA with operations after the union
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaFilter() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+            + "u = union onschema l1, l2;"
+            + "fil = filter u by i == 5 and (x is null or x != 1);"
+        ; 
+        Util.registerMultiLineQuery(pig, query);
+        
+        Schema sch = pig.dumpSchema("fil");
+        Schema expectedSch = Util.getSchemaFromString("i: int, x: int, j: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        
+
+        Iterator<Tuple> it = pig.openIterator("fil");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(5,null,3)",
+                            "(5,3,null)"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    
+    /**
+     * Test UNION ONSCHEMA with operations after the union
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaSuccOps() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);"
+            + "u = union onschema l1, l2;"
+            + "o = order u by i desc;"
+            + "lim = limit o 2;"
+            + "fil = filter lim by i == 5 and y is null;"
+        ; 
+        Util.registerMultiLineQuery(pig, query);        
+        
+        Schema sch = pig.dumpSchema("fil");
+        Schema expectedSch = Util.getSchemaFromString("i: int, x: int, y: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        
+
+        Iterator<Tuple> it = pig.openIterator("fil");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(5,null,null)",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    /**
      * Test UNION ONSCHEMA with cast from bytearray to another type
      * @throws IOException
      * @throws ParseException