You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/10/19 00:45:05 UTC

svn commit: r1024052 - in /pig/branches/branch-0.8: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/

Author: thejas
Date: Mon Oct 18 22:45:04 2010
New Revision: 1024052

URL: http://svn.apache.org/viewvc?rev=1024052&view=rev
Log:
PIG-1673: query with consecutive union-onschema statement errors out

Added:
    pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
    pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java
Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
    pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Mon Oct 18 22:45:04 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1673: query with consecutive union-onschema statement errors out (thejas)
+
 PIG-1653: Scripting UDF fails if the path to script is an absolute path (daijy)
 
 PIG-1669: PushUpFilter fail when filter condition contains scalar (daijy)

Modified: pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/PigServer.java Mon Oct 18 22:45:04 2010
@@ -81,6 +81,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.PlanSetter;
 import org.apache.pig.impl.logicalLayer.ScalarFinder;
+import org.apache.pig.impl.logicalLayer.UnionOnSchemaSetter;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
@@ -1312,6 +1313,9 @@ public class PigServer {
         PlanSetter ps = new PlanSetter(lp);
         ps.visit();
         
+        UnionOnSchemaSetter setUnionOnSchema = new UnionOnSchemaSetter(lp, pigContext);
+        setUnionOnSchema.visit();
+        
         // run through validator
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
         boolean isBeforeOptimizer = true;

Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/LOUnion.java Mon Oct 18 22:45:04 2010
@@ -18,29 +18,27 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class LOUnion extends RelationalOperator {
 
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LOUnion.class);
+    private boolean isOnSchema = false;
     
     /**
      * @param plan
@@ -294,4 +292,18 @@ public class LOUnion extends RelationalO
         super.pruneColumns(columns);
         return true;
     }
+
+    /**
+     * @param isOnSchema true to mark this union as a union-onschema
+     */
+    public void setOnSchema(boolean isOnSchema) {
+        this.isOnSchema = isOnSchema;
+    }
+
+    /**
+     * @return true if this union has been marked as a union-onschema
+     */
+    public boolean isOnSchema() {
+        return isOnSchema;
+    }
 }

Added: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java?rev=1024052&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/UnionOnSchemaSetter.java Mon Oct 18 22:45:04 2010
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.impl.logicalLayer.validators.UnionOnSchemaSetException;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanValidationException;
+
+/**
+ * A visitor that modifies the logical plan (if necessary) for union-onschema
+ * functionality. It runs the logical plan validator so that the correct schema
+ * of each union input is available, and then inserts a foreach statement on
+ * every input whose schema does not match the schema created by merging all
+ * input schemas.
+ */
+public class UnionOnSchemaSetter extends LOVisitor {
+
+    private final PigContext pigContext;
+
+    /**
+     * @param plan logical plan to visit, walked in dependency order
+     * @param pigContext context passed on to the logical plan validator
+     */
+    public UnionOnSchemaSetter(LogicalPlan plan, PigContext pigContext) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * Aligns the inputs of a union-onschema operator with the merged schema.
+     * Unions that are not marked as on-schema are left untouched.
+     *
+     * @param loUnion the union operator being visited
+     * @throws PlanValidationException if validating the plan fails
+     * @throws UnionOnSchemaSetException if an input has a null schema or an
+     *         unnamed column, if the input schemas cannot be merged, or if
+     *         adding the foreach to the plan fails
+     */
+    public void visit(LOUnion loUnion) throws PlanValidationException, UnionOnSchemaSetException {
+        if(!loUnion.isOnSchema()) {
+            //Not union-onschema, nothing to be done
+            return;
+        }
+        // run through validator first on inputs so that the schemas have the right
+        //types for columns. It will run TypeCheckingValidator as well.
+        // The compilation messages will be logged when validation is
+        // done from PigServer, so not doing it here
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        boolean isBeforeOptimizer = true;
+
+        LogicalPlanValidationExecutor validator = 
+            new LogicalPlanValidationExecutor(mPlan, pigContext, isBeforeOptimizer);
+        validator.validate(mPlan, collector);
+
+        // work on a snapshot of the inputs: insertBetween() below rewires the
+        // predecessors of loUnion while they are still being iterated
+        List<LogicalOperator> preds =
+            new ArrayList<LogicalOperator>(mPlan.getPredecessors(loUnion));
+
+        //validate each input schema, and collect them in the ArrayList
+        ArrayList<Schema> schemas = new ArrayList<Schema>(preds.size());
+        for(LogicalOperator lop : preds){
+            Schema sch;
+            try {
+                sch = lop.getSchema();
+            } catch (FrontendException e) {
+                // chain the cause so the underlying failure is not lost
+                throw new UnionOnSchemaSetException(
+                        "Error getting schema from logical operator", e);
+            }
+            if(sch == null) {
+                String msg = "Schema of relation " + lop.getAlias()
+                + " is null." 
+                + " UNION ONSCHEMA cannot be used with relations that"
+                + " have null schema.";
+                throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
+            }
+            for(Schema.FieldSchema fs : sch.getFields()){
+                if(fs.alias == null){
+                    String msg = "Schema of relation " + lop.getAlias()
+                    + " has a null fieldschema for column(s). Schema :" +
+                    sch;
+                    throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT);
+                }
+            }
+            schemas.add(sch);
+        }
+
+        //create the merged schema
+        Schema mergedSchema ;
+        try {
+            mergedSchema = Schema.mergeSchemasByAlias(schemas);
+        } catch(SchemaMergeException e) {
+            String msg = "Error merging schemas for union operator : "
+                + e.getMessage();
+            throw new UnionOnSchemaSetException(msg, 1116, PigException.INPUT, e);
+        }
+
+        //create a user defined schema list for use in LOForeach
+        // using merged schema
+        ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
+        for(Schema.FieldSchema fs : mergedSchema.getFields()){
+            // Use NULL datatype because the type will be set by the TypeChecking
+            // visitors
+            mergedSchemaList.add(new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL)));
+        }
+
+        // add a foreach for inputs that don't match mergedSchema, projecting
+        // null for columns that don't exist in the input
+        for(LogicalOperator lop : preds) {
+            try {
+                Schema inpSchema = lop.getSchema();
+                if(inpSchema.equals(mergedSchema)) {
+                    // input already matches the merged schema
+                    continue;
+                }
+
+                //the mergedSchema is different from this operators schema
+                // so add a foreach to project columns appropriately
+                int mergeSchSz = mergedSchema.size();
+                ArrayList<LogicalPlan> generatePlans =
+                    new ArrayList<LogicalPlan>(mergeSchSz);
+                ArrayList<Boolean> flattenList =
+                    new ArrayList<Boolean>(mergeSchSz);
+
+                String scope = loUnion.getOperatorKey().getScope();
+                for(Schema.FieldSchema fs : mergedSchema.getFields()) {
+                    LogicalPlan projectPlan = new LogicalPlan();
+                    flattenList.add(Boolean.FALSE);
+
+                    int inpPos = inpSchema.getPositionSubName(fs.alias);
+
+                    LogicalOperator columnProj = null;
+                    boolean isCastNeeded = false;
+                    if(inpPos == -1){
+                        //the column is not present in schema of this input,
+                        // so project null
+                        // NOTE(review): the constant is added to projectPlan
+                        // below but is constructed with mPlan, unlike the
+                        // LOProject and LOCast - confirm this is intended
+                        columnProj =
+                            new LOConst(mPlan, getNextId(scope), null);
+                        // cast is necessary if the type in schema is
+                        // not a BYTEARRAY
+                        if(fs.type != DataType.BYTEARRAY){
+                            isCastNeeded = true;
+                        }
+                    }else {
+                        //project the column from input
+                        columnProj = new LOProject(
+                                projectPlan, getNextId(scope), lop, inpPos);
+
+                        //cast is needed if types are different.
+                        //compatibility of types has already been checked
+                        //during creation of mergedSchema
+                        Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias);
+                        if(inpFs.type != fs.type) {
+                            isCastNeeded = true;
+                        }
+                    }
+                    projectPlan.add(columnProj);
+
+                    //add a LOCast if necessary
+                    if(isCastNeeded){
+                        LOCast loCast = new LOCast(projectPlan, getNextId(scope), fs.type);
+                        loCast.setFieldSchema(fs);
+                        projectPlan.add(loCast);
+                        projectPlan.connect(columnProj, loCast);
+                    }
+                    generatePlans.add(projectPlan);
+                }
+                LogicalOperator foreach = new LOForEach(mPlan, getNextId(scope),
+                        generatePlans, flattenList, mergedSchemaList);
+                mPlan.add(foreach);
+                mPlan.insertBetween(lop, foreach, loUnion);
+            } catch (FrontendException e) {
+                String msg = "Error adding union operator " + loUnion.getAlias()
+                   + ":" + e.getMessage();
+                throw new UnionOnSchemaSetException(msg, e);
+            }
+        }
+    }
+
+    /**
+     * @param scope scope in which the new operator key is created
+     * @return an operator key for {@code scope} carrying the next node id
+     */
+    private OperatorKey getNextId(String scope) {
+        return new OperatorKey(
+                scope, 
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)
+        );
+    }
+
+}

Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Oct 18 22:45:04 2010
@@ -2261,158 +2261,12 @@ LogicalOperator UnionClause(LogicalPlan 
                 // to ParseException. Otherwise, if any exception than ParseException
                 // is thrown , the generated parse code tries to cast
                 //the exception to Error, resulting in a misleading error message
-                
-                if(isOnSchema) {
-                    // run through validator first on inputs so that the schemas have the right
-                    //types for columns. It will run TypeCheckingValidator as well.
-                    // The compilation messages will be logged when validation is
-                    // done from PigServer, so not doing it here
-                    CompilationMessageCollector collector = new CompilationMessageCollector() ;
-                    boolean isBeforeOptimizer = true;
-                    LogicalPlanValidationExecutor validator = 
-                        new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
-                    validator.validate(lp, collector);
-                }
-                
-                LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
+                LOUnion union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
+                union.setOnSchema(isOnSchema);
                 lp.add(union);
-                log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-
-                if(isOnSchema)             
-                {  // this is UNION ONSCHEMA, find merged schema 
-                    // and  (if necessary) add foreach to align columns
-                    
-                    
-                    ArrayList<Schema> schemas = new ArrayList<Schema>(inputs.size());
-                    for(LogicalOperator lop : inputs){
-                        Schema sch = lop.getSchema();
-                        if(sch == null)                     
-                        {                         
-                            String msg = "Schema of relation " + lop.getAlias()
-                            + " is null." 
-                            + " UNION ONSCHEMA cannot be used with relations that"
-                            + " have null schema.";
-                            throw new ParseException(msg);
-                        }
-                        for(Schema.FieldSchema fs : sch.getFields()){
-                            if(fs.alias == null){
-                                String msg = "Schema of relation " + lop.getAlias()
-                                + " has a null fieldschema for column(s). Schema :" +
-                                sch;
-                                throw new ParseException(msg);
-                            }
-                        }
-                        schemas.add(sch);
-                    }
-                    Schema mergedSchema ;
-                    try {
-                        mergedSchema = Schema.mergeSchemasByAlias(schemas);   
-                    }catch(SchemaMergeException e)                 {
-                        String msg = "Error merging schemas for union operator : "
-                            + e.getMessage();
-                        ParseException pe = new ParseException(msg);
-                        pe.initCause(e);
-                        throw pe;                 
-                    }
 
-                    // add a foreach for inputs that don't match mergedSchema, projecting
-                    // null for columns that don't exist in the input
-                    ArrayList<LogicalOperator> newInputs =
-                        new ArrayList<LogicalOperator>(inputs.size());
-                    
-                    //create a user defined schema list for use in LOForeach
-                    // using merged schema
-                    ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
-                    for(Schema.FieldSchema fs : mergedSchema.getFields()){
-                        mergedSchemaList.add(new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL)));
-                    }
-                    
-                    
-                    for(LogicalOperator lop : inputs)                 
-                    {                     
-                        if(! lop.getSchema().equals(mergedSchema))
-                        {
-                            //the mergedSchema is different from this operators schema
-                            // so add a foreach to project columns appropriately
-                            int mergeSchSz = mergedSchema.size();
-                            ArrayList<LogicalPlan> generatePlans =
-                                new ArrayList<LogicalPlan>(mergeSchSz);
-                            ArrayList<Boolean> flattenList =
-                                new ArrayList<Boolean>(mergeSchSz);
-
-                            for(Schema.FieldSchema fs : mergedSchema.getFields()) { 
-                                LogicalPlan projectPlan = new LogicalPlan();
-                                Schema inpSchema = lop.getSchema();
-                                flattenList.add(Boolean.FALSE);
-
-                                int inpPos = inpSchema.getPositionSubName(fs.alias);
-
-                                LogicalOperator columnProj = null;
-                                boolean isCastNeeded = false;
-                                if(inpPos == -1){   
-                                    //the column is not present in schema of this input,
-                                    // so project null
-                                    columnProj =
-                                        new LOConst(lp,
-                                                new OperatorKey(scope, getNextId()),
-                                                null
-                                        );
-                                    // cast is necessary if the type in schema is
-                                    // not a BYTEARRAY
-                                    if(fs.type != DataType.BYTEARRAY){
-                                        isCastNeeded = true;
-                                    }
-                                }else {
-                                    //project the column from input
-                                    columnProj = 
-                                        new LOProject(projectPlan,
-                                                new OperatorKey(scope, getNextId()),
-                                                lop, inpPos
-                                        );
-
-                                    //cast is needed if types are different.    
-                                    //compatibility of types has already been checked
-                                    //during creation of mergedSchema
-                                    Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias);
-                                    if(inpFs.type != fs.type)
-                                        isCastNeeded = true;
-                                }
-                                projectPlan.add(columnProj);
-
-                                //add a LOCast if necessary
-                                if(isCastNeeded){
-                                    LOCast loCast = new LOCast(projectPlan,
-                                            new OperatorKey(scope, getNextId()),
-                                            fs.type
-                                    );
-                                    loCast.setFieldSchema(fs);
-                                    projectPlan.add(loCast);
-                                    projectPlan.connect(columnProj, loCast);
-                                }
-                                generatePlans.add(projectPlan);
-
-                            }
-                            LogicalOperator foreach = new LOForEach(lp,
-                                    new OperatorKey(scope, getNextId()),
-                                    generatePlans, flattenList,
-                                    mergedSchemaList
-                            );
-                            lp.add(foreach);
-                            lp.connect(lop, foreach);
-                            newInputs.add(foreach);
-                        }else {
-                            // schema of input is same as mergedSchema,
-                            //no additional foreach is required
-                            newInputs.add(lop);                     
-                        }
-
-                    }
-                    // use newInputs as the inputs for union
-                    inputs = newInputs;     
-                }
-                
+                log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
 
-                
                 for (LogicalOperator lop: inputs) {
                     lp.connect(lop, union);
                     log.debug("Connected union input operator " +
@@ -2423,10 +2277,6 @@ LogicalOperator UnionClause(LogicalPlan 
                 log.trace("Exiting UnionClause");
                 return union;
             }
-            catch(ParseException e){
-                // its already a ParseException, just throw it.
-                throw e;
-            }
             catch(Exception e){
                 ParseException pe = new ParseException();
                 pe.initCause(e);

Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Oct 18 22:45:04 2010
@@ -1892,8 +1892,10 @@ public class TypeCheckingVisitor extends
             throw new TypeCheckerException(msg, errCode, PigException.INPUT, fee) ;
         }
 
-        // Do cast insertion only if we are typed
-        if (schema != null) {
+        // Do cast insertion only if we are typed 
+        // and if it's not union-onschema. In case of union-onschema the
+        // foreach with cast is added in UnionOnSchemaSetter
+        if (schema != null && !u.isOnSchema()) {
             // Insert casting to inputs if necessary
             for (int i=0; i< inputs.size() ;i++) {
                 LOForEach insertedOp

Added: pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java?rev=1024052&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/validators/UnionOnSchemaSetException.java Mon Oct 18 22:45:04 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.validators;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+public class UnionOnSchemaSetException extends VisitorException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Create a new UnionOnSchemaSetException with null as the error message.
+     */
+    public UnionOnSchemaSetException() {
+        super();
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     */
+    public UnionOnSchemaSetException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified cause.
+     *
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public UnionOnSchemaSetException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public UnionOnSchemaSetException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message and error code.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     */
+    public UnionOnSchemaSetException(String message, int errCode) {
+        super(message, errCode);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown. 
+     */
+    public UnionOnSchemaSetException(String message, int errCode, Throwable cause) {
+        super(message, errCode, cause);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code and error source.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     */
+    public UnionOnSchemaSetException(String message, int errCode, byte errSrc) {
+        super(message, errCode, errSrc);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code, error source and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown. 
+     */
+    public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+            Throwable cause) {
+        super(message, errCode, errSrc, cause);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code and retriable flag.
+     *
+     * @param message - The error message (which is saved for later retrieval by the {@link Throwable#getMessage()} method) shown to the user
+     * @param errCode - The error code shown to the user
+     * @param retry - If the exception is retriable or not
+     */ 
+    public UnionOnSchemaSetException(String message, int errCode, boolean retry) {
+        super(message, errCode, retry);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code, error source and retriable flag.
+     *
+     * @param message - The error message (which is saved for later retrieval by the {@link Throwable#getMessage()} method) shown to the user
+     * @param errCode - The error code shown to the user
+     * @param errSrc - The error source
+     * @param retry - If the exception is retriable or not
+     */
+    public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+            boolean retry) {
+        super(message, errCode, errSrc, retry);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code, error source, retriable flag and detailed message for the developer.
+     *
+     * @param message - The error message (which is saved for later retrieval by the {@link Throwable#getMessage()} method) shown to the user
+     * @param errCode - The error code shown to the user
+     * @param errSrc - The error source
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer
+     */
+    public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+            boolean retry, String detailedMsg) {
+        super(message, errCode, errSrc, retry, detailedMsg);
+    }
+
+    /**
+     * Create a new UnionOnSchemaSetException with the specified message, error code, error source, retriable flag, detailed message for the developer and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the {@link Throwable#getMessage()} method) shown to the user
+     * @param errCode - The error code shown to the user
+     * @param errSrc - The error source
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer
+     * @param cause - The cause (which is saved for later retrieval by the {@link Throwable#getCause()} method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public UnionOnSchemaSetException(String message, int errCode, byte errSrc,
+            boolean retry, String detailedMsg, Throwable cause) {
+        super(message, errCode, errSrc, retry, detailedMsg, cause);
+    }
+    
+}

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1024052&r1=1024051&r2=1024052&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java Mon Oct 18 22:45:04 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -40,6 +41,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.LogUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -523,9 +525,11 @@ public class TestUnionOnSchema  {
         boolean foundEx = false;
         try{
             Util.registerMultiLineQuery(pig, query);
+            pig.dumpSchema("u");
         }catch(FrontendException e){
+            PigException pigEx = LogUtils.getPigException(e);
             foundEx = true;
-            if(!e.getMessage().contains(expectedErr)){
+            if(!pigEx.getMessage().contains(expectedErr)){
                 String msg = "Expected exception message matching '" 
                     + expectedErr + "' but got '" + e.getMessage() + "'" ;
                 fail(msg);
@@ -686,6 +690,60 @@ public class TestUnionOnSchema  {
                     });
         Util.checkQueryOutputsAfterSort(it, expectedRes);
     }
+    
+    
+    /**
+     * Test UNION ONSCHEMA with udf whose default type is different from
+     * final type - where udf is not in immediate input of union.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaUdfTypeEvolution2() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        // f11/f22 add one foreach between each udf (MAX, COUNT) and the union
+        String query_prefix =
+            "  l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : int, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+            +       ", t : tuple (tc1 : int, tc2 : chararray) );"
+            + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : int, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+            +       ", t : tuple (tc1 : int, tc2 : chararray) );"
+            + "f1 = foreach l1 generate i, MAX(b.c1) as mx;"
+            + "f11 = foreach f1 generate i, mx;"
+            + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;"
+            + "f22 = foreach f2 generate i, mx;";
+        String query = query_prefix  + "u = union onschema f11, f22;";
+        Util.registerMultiLineQuery(pig, query);
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch =
+            Util.getSchemaFromString("i: int, mx: long");
+        // JUnit's assertEquals takes (message, expected, actual)
+        assertEquals("Checking expected schema", expectedSch, sch);
+
+        // verify schema for reverse order of relations as well
+        query = query_prefix  + "u = union onschema f22, f11;";
+        Util.registerMultiLineQuery(pig, query);
+        sch = pig.dumpSchema("u");
+        expectedSch =
+            Util.getSchemaFromString("i: int, mx: long");
+        assertEquals("Checking expected schema", expectedSch, sch);
+
+        // also check the data produced by the last registered union
+        Iterator<Tuple> it = pig.openIterator("u");
+
+        List<Tuple> expectedRes =
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,1L)",
+                            "(5,2L)",
+                            "(1,2L)",
+                            "(5,2L)"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
 
     /**
      * Udf that has schema of tuple column with no inner schema 
@@ -753,5 +811,35 @@ public class TestUnionOnSchema  {
 
     }
     
-    
+    /**
+     * Verifies that the result of one union-onschema can be used as an
+     * input of a second union-onschema.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testTwoUnions() throws IOException, ParseException {
+        final String loadPrefix = " = load '" + INP_FILE_2NUMS + "' as ";
+        final String script =
+            "  l1" + loadPrefix + "(i : int, j : int);"
+            + "l2" + loadPrefix + "(i : long, j : int);"
+            + "u1 = union onschema l1, l2;"
+            + "l3" + loadPrefix + "(i : long, j : double);"
+            + "u2 = union onschema u1, l3;";
+        final String[] expectedRows = {
+            "(1L,2.0)",
+            "(5L,3.0)",
+            "(1L,2.0)",
+            "(5L,3.0)",
+            "(1L,2.0)",
+            "(5L,3.0)"
+        };
+
+        // register the whole script, then read back the outer union
+        PigServer pigServer = new PigServer(ExecType.LOCAL);
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> actualRows = pigServer.openIterator("u2");
+        List<Tuple> expectedTuples = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actualRows, expectedTuples);
+    }
 }