You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this format.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/24 22:52:01 UTC

svn commit: r1074311 - in /pig/trunk: ./ src/org/apache/pig/data/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/parser/ test/org/apache/pig/test/

Author: daijy
Date: Thu Feb 24 21:52:00 2011
New Revision: 1074311

URL: http://svn.apache.org/viewvc?rev=1074311&view=rev
Log:
PIG-1536: use same logic for merging inner schemas in "default union" and "union onschema"

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/data/DataType.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestSchema.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb 24 21:52:00 2011
@@ -100,6 +100,9 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-1536: use same logic for merging inner schemas in "default union" and
+"union onschema" (daijy)
+
 PIG-1304: Fail underlying M/R jobs when concatenated gzip and bz2 files are provided as input (laukik via rding)
 
 PIG-1852: Packaging antlr jar with pig.jar (rding via daijy)

Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Thu Feb 24 21:52:00 2011
@@ -1077,6 +1077,44 @@ public class DataType {
     }
 
     /**
+     * Test if one type can cast to the other.
+     * @param castType data type of the cast type
+     * @param inputType data type of the input
+     * @return true or false
+     */
+    public static boolean castable(byte castType, byte inputType) {
+        // Both types must be usable types; otherwise no cast is possible
+        if ( (!DataType.isUsableType(castType)) ||
+             (!DataType.isUsableType(inputType)) ) {
+            return false;
+        }
+
+        // Same type is castable
+        if (castType==inputType) {
+            return true;
+        }
+
+        // Numerical type is castable
+        if ( (DataType.isNumberType(castType)) &&
+             (DataType.isNumberType(inputType)) ) {
+            return true;
+        }
+
+        // bytearray can be cast to anything
+        if (inputType == DataType.BYTEARRAY) {
+            return true;
+        }
+
+        // Cast numerical type to string, or vice versa is valid
+        if (DataType.isNumberType(inputType)&&castType==DataType.CHARARRAY ||
+                DataType.isNumberType(castType)&&inputType==DataType.CHARARRAY)
+            return true;
+
+        // else return false
+        return false;
+    }
+    
+    /**
      * Merge types if possible.  Merging types means finding a type that one
      * or both types can be upcast to.
      * @param type1

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Thu Feb 24 21:52:00 2011
@@ -146,7 +146,7 @@ public class LOGenerate extends LogicalR
                 }
                 else {
                     // Merge uid with the exp field schema
-                    mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema);
+                    mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema, LogicalSchema.MergeMode.LoadForEach);
                     if (mergedSchema==null) {
                         throw new FrontendException("Cannot merge (" + expSchema.toString(false) + 
                                 ") with user defined schema (" + mUserDefinedSchemaCopy.toString(false) + ")", 1117);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Thu Feb 24 21:52:00 2011
@@ -109,7 +109,7 @@ public class LOLoad extends LogicalRelat
         }
         
         if (scriptSchema != null && determinedSchema != null) {
-            originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema);
+            originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema, LogicalSchema.MergeMode.LoadForEach);
         } else if (scriptSchema != null)  originalSchema = scriptSchema;
         else if (determinedSchema != null) originalSchema = determinedSchema;
         

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Thu Feb 24 21:52:00 2011
@@ -85,7 +85,7 @@ public class LOUnion extends LogicalRela
             mergedSchema = createMergedSchemaOnAlias( inputs );
         } else {
             LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
-            mergedSchema = LogicalSchema.merge(s0, s1);
+            mergedSchema = LogicalSchema.merge(s0, s1, LogicalSchema.MergeMode.Union);
             if (mergedSchema==null)
                 return null;
             
@@ -94,7 +94,7 @@ public class LOUnion extends LogicalRela
                 LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
                 if (mergedSchema==null || otherSchema==null)
                     return null;
-                mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+                mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema, LogicalSchema.MergeMode.Union);
                 if (mergedSchema == null)
                     return null;
             }
@@ -147,7 +147,7 @@ public class LOUnion extends LogicalRela
         LogicalSchema mergedSchema = null;
         try {
             mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );   
-        } catch(SchemaMergeException e)                 {
+        } catch(FrontendException e)                 {
             String msg = "Error merging schemas for union operator : "
                 + e.getMessage();
             throw new FrontendException(msg, 1116, PigException.INPUT, e);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Thu Feb 24 21:52:00 2011
@@ -25,10 +25,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 
@@ -216,7 +214,115 @@ public class LogicalSchema {
 
             return true ;
         }
+        
+        /***
+         * Merge two LogicalFieldSchema, the behavior of merge depends on mode. 
+         * If mode==MergeMode.LoadForEach or MergeMode.LoadForEachInner, take the left side if compatible; otherwise, throw an exception.
+         * If mode==MergeMode.UnionInner, throw an exception if the types differ (the enclosing Union merge then ends up with a null inner schema).
+         * If mode==MergeMode.Union, take the more specific type (numeric vs. chararray yields chararray; otherwise-incompatible types yield bytearray).
+         * @param fs1 In Load/Foreach, fs1 is the user-declared schema; in Union, fs1 is the left side
+         * @param fs2 In Load/Foreach, fs2 is the inferred schema; in Union, fs2 is the right side
+         * @param mode merge mode
+         */
+        public static LogicalFieldSchema merge(LogicalFieldSchema fs1, LogicalFieldSchema fs2, MergeMode mode) throws FrontendException {
+            // deal with null schema
+            if (mode==MergeMode.LoadForEach) {
+                if (fs1==null) throw new FrontendException("We cannot cast into null", 1031);
+                if (fs2==null) return fs1.deepCopy();
+            } else if (mode==MergeMode.LoadForEachInner) {
+                if (fs1==null)
+                    return null;
+                if (fs2==null)
+                    return fs1.deepCopy();
+            } else { // Union/UnionInner
+                if(fs1==null||fs2==null)
+                    return null;
+            }
+            
+            String mergedAlias;
+            byte mergedType = DataType.UNKNOWN;
+            LogicalSchema mergedSubSchema = null;
+            
+            // Infer merged data type
+            if (mode==MergeMode.UnionInner) {
+                if (fs1.type!=fs2.type)
+                    // We don't merge inner schema of different type for union, throw exception
+                    throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
+                else
+                    mergedType = fs1.type;
+            }
+            else if (mode==MergeMode.LoadForEach||mode==MergeMode.LoadForEachInner) {
+                if (fs1.type==DataType.NULL||fs1.type==DataType.BYTEARRAY)  // If declared schema does not have type part
+                    mergedType = fs2.type;
+                else if (!DataType.castable(fs1.type, fs2.type))
+                    throw new FrontendException("Incompatable field schema: declared is \"" + fs1.toString(false) + "\", infered is \"" + fs2.toString(false) + "\"", 1031);
+                else mergedType = fs1.type; // If compatible type, we take the declared type
+            }
+            else {
+                // Union schema
+                if (fs1.type==DataType.BYTEARRAY) {
+                    mergedType=fs2.type;
+                } else if (fs2.type==DataType.BYTEARRAY) {
+                    mergedType = fs1.type;
+                }
+                else {
+                    // Take the more specific type
+                    mergedType = DataType.mergeType(fs1.type, fs2.type);
+                    if (mergedType == DataType.ERROR) {
+                        // Another possibility is one side is numeric, the other side is string, in this case, we take string
+                        if (DataType.isNumberType(fs1.type) && fs2.type==DataType.CHARARRAY ||
+                                DataType.isNumberType(fs2.type) && fs1.type==DataType.CHARARRAY)
+                            mergedType = DataType.CHARARRAY;
+                        else {
+                            // True incompatible, set to bytearray
+                            mergedType = DataType.BYTEARRAY;
+                        }
+                    }
+                }
+            }
+            
+            if (fs1.alias==null)
+                mergedAlias = fs2.alias;
+            else {
+                mergedAlias = fs1.alias; // If both schema have alias, the first one win
+            }
 
+            if (DataType.isSchemaType(mergedType)) {
+                if (mode==MergeMode.Union) {
+                    try {
+                        if (fs1.type==DataType.BYTEARRAY) {
+                            if (fs2.schema!=null)
+                                mergedSubSchema = fs2.schema.deepCopy();
+                        }
+                        else if (fs2.type==DataType.BYTEARRAY) {
+                            if (fs1.schema!=null)
+                                mergedSubSchema = fs1.schema.deepCopy();
+                        }
+                        else {
+                            mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
+                        }
+                    } catch (FrontendException e) {
+                        // If the inner schemas are not compatible, mergedSubSchema stays null
+                    }
+                }
+                else {
+                    if (mode==MergeMode.UnionInner)
+                        mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
+                    else {
+                        // LoadForEach/LoadForEachInner
+                        try {
+                            // Only check compatibility
+                            LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.LoadForEachInner);
+                            mergedSubSchema = fs1.schema;
+                        } catch (FrontendException e) {
+                            throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
+                        }
+                    }
+                }
+            }
+            LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
+            return mergedFS;
+        }
     }
     
     private List<LogicalFieldSchema> fields;
@@ -353,44 +459,48 @@ public class LogicalSchema {
         return -1;
     }
     
+    public static enum MergeMode {
+        LoadForEach,
+        LoadForEachInner,
+        Union,
+        UnionInner
+    }
     
     /**
      * Merge two schemas.
-     * @param s1
-     * @param s2
+     * @param s1 In Load/ForEach, s1 is the user-declared schema; in Union, s1 is the left side.
+     * @param s2 In Load/ForEach, s2 is the inferred schema; in Union, s2 is the right side.
+     * @param mode We merge schemas in Load/Foreach/Union. In Load/Foreach, we always take s1 if compatible (s1 is the user-defined schema);
+     * in Union, we take the more specific type (between numeric and string, we take string). In case of a type mismatch between s1 and s2,
+     * we expect TypeCheckingVisitor to fill the gap later.
      * @return a merged schema, or null if the merge fails
      */
-    public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
+    public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2, MergeMode mode) throws FrontendException {
         // If any of the schema is null, take the other party
         if (s1==null || s2==null) {
-            if (s1!=null) return s1.deepCopy();
-            else if (s2!=null) return s2.deepCopy();
-            else return null;
+            if (mode==MergeMode.LoadForEach||mode==MergeMode.LoadForEachInner) {
+                if (s1!=null) return s1.deepCopy();
+                else if (s2!=null) return s2.deepCopy();
+                else return null;
+            }
+            else // Union/UnionInner, take null
+                return null;
         }
         
-        if (s1.size()!=s2.size()) return null;
+        
+        if (s1.size()!=s2.size()) {
+            if (mode==MergeMode.Union) // In union, incompatible type result a null schema
+                return null;
+            else
+                throw new FrontendException("Incompatable schema: left is \"" + s1.toString(false) + "\", right is \"" + s2.toString(false) + "\"", 1031);    
+        }
+            
         LogicalSchema mergedSchema = new LogicalSchema();
         for (int i=0;i<s1.size();i++) {
-            String mergedAlias;
-            byte mergedType;
-            LogicalSchema mergedSubSchema = null;
             LogicalFieldSchema fs1 = s1.getField(i);
             LogicalFieldSchema fs2 = s2.getField(i);
             
-            if (fs1.alias==null)
-                mergedAlias = fs2.alias;
-            else {
-                mergedAlias = fs1.alias; // If both schema have alias, the first one win
-            }
-            if (fs1.type==DataType.NULL)
-                mergedType = fs2.type;
-            else
-                mergedType = fs1.type;
-            
-            if (DataType.isSchemaType(mergedType)) {
-                mergedSubSchema = merge(fs1.schema, fs2.schema);
-            }
-            LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
+            LogicalFieldSchema mergedFS = LogicalFieldSchema.merge(fs1, fs2, mode);
             mergedSchema.addField(mergedFS);
         }
         return mergedSchema;
@@ -449,7 +559,7 @@ public class LogicalSchema {
      * @return merged schema
      */
     public static LogicalSchema mergeSchemasByAlias(List<LogicalSchema> schemas)
-    throws SchemaMergeException{
+    throws FrontendException{
         LogicalSchema mergedSchema = null;
 
         // list of schemas that have currently been merged, used in error message
@@ -463,11 +573,11 @@ public class LogicalSchema {
             try{
                 mergedSchema = mergeSchemaByAlias( mergedSchema, sch );
                 mergedSchemas.add(sch);
-            }catch(SchemaMergeException e){
+            }catch(FrontendException e){
                 String msg = "Error merging schema: ("  + sch + ") with " 
                     + "merged schema: (" + mergedSchema + ")" + " of schemas : "
                     + mergedSchemas;
-                throw new SchemaMergeException(msg, e);
+                throw new FrontendException(msg, e);
             }
         }
         return mergedSchema;
@@ -481,7 +591,7 @@ public class LogicalSchema {
      * For Tuples and Bags, SubSchemas have to be equal be considered compatible
      */
     public static LogicalSchema mergeSchemaByAlias(LogicalSchema schema1, LogicalSchema schema2)
-    throws SchemaMergeException{
+    throws FrontendException {
         LogicalSchema mergedSchema = new LogicalSchema();
         HashSet<LogicalFieldSchema> schema2colsAdded = new HashSet<LogicalFieldSchema>();
         // add/merge fields present in first schema 
@@ -496,9 +606,12 @@ public class LogicalSchema {
                     schema1.getField( fs2.alias );
                 }
                 schema2colsAdded.add(fs2);
+                LogicalFieldSchema mergedFs = LogicalFieldSchema.merge(fs1,fs2, MergeMode.Union);
+                mergedFs.alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
+                mergedSchema.addField(mergedFs);
             }
-            LogicalFieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
-            mergedSchema.addField( mergedFs );
+            else
+                mergedSchema.addField(new LogicalFieldSchema(fs1));
         }
 
         //add schemas from 2nd schema, that are not already present in
@@ -513,9 +626,9 @@ public class LogicalSchema {
     }
 
     private static void checkNullAlias(LogicalFieldSchema fs, LogicalSchema schema)
-    throws SchemaMergeException {
+    throws FrontendException {
         if(fs.alias == null){
-            throw new SchemaMergeException(
+            throw new FrontendException(
                     "Schema having field with null alias cannot be merged " +
                     "using alias. Schema :" + schema
             );
@@ -523,61 +636,11 @@ public class LogicalSchema {
     }
 
     /**
-     * Schema will not be merged if types are incompatible, 
-     * as per DataType.mergeType(..)
-     * For Tuples and Bags, SubSchemas have to be equal be considered compatible
-     * Aliases are assumed to be same for both
-     */
-    private static LogicalFieldSchema mergeFieldSchemaFirstLevelSameAlias(LogicalFieldSchema fs1,
-            LogicalFieldSchema fs2) 
-    throws SchemaMergeException {
-        if(fs1 == null)
-            return fs2;
-        if(fs2 == null)
-            return fs1;
-
-        LogicalSchema innerSchema = null;
-        
-        String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
-        
-        byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
-
-        // If the types cannot be merged
-        if (mergedType == DataType.ERROR) {
-                int errCode = 1031;
-                String msg = "Incompatible types for merging schemas. Field schema: "
-                    + fs1 + " Other field schema: " + fs2;
-                throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
-        }
-        if(DataType.isSchemaType(mergedType)) {
-            // if one of them is a bytearray, pick inner schema of other one
-            if( fs1.type == DataType.BYTEARRAY ){
-                innerSchema = fs2.schema;
-            }else if(fs2.type == DataType.BYTEARRAY){
-                innerSchema = fs1.schema;
-            }
-            else {
-                //in case of types with inner schema such as bags and tuples
-                // the inner schema has to be same
-                if(!equals(fs1.schema, fs2.schema, false, false)){
-                    int errCode = 1032;
-                    String msg = "Incompatible types for merging inner schemas of " +
-                    " Field schema type: " + fs1 + " Other field schema type: " + fs2;
-                    throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;                
-                }
-                innerSchema = fs1.schema;
-            }
-        }
-      
-        return new LogicalFieldSchema(alias, innerSchema, mergedType) ;
-    }
-
-    /**
      * If one of the aliases is of form 'nm::str1', and other is of the form
      * 'str1', this returns str1
      */
     private static String mergeNameSpacedAlias(String alias1, String alias2)
-    throws SchemaMergeException {
+    throws FrontendException {
         if(alias1.equals(alias2)){
             return alias1;
         }

Modified: pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java Thu Feb 24 21:52:00 2011
@@ -75,22 +75,19 @@ public class TestUnionOnSchemaSetter {
     }
 
     @Test
-    public void testNegative1() {
+    public void testMergeCompatibleSchema() throws FrontendException {
         String query = "A = load 'x' as ( u:int, v:long, w:chararray); " + 
                        "B = load 'y' as ( u:int, v:long, w:long); " +
                        "C = union onschema A, B; " +
                        "D = store C into 'output';";
         LogicalPlan plan = generateLogicalPlan( query );
         if( plan != null ) {
-            UnionOnSchemaSetter visitor;
-            try {
-                visitor = new UnionOnSchemaSetter( plan );
-                visitor.visit();
-            } catch (FrontendException e) {
-                return; // Expect an exception.
-            }
+            int nc = plan.size();
+            UnionOnSchemaSetter visitor = new UnionOnSchemaSetter( plan );
+            visitor.visit();
+            System.out.println( "Plan after setter: " + plan.toString() );
+            Assert.assertEquals( nc+1, plan.size() ); // ForEach inserted before union
         }
-        Assert.fail( "Test case shouldn't pass!" );
     }
     
     private LogicalPlan generateLogicalPlan(String query) {

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Thu Feb 24 21:52:00 2011
@@ -902,8 +902,8 @@ public class TestEvalPipeline2 extends T
             pigServer.openIterator("c");
         } catch (Exception e) {
             PigException pe = LogUtils.getPigException(e);
-            assertTrue(pe.getErrorCode()==1117);
-            assertTrue(pe.getMessage().contains("Cannot merge"));
+            assertTrue(pe.getErrorCode()==1031);
+            assertTrue(pe.getMessage().contains("Incompatable schema"));
             return;
         }
         fail();

Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Thu Feb 24 21:52:00 2011
@@ -48,12 +48,14 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
 import org.junit.Test;
 
 public class TestSchema extends TestCase {
@@ -693,7 +695,172 @@ public class TestSchema extends TestCase
         Schema s2 = Utils.getSchemaFromString("b:{t:(b0:int, b1:int)}");
         LogicalSchema ls1 = org.apache.pig.newplan.logical.Util.translateSchema(s1);
         LogicalSchema ls2 = org.apache.pig.newplan.logical.Util.translateSchema(s2);
-        LogicalSchema ls3 = LogicalSchema.merge(ls1, ls2);
+        LogicalSchema ls3 = LogicalSchema.merge(ls1, ls2, MergeMode.LoadForEach);
         assertTrue(org.apache.pig.newplan.logical.Util.translateSchema(ls3).toString().equals("{a: {t: (a0: int,a1: int)}}"));
     }
+    
+    @Test
+    public void testNewNormalNestedMerge1() throws Exception {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+        
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+        expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        mergedSchema = LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+        expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+    }
+    
+    @Test
+    public void testNewNormalNestedMerge2() throws Exception {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:(a11:chararray, a12:float), b1:(b11:chararray, b12:float), c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:chararray"));
+        
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:(a11:chararray, a12:float), b1:(), c1:chararray"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+        expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:(a11:chararray, a12:float), b1:(b11:chararray, b12:float), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        try {
+            LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+    }
+
+    @Test
+    public void testNewMergeNullSchemas() throws Throwable {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(), c2:int"));
+        
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+        expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        mergedSchema = LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+        expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(), c2:int"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+    }
+
+    @Test
+    public void testNewMergeDifferentSize1() throws Throwable {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:long, c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:long"));
+        
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        assertTrue(mergedSchema==null);
+        
+        try {
+            LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+    }
+
+    @Test
+    public void testNewMergeDifferentSize2() throws Throwable {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(b11:int, b12:float, b13:float), c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:bytearray, b1:(), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+        try {
+            LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+    }
+
+
+    @Test
+    public void testNewMergeMismatchType1() throws Throwable {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:chararray, b1:long, c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:chararray, b1:bytearray, c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        try {
+            LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+        
+        try {
+            LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+    }
+
+
+    @Test
+    public void testNewMergeMismatchType2() throws Throwable {
+        LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:chararray, b1:(b11:double, b12:(b121:int)), c1:long"));
+        LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+    
+        LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+        LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+            "a1:chararray, b1:(), c1:long"));
+        Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+        
+        try {
+            LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+        
+        try {
+            LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+            fail();
+        } catch (FrontendException e) {
+            assertTrue(e.getErrorCode()==1031);
+        }
+    }
 }