You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/01/25 23:04:20 UTC

svn commit: r1063479 - in /pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/newplan/logical/visitor/ test/org/apache/pig/parser/

Author: thejas
Date: Tue Jan 25 22:04:19 2011
New Revision: 1063479

URL: http://svn.apache.org/viewvc?rev=1063479&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-13.2.patch - (xuefuz via thejas)

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
    pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jan 25 22:04:19 2011
@@ -79,6 +79,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.pen.POOptimizeDisabler;
@@ -366,56 +367,6 @@ public class HExecutionEngine {
         return newPreoptimizedPlan;
     }
     
-    public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
-
-        public SortInfoSetter(OperatorPlan plan) throws FrontendException {
-            super(plan, new DependencyOrderWalker(plan));
-        }
-
-        @Override
-        public void visit(LOStore store) throws FrontendException {
-            
-            Operator storePred = store.getPlan().getPredecessors(store).get(0);
-            if(storePred == null){
-                int errCode = 2051;
-                String msg = "Did not find a predecessor for Store." ;
-                throw new FrontendException(msg, errCode, PigException.BUG);    
-            }
-            
-            SortInfo sortInfo = null;
-            if(storePred instanceof LOLimit) {
-                storePred = store.getPlan().getPredecessors(storePred).get(0);
-            } else if (storePred instanceof LOSplitOutput) {
-                LOSplitOutput splitOutput = (LOSplitOutput)storePred;
-                // We assume this is the LOSplitOutput we injected for this case:
-                // b = order a by $0; store b into '1'; store b into '2';
-                // In this case, we should mark both '1' and '2' as sorted
-                LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
-                if (conditionPlan.getSinks().size()==1) {
-                    Operator root = conditionPlan.getSinks().get(0);
-                    if (root instanceof ConstantExpression) {
-                        Object value = ((ConstantExpression)root).getValue();
-                        if (value instanceof Boolean && (Boolean)value==true) {
-                            Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
-                            if (split instanceof LOSplit)
-                                storePred = store.getPlan().getPredecessors(split).get(0);
-                        }
-                    }
-                }
-            }
-            // if this predecessor is a sort, get
-            // the sort info.
-            if(storePred instanceof LOSort) {
-                try {
-                    sortInfo = ((LOSort)storePred).getSortInfo();
-                } catch (FrontendException e) {
-                    throw new FrontendException(e);
-                }
-            }
-            store.setSortInfo(sortInfo);
-        }
-    }
-
     public List<ExecJob> execute(PhysicalPlan plan,
                                  String jobName) throws ExecException, FrontendException {
         MapReduceLauncher launcher = new MapReduceLauncher();

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Tue Jan 25 22:04:19 2011
@@ -23,12 +23,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 
 public class LOUnion extends LogicalRelationalOperator {
     private boolean onSchema;
@@ -54,32 +57,47 @@ public class LOUnion extends LogicalRela
         if (schema != null) {
             return schema;
         }
-        List<Operator> inputs = null;
-        inputs = plan.getPredecessors(this);
         
+        List<Operator> inputs = plan.getPredecessors(this);
         // If any predecessor's schema is null, then the schema for union is null
         for (Operator input : inputs) {
             LogicalRelationalOperator op = (LogicalRelationalOperator)input;
-            if (op.getSchema()==null)
-                return null;
+            if( op.getSchema() == null ) {
+                if( isOnSchema() ) {
+                    String msg = "Schema of relation " + op.getAlias()
+                        + " is null." 
+                        + " UNION ONSCHEMA cannot be used with relations that"
+                        + " have null schema.";
+                    throw new FrontendException(msg, 1116, PigException.INPUT);
+
+                } else {
+                    return null;
+                }
+            }
         }
         
+        LogicalSchema mergedSchema = null;
         LogicalSchema s0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
-        if (inputs.size()==1)
-            return s0;
-        LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
-        LogicalSchema mergedSchema = LogicalSchema.merge(s0, s1);
-        if (mergedSchema==null)
-            return null;
-        
-        // Merge schema
-        for (int i=2;i<inputs.size();i++) {
-            LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
-            if (mergedSchema==null || otherSchema==null)
-                return null;
-            mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
-            if (mergedSchema == null)
+        if ( inputs.size() == 1 )
+            return schema = s0;
+        
+        if( isOnSchema() ) {
+            mergedSchema = createMergedSchemaOnAlias( inputs );
+        } else {
+            LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+            mergedSchema = LogicalSchema.merge(s0, s1);
+            if (mergedSchema==null)
                 return null;
+            
+            // Merge schema
+            for (int i=2;i<inputs.size();i++) {
+                LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
+                if (mergedSchema==null || otherSchema==null)
+                    return null;
+                mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+                if (mergedSchema == null)
+                    return null;
+            }
         }
         
         // Bring back cached uid if any; otherwise, cache uid generated
@@ -103,10 +121,41 @@ public class LOUnion extends LogicalRela
 
             fs.uid = uid;
         }
-        schema = mergedSchema;
-        return schema;
+        return schema = mergedSchema;
     }
 
+    /**
+     * Builds the output schema of a UNION ONSCHEMA by merging the schemas of
+     * all inputs on column alias (via LogicalSchema.mergeSchemasByAlias).
+     *
+     * @param ops the union's inputs; each is cast to LogicalRelationalOperator
+     * @return the alias-merged schema of all inputs
+     * @throws FrontendException (error 1116) if some input column has a null
+     *         alias, or if the input schemas cannot be merged by alias
+     */
+    private LogicalSchema createMergedSchemaOnAlias(List<Operator> ops)
+    throws FrontendException {
+        List<LogicalSchema> inputSchemas = new ArrayList<LogicalSchema>();
+        for( Operator input : ops ) {
+            LogicalRelationalOperator relOp = (LogicalRelationalOperator)input;
+            LogicalSchema inputSchema = relOp.getSchema();
+            // An unnamed column cannot be matched by alias; report the
+            // relation it belongs to before attempting the merge.
+            for( LogicalFieldSchema field : inputSchema.getFields() ) {
+                if( field.alias == null ) {
+                    String msg = "Schema of relation " + relOp.getAlias()
+                        + " has a null fieldschema for column(s). Schema :" + inputSchema;
+                    throw new FrontendException( msg, 1116, PigException.INPUT );
+                }
+            }
+            inputSchemas.add( inputSchema );
+        }
+
+        try {
+            return LogicalSchema.mergeSchemasByAlias( inputSchemas );
+        } catch( SchemaMergeException e ) {
+            String msg = "Error merging schemas for union operator : "
+                + e.getMessage();
+            throw new FrontendException( msg, 1116, PigException.INPUT, e );
+        }
+    }
+    
     @Override
     public void accept(PlanVisitor v) throws FrontendException {
         if (!(v instanceof LogicalRelationalNodesVisitor)) {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1063479&r1=1063478&r2=1063479&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Tue Jan 25 22:04:19 2011
@@ -20,12 +20,15 @@ package org.apache.pig.newplan.logical.r
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 
@@ -168,6 +171,52 @@ public class LogicalSchema {
                     type, uid);
             return newFs;
         }
+        
+        /**
+         * Compares two field schemas for equality.
+         *
+         * @param fschema the first field schema
+         * @param fother the second field schema
+         * @param relaxInner if true, the inner schemas of schema-typed fields
+         *        are not compared
+         * @param relaxAlias if true, aliases are not compared (here or in any
+         *        inner schema that is compared)
+         * @return true if the field schemas are equal, false otherwise
+         *         (including when either argument is null)
+         */
+        public static boolean equals(LogicalFieldSchema fschema,
+                                     LogicalFieldSchema fother,
+                                     boolean relaxInner,
+                                     boolean relaxAlias) {
+            if( fschema == null || fother == null || fschema.type != fother.type ) {
+                return false;
+            }
+
+            if( !relaxAlias ) {
+                // two null aliases match; a null and a non-null alias do not
+                boolean aliasMismatch = ( fschema.alias == null )
+                    ? fother.alias != null
+                    : !fschema.alias.equals( fother.alias );
+                if( aliasMismatch ) {
+                    return false;
+                }
+            }
+
+            if( relaxInner || !DataType.isSchemaType( fschema.type ) ) {
+                return true;
+            }
+            // Two absent inner schemas count as equal; otherwise compare the
+            // inner schemas recursively, never relaxing deeper inner levels.
+            if( fschema.schema == null && fother.schema == null ) {
+                return true;
+            }
+            return LogicalSchema.equals( fschema.schema, fother.schema, false, relaxAlias );
+        }
+
     }
     
     private List<LogicalFieldSchema> fields;
@@ -228,6 +277,15 @@ public class LogicalSchema {
 
         return fields.get(p.first);
     }
+    
+    /**
+     * Returns the position of the field registered under {@code alias} in
+     * this schema's alias map.
+     *
+     * @param alias the alias to look up
+     * @return the zero-based field position, or -1 if the alias is not in the map
+     */
+    public int getFieldPosition(String alias) {
+        Pair<Integer, Boolean> entry = aliases.get( alias );
+        return ( entry == null ) ? -1 : entry.first;
+    }
 
     /**
      * Fetch a field by field number
@@ -397,4 +455,227 @@ public class LogicalSchema {
             newSchema.addField(getField(i).deepCopy());
         return newSchema;
     }
+
+    /**
+     * Merges collection of schemas using their column aliases 
+     * (unlike mergeSchema(..) functions which merge using positions)
+     * Schema will not be merged if types are incompatible, 
+     * as per DataType.mergeType(..)
+     * For Tuples and Bags, SubSchemas have to be equal be considered compatible
+     * @param schemas - list of schemas to be merged using their column alias
+     * @return merged schema
+     */
+    public static LogicalSchema mergeSchemasByAlias(List<LogicalSchema> schemas)
+    throws SchemaMergeException{
+        LogicalSchema mergedSchema = null;
+
+        // list of schemas that have currently been merged, used in error message
+        ArrayList<LogicalSchema> mergedSchemas = new ArrayList<LogicalSchema>(schemas.size());
+        for(LogicalSchema sch : schemas){
+            if(mergedSchema == null){
+                mergedSchema = sch.deepCopy();
+                mergedSchemas.add(sch);
+                continue;
+            }
+            try{
+                mergedSchema = mergeSchemaByAlias( mergedSchema, sch );
+                mergedSchemas.add(sch);
+            }catch(SchemaMergeException e){
+                String msg = "Error merging schema: ("  + sch + ") with " 
+                    + "merged schema: (" + mergedSchema + ")" + " of schemas : "
+                    + mergedSchemas;
+                throw new SchemaMergeException(msg, e);
+            }
+        }
+        return mergedSchema;
+    }
+    
+    /**
+     * Merges two schemas using their column aliases
+     * (unlike mergeSchema(..) functions, which merge using positions).
+     * <p>
+     * The result holds, in order, every field of schema1 (merged with the
+     * field that {@code schema2.getField(alias)} returns for its alias, if
+     * any), followed by field schemas built from the schema2 fields that no
+     * schema1 field matched. Types are merged as per DataType.mergeType(..);
+     * for tuples and bags the inner schemas have to be equal to be considered
+     * compatible.
+     *
+     * @param schema1 the schema whose fields come first
+     * @param schema2 the schema merged into schema1
+     * @return a new merged schema
+     * @throws SchemaMergeException if a field of either schema has a null
+     *         alias, if more than one field of schema1 matches the same field
+     *         of schema2, or if matched fields have incompatible types
+     */
+    public static LogicalSchema mergeSchemaByAlias(LogicalSchema schema1, LogicalSchema schema2)
+    throws SchemaMergeException{
+        LogicalSchema mergedSchema = new LogicalSchema();
+        // schema2 fields that have already been merged into a schema1 field
+        HashSet<LogicalFieldSchema> schema2colsAdded = new HashSet<LogicalFieldSchema>();
+        // add/merge fields present in first schema 
+        for(LogicalFieldSchema fs1 : schema1.getFields()){
+            checkNullAlias(fs1, schema1);
+            LogicalFieldSchema fs2 = schema2.getField( fs1.alias );
+            if(fs2 != null){
+                if(schema2colsAdded.contains(fs2)){
+                    // Two fields of schema1 resolved to the same schema2 field.
+                    // Report it explicitly: a bare lookup does not fail here,
+                    // and continuing would merge that column twice.
+                    throw new SchemaMergeException(
+                            "Found more than one match in schema " + schema1
+                            + " for field " + fs2 + " of schema " + schema2
+                            + ". Schemas cannot be merged using alias."
+                    );
+                }
+                schema2colsAdded.add(fs2);
+            }
+            LogicalFieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
+            mergedSchema.addField( mergedFs );
+        }
+
+        // add fields of the 2nd schema that were not merged above
+        for(LogicalFieldSchema fs2 : schema2.getFields()){
+            checkNullAlias(fs2, schema2);
+            if(! schema2colsAdded.contains(fs2)){
+                mergedSchema.addField( new LogicalFieldSchema( fs2 ) );
+            }
+        }
+        return mergedSchema;
+    }
+
+    /**
+     * Rejects a field without an alias, since alias-based merging needs one.
+     *
+     * @param fs the field to check
+     * @param schema the schema containing fs; included in the error message
+     * @throws SchemaMergeException if fs has a null alias
+     */
+    private static void checkNullAlias(LogicalFieldSchema fs, LogicalSchema schema)
+    throws SchemaMergeException {
+        if( fs.alias != null ) {
+            return;
+        }
+        String msg = "Schema having field with null alias cannot be merged "
+            + "using alias. Schema :" + schema;
+        throw new SchemaMergeException( msg );
+    }
+
+    /**
+     * Schema will not be merged if types are incompatible, 
+     * as per DataType.mergeType(..)
+     * For Tuples and Bags, SubSchemas have to be equal be considered compatible
+     * Aliases are assumed to be same for both
+     */
+    private static LogicalFieldSchema mergeFieldSchemaFirstLevelSameAlias(LogicalFieldSchema fs1,
+            LogicalFieldSchema fs2) 
+    throws SchemaMergeException {
+        if(fs1 == null)
+            return fs2;
+        if(fs2 == null)
+            return fs1;
+
+        LogicalSchema innerSchema = null;
+        
+        String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
+        
+        byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
+
+        // If the types cannot be merged
+        if (mergedType == DataType.ERROR) {
+                int errCode = 1031;
+                String msg = "Incompatible types for merging schemas. Field schema: "
+                    + fs1 + " Other field schema: " + fs2;
+                throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
+        }
+        if(DataType.isSchemaType(mergedType)) {
+            // if one of them is a bytearray, pick inner schema of other one
+            if( fs1.type == DataType.BYTEARRAY ){
+                innerSchema = fs2.schema;
+            }else if(fs2.type == DataType.BYTEARRAY){
+                innerSchema = fs1.schema;
+            }
+            else {
+                //in case of types with inner schema such as bags and tuples
+                // the inner schema has to be same
+                if(!equals(fs1.schema, fs2.schema, false, false)){
+                    int errCode = 1032;
+                    String msg = "Incompatible types for merging inner schemas of " +
+                    " Field schema type: " + fs1 + " Other field schema type: " + fs2;
+                    throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;                
+                }
+                innerSchema = fs1.schema;
+            }
+        }
+      
+        return new LogicalFieldSchema(alias, innerSchema, mergedType) ;
+    }
+
+    /**
+     * Reconciles two aliases that may differ only by a namespace prefix.
+     * If one alias is of the form 'nm::str1' and the other is 'str1', this
+     * returns 'str1'; identical aliases are returned unchanged.
+     *
+     * @param alias1 the first alias; must not be null
+     * @param alias2 the second alias
+     * @return the reconciled alias, or null if the aliases differ in any
+     *         other way
+     * @throws SchemaMergeException declared, but never thrown by this body
+     */
+    private static String mergeNameSpacedAlias(String alias1, String alias2)
+    throws SchemaMergeException {
+        String reconciled = null;
+        if( alias1.equals( alias2 ) ) {
+            reconciled = alias1;
+        } else if( alias1.endsWith( "::" + alias2 ) ) {
+            reconciled = alias2;
+        } else if( alias2.endsWith( "::" + alias1 ) ) {
+            reconciled = alias1;
+        }
+        return reconciled;
+    }
+
+    /**
+     * Recursively compares two schemas for equality.
+     * <p>
+     * Two null schemas are equal; a null and a non-null schema are not. A
+     * schema flagged as requiring two-level access is replaced by the schema
+     * of its first field before comparing. Otherwise both schemas must have
+     * the same size and, position by position, matching aliases (unless
+     * relaxAlias), matching types, and (unless relaxInner) field schemas that
+     * LogicalFieldSchema.equals considers equal.
+     *
+     * @param schema the first schema
+     * @param other the second schema
+     * @param relaxInner if true, inner schemas will not be checked
+     * @param relaxAlias if true, aliases will not be checked
+     * @return true if schemas are equal, false otherwise
+     */
+    public static boolean equals(LogicalSchema schema,
+                                 LogicalSchema other,
+                                 boolean relaxInner,
+                                 boolean relaxAlias) {
+        if( schema == null || other == null ) {
+            // equal only when both are missing
+            return schema == other;
+        }
+
+        boolean unwrapSchema = schema.isTwoLevelAccessRequired();
+        boolean unwrapOther = other.isTwoLevelAccessRequired();
+        if( unwrapSchema || unwrapOther ) {
+            // A bag may hold its fields inside a single inner tuple; compare
+            // that tuple's schema instead (assumes only bags set this flag).
+            LogicalSchema lhs = unwrapSchema ? schema.getField( 0 ).schema : schema;
+            LogicalSchema rhs = unwrapOther ? other.getField( 0 ).schema : other;
+            return LogicalSchema.equals( lhs, rhs, relaxInner, relaxAlias );
+        }
+
+        if( schema.size() != other.size() ) {
+            return false;
+        }
+
+        Iterator<LogicalFieldSchema> theirFields = other.fields.iterator();
+        for( LogicalFieldSchema mine : schema.fields ) {
+            LogicalFieldSchema theirs = theirFields.next();
+
+            if( !relaxAlias ) {
+                // two null aliases match; a null and a non-null alias do not
+                boolean aliasMismatch = ( mine.alias == null )
+                    ? theirs.alias != null
+                    : !mine.alias.equals( theirs.alias );
+                if( aliasMismatch ) {
+                    return false;
+                }
+            }
+            if( mine.type != theirs.type ) {
+                return false;
+            }
+            // unless relaxed, compare the fields recursively (inner schemas included)
+            if( !relaxInner && !LogicalFieldSchema.equals( mine, theirs, false, relaxAlias ) ) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/SortInfoSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+/**
+ * Visitor that sets, on every {@link LOStore}, the {@link SortInfo} of the
+ * {@link LOSort} that feeds it, or {@code null} when no such sort is found.
+ * <p>
+ * The sort is looked for as the store's direct predecessor, behind an
+ * {@link LOLimit}, or behind an always-true {@link LOSplitOutput} of an
+ * {@link LOSplit} (presumably the split injected for a script such as
+ * {@code b = order a by $0; store b into '1'; store b into '2';}).
+ */
+public class SortInfoSetter extends LogicalRelationalNodesVisitor {
+
+    public SortInfoSetter(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /**
+     * Sets the sort info of {@code store} from the sort feeding it.
+     *
+     * @throws FrontendException (error 2051) if the store has no predecessor,
+     *         or as thrown by LOSort.getSortInfo()
+     */
+    @Override
+    public void visit(LOStore store) throws FrontendException {
+        // Check the predecessor list itself: get(0) on a missing or empty list
+        // fails with an unchecked exception, so testing only its result for
+        // null could never report a store without input.
+        List<Operator> storePreds = store.getPlan().getPredecessors(store);
+        if(storePreds == null || storePreds.isEmpty() || storePreds.get(0) == null){
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Store." ;
+            throw new FrontendException(msg, errCode, PigException.BUG);
+        }
+        Operator storePred = storePreds.get(0);
+
+        if(storePred instanceof LOLimit) {
+            // look through the limit to the operator it limits
+            storePred = store.getPlan().getPredecessors(storePred).get(0);
+        } else if (storePred instanceof LOSplitOutput) {
+            LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+            // We assume this is the LOSplitOutput we injected for this case:
+            // b = order a by $0; store b into '1'; store b into '2';
+            // In this case, we should mark both '1' and '2' as sorted
+            LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
+            if (conditionPlan.getSinks().size()==1) {
+                Operator root = conditionPlan.getSinks().get(0);
+                if (root instanceof ConstantExpression) {
+                    Object value = ((ConstantExpression)root).getValue();
+                    if (Boolean.TRUE.equals(value)) {
+                        Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+                        if (split instanceof LOSplit) {
+                            storePred = store.getPlan().getPredecessors(split).get(0);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Only a sort found above yields sort info. A FrontendException from
+        // getSortInfo() propagates unchanged, keeping its own error code.
+        SortInfo sortInfo = null;
+        if(storePred instanceof LOSort) {
+            sortInfo = ((LOSort)storePred).getSortInfo();
+        }
+        store.setSortInfo(sortInfo);
+    }
+}

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * A visitor that modifies the logical plan (if necessary) for union-onschema
+ * functionality. For each UNION ONSCHEMA it takes the merged output schema
+ * from union.getSchema() and, for every input whose schema is not equal to
+ * it (per LogicalSchema.isEqual), splices a foreach between that input and
+ * the union. The foreach generates one column per output field, in output
+ * order: the input column found by alias (with a cast when its type differs
+ * from the output field's type), or a null constant when the input has no
+ * column with that alias.
+ * <p>
+ * NOTE(review): this relies on the inputs' schemas already being correct
+ * when the visitor runs; nothing here recomputes or validates them.
+ * 
+ * Migrated from the old UnionOnSchemaSetter class.
+ * 
+ */
+public class UnionOnSchemaSetter extends LogicalRelationalNodesVisitor{
+
+    /**
+     * @param plan the logical plan to rewrite; walked with a DependencyOrderWalker
+     */
+    public UnionOnSchemaSetter(OperatorPlan plan)
+            throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /**
+     * Inserts aligning foreach operators in front of an ONSCHEMA union.
+     * Positional (non-ONSCHEMA) unions are left untouched.
+     */
+    @Override
+    public void visit(LOUnion union) throws FrontendException {
+        if( !union.isOnSchema() )
+            return;
+        
+        LogicalSchema outputSchema = union.getSchema();
+        int fieldCount = outputSchema.size();
+        OperatorPlan plan = union.getPlan();
+        // Work on a copy of the predecessor list, because the loop below
+        // disconnects and reconnects the union's inputs.
+        List<Operator> preds = new ArrayList<Operator>();
+        preds.addAll( plan.getPredecessors( union ) );
+
+        // One single-field schema per output column (alias and inner schema of
+        // the output field, type DataType.NULL), used as the generate's output
+        // plan schemas. NOTE(review): the same LogicalSchema objects are handed
+        // to every LOGenerate created below.
+        List<LogicalSchema> fieldSchemas = new ArrayList<LogicalSchema>( fieldCount );
+        for( LogicalFieldSchema fs : outputSchema.getFields() ) {
+            LogicalSchema ls = new LogicalSchema();
+            ls.addField( new LogicalFieldSchema( fs.alias, fs.schema, DataType.NULL ) );
+            fieldSchemas.add( ls );
+        }
+        
+        for( Operator pred : preds ) {
+            LogicalRelationalOperator op = (LogicalRelationalOperator)pred;
+            LogicalSchema opSchema = op.getSchema();
+            // inputs that already match the output schema need no foreach
+            if( opSchema.isEqual( outputSchema ) )
+                continue;
+            
+            LOForEach foreach = new LOForEach( plan );
+            LogicalPlan innerPlan = new LogicalPlan();
+
+            LOGenerate gen = new LOGenerate( innerPlan );
+            // all false: no generated column is flattened
+            boolean[] flattenFlags = new boolean[fieldCount];
+            List<LogicalExpressionPlan> exprPlans = new ArrayList<LogicalExpressionPlan>( fieldCount );
+            // inner loads feeding gen, in the order the projections refer to them
+            List<Operator> genInputs = new ArrayList<Operator>();
+            
+            // Get exprPlans, and genInputs
+            for( LogicalFieldSchema fs : outputSchema.getFields() ) {
+                LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+                exprPlans.add( exprPlan );
+                int pos = opSchema.getFieldPosition( fs.alias );
+                if( pos == -1 ) {
+                    // Input lacks this column: emit a null constant. The
+                    // expression is presumably registered in exprPlan by its
+                    // constructor (the reference is not kept).
+                    new ConstantExpression( exprPlan, null, fs );
+                } else {
+                    // genInputs.size() is the index the inner load added below
+                    // will get, so the projection refers to that load.
+                    ProjectExpression projExpr = 
+                        new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
+                    if( opSchema.getField( pos ).type != fs.type ) {
+                        // wrap the projection in a cast to the output type
+                        new CastExpression( exprPlan, projExpr, fs );
+                    }
+                    genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
+                }
+            }
+            
+            gen.setFlattenFlags( flattenFlags );
+            gen.setOutputPlans( exprPlans );
+            gen.setOutputPlanSchemas( fieldSchemas );
+            innerPlan.add( gen );
+            for( Operator input : genInputs ) {
+                innerPlan.add(input);
+                innerPlan.connect( input, gen );
+            }
+            
+            foreach.setInnerPlan( innerPlan );
+            
+            // Splice the foreach between pred and union, reusing the connection
+            // positions returned by disconnect (presumably so the union keeps
+            // its input order).
+            Pair<Integer, Integer> pair = plan.disconnect( pred, union );
+            plan.add( foreach );
+            plan.connect( pred, pair.first, foreach, 0 );
+            plan.connect( foreach, 0, union, pair.second );
+        }
+        
+    }
+
+}

Added: pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java?rev=1063479&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java (added)
+++ pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java Tue Jan 25 22:04:19 2011
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import junit.framework.Assert;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.visitor.UnionOnSchemaSetter;
+import org.junit.Test;
+
+public class TestUnionOnSchemaSetter {
+    /** A has w but not z, B has z but not w (and v differs); expects two inserted nodes. */
+    @Test
+    public void test1() throws FrontendException {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+                       "B = load 'y' as ( u:int, v:int, z:long); " +
+                       "C = union onschema A, B; " +
+                       "D = store C into 'output';";
+        assertNodesAdded( query, 2 );
+    }
+
+    /** Only the type of v differs (long vs. int); expects one inserted node. */
+    @Test
+    public void test2() throws FrontendException {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+                       "B = load 'y' as ( u:int, v:int, w:bytearray); " +
+                       "C = union onschema A, B; " +
+                       "D = store C into 'output';";
+        assertNodesAdded( query, 1 );
+    }
+
+    /** Identical input schemas: the plan must be left unchanged. */
+    @Test
+    public void test3() throws FrontendException {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+                       "B = load 'y' as ( u:int, v:long, w:bytearray); " +
+                       "C = union onschema A, B; " +
+                       "D = store C into 'output';";
+        assertNodesAdded( query, 0 );
+    }
+
+    /** Column w is chararray in A but long in B; the setter is expected to throw. */
+    @Test
+    public void testNegative1() {
+        String query = "A = load 'x' as ( u:int, v:long, w:chararray); " + 
+                       "B = load 'y' as ( u:int, v:long, w:long); " +
+                       "C = union onschema A, B; " +
+                       "D = store C into 'output';";
+        LogicalPlan plan = generateLogicalPlan( query );
+        try {
+            new UnionOnSchemaSetter( plan ).visit();
+        } catch (FrontendException e) {
+            return; // Expected: the setter must reject these inputs.
+        }
+        Assert.fail( "Test case shouldn't pass!" );
+    }
+
+    /**
+     * Builds the plan for {@code query}, runs {@link UnionOnSchemaSetter} over it
+     * and checks that exactly {@code expectedAdded} operators were added to the
+     * top-level plan. The plan is included in the failure message to aid debugging.
+     */
+    private void assertNodesAdded(String query, int expectedAdded) throws FrontendException {
+        LogicalPlan plan = generateLogicalPlan( query );
+        int nodeCount = plan.size();
+        new UnionOnSchemaSetter( plan ).visit();
+        Assert.assertEquals( "Plan after setter: " + plan, nodeCount + expectedAdded, plan.size() );
+    }
+
+    /**
+     * Parses {@code query} into a logical plan. Never returns null: a parse
+     * exception or a null plan fails the calling test, and the exception is
+     * kept as the cause so its stack trace is not lost.
+     */
+    private LogicalPlan generateLogicalPlan(String query) {
+        LogicalPlan plan;
+        try {
+            plan = ParserTestingUtils.generateLogicalPlan( query );
+        } catch(Exception ex) {
+            AssertionError err = new AssertionError( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
+            err.initCause( ex );
+            throw err;
+        }
+        Assert.assertNotNull( "No logical plan generated for query [" + query + "]", plan );
+        return plan;
+    }
+
+}