You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 01:18:46 UTC

svn commit: r654320 - in /incubator/pig/branches/types: build.xml src/org/apache/pig/data/DataType.java src/org/apache/pig/impl/logicalLayer/schema/Schema.java test/org/apache/pig/test/TestSchema.java

Author: gates
Date: Wed May  7 16:18:45 2008
New Revision: 654320

URL: http://svn.apache.org/viewvc?rev=654320&view=rev
Log:
Added equals and merge to schema class.  Work done by Pi.


Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed May  7 16:18:45 2008
@@ -144,6 +144,7 @@
                  **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
         		**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
         		**/test/TestPODistinct.java, **/test/TestPOSort.java,
+        		**/test/TestSchema.java,
                 **/test/FakeFSInputStream.java, **/test/Util.java,
                 **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
@@ -263,6 +264,7 @@
                 	<include name="**/TestForEach.java" /> 
                 	<include name="**/TestInputOutputFileValidator.java" /> 
                 	<include name="**/TestTypeCheckingValidator.java" /> 
+                	<include name="**/TestSchema.java" /> 
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Wed May  7 16:18:45 2008
@@ -572,4 +572,5 @@
             default :return true ;
         }
     }
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Wed May  7 16:18:45 2008
@@ -78,6 +78,44 @@
 
             return true;
         }
+        
+        /**
+         * Compares two field schemas for equality.
+         *
+         * @param fschema the first field schema; null makes the result false
+         * @param fother the second field schema; null makes the result false
+         * @param relaxInner if true, the inner schemas of tuple fields are not compared
+         * @param relaxAlias if true, aliases are not compared
+         * @return true if the types match and, unless relaxed, the aliases and
+         *         the inner schemas of tuple fields match as well
+         */
+        public static boolean equals(FieldSchema fschema, 
+                                     FieldSchema fother, 
+                                     boolean relaxInner,
+                                     boolean relaxAlias) {
+            if (fschema == null || fother == null) {
+                return false ;
+            }
+            
+            if (fschema.type != fother.type) {
+                return false ;
+            }
+            
+            // Aliases are Strings: compare by value. '!=' only checks reference
+            // identity, so two equal aliases held in distinct String instances
+            // would wrongly be reported as different.
+            if (!relaxAlias) {
+                boolean sameAlias = (fschema.alias == null)
+                                    ? (fother.alias == null)
+                                    : fschema.alias.equals(fother.alias) ;
+                if (!sameAlias) {
+                    return false ;
+                }
+            }
+            
+            if ( (!relaxInner) && (fschema.type == DataType.TUPLE) ) {
+                // Compare the inner tuple schemas recursively.
+                // NOTE(review): Schema.equals returns false when either schema is
+                // null, so two tuple fields without inner schemas never compare
+                // equal here (not even a field with itself). Confirm that is intended.
+                if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) {
+                    return false ;
+                }
+            }
+            
+            return true ;
+        }
     }
 
     private List<FieldSchema> mFields;
@@ -181,6 +219,196 @@
         }
         return true;
     }
+    
+    /**
+     * Recursively compares two schemas for equality, field by field in order.
+     *
+     * @param schema the first schema; null makes the result false
+     * @param other the second schema; null makes the result false
+     * @param relaxInner if true, inner schemas of fields are not checked
+     * @param relaxAlias if true, field aliases are not checked
+     * @return true if both schemas have the same number of fields and every
+     *         pair of corresponding fields matches under the given relaxations
+     */
+    public static boolean equals(Schema schema, 
+                                 Schema other, 
+                                 boolean relaxInner,
+                                 boolean relaxAlias) {
+        if (schema == null || other == null) {
+            return false ;
+        }
+        
+        if (schema.size() != other.size()) return false;
+
+        Iterator<FieldSchema> i = schema.mFields.iterator();
+        Iterator<FieldSchema> j = other.mFields.iterator();
+        
+        while (i.hasNext()) {
+            
+            FieldSchema myFs = i.next() ;
+            FieldSchema otherFs = j.next() ;
+            
+            // Compare aliases by value; '!=' on Strings only checks whether
+            // both references point to the same instance.
+            if (!relaxAlias) {
+                boolean sameAlias = (myFs.alias == null)
+                                    ? (otherFs.alias == null)
+                                    : myFs.alias.equals(otherFs.alias) ;
+                if (!sameAlias) {
+                    return false ;
+                }
+            }
+            
+            if (myFs.type != otherFs.type) {
+                return false ;
+            }
+            
+            if (!relaxInner) {
+                // Compare recursively using the field schema, which descends
+                // into the inner schema of tuple fields
+                if (!FieldSchema.equals(myFs, otherFs, false, relaxAlias)) {
+                    return false ;
+                }            
+            }
+            
+        }
+        return true;
+    }
+    
+    
+    /***
+     * Merge this schema with the other schema
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @return the merged schema, null if they are not compatible
+     */
+    public Schema merge(Schema other, boolean otherTakesAliasPrecedence) {
+        return mergeSchema(this, other, otherTakesAliasPrecedence) ;
+    }
+    
+    /**
+     * Recursively merges two schemas.
+     *
+     * Fields are paired by position. For each pair, the types are merged with
+     * mergeType and the aliases with mergeAlias. When the merged type is a
+     * tuple, the inner schemas are merged recursively.
+     *
+     * @param schema the initial schema
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @return the merged schema, or null if they are not compatible (including
+     *         when either schema is null or the field counts differ)
+     */
+    private Schema mergeSchema(Schema schema, Schema other, 
+                               boolean otherTakesAliasPrecedence) {
+        
+        // A missing schema on either side cannot be merged. 'schema' must be
+        // checked as well as 'other' because the recursive call below passes a
+        // tuple field's inner schema, which is not guaranteed to be set.
+        // Without this check, schema.size() would throw a NullPointerException.
+        if (schema == null || other == null) {
+            return null ;
+        }
+        
+        // Fields are paired by position, so the arities must agree
+        if (schema.size() != other.size()) {
+            return null ;
+        }
+        
+        List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+        
+        Iterator<FieldSchema> mylist = schema.mFields.iterator() ;
+        Iterator<FieldSchema> otherlist = other.mFields.iterator() ;
+        
+        while (mylist.hasNext()) {
+            
+            FieldSchema myFs = mylist.next() ;
+            FieldSchema otherFs = otherlist.next() ;
+            
+            byte mergedType = mergeType(myFs.type, otherFs.type) ;
+            // if the types cannot be merged, the schemas cannot be merged
+            if (mergedType == DataType.ERROR) {
+                return null ;
+            }
+            
+            String mergedAlias = mergeAlias(myFs.alias, 
+                                            otherFs.alias, 
+                                            otherTakesAliasPrecedence) ;
+            
+            FieldSchema mergedFs = null ;
+            if (mergedType != DataType.TUPLE) {
+                // just normal merge              
+                mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+            }
+            else {
+                // merge inner tuple because both sides are tuples
+                Schema mergedSubSchema = mergeSchema(myFs.schema, 
+                                                     otherFs.schema,
+                                                     otherTakesAliasPrecedence) ;
+                // return null if they cannot be merged
+                if (mergedSubSchema == null) {
+                    return null ;
+                }
+                
+                mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ;
+                
+            }
+            outputList.add(mergedFs) ;
+        }
+        
+        return new Schema(outputList) ;
+    }
+    
+    /**
+     * Chooses the alias for a merged field. If only one side has an alias,
+     * that alias is used. If both do, the precedence flag decides.
+     *
+     * @param alias the alias from the initial schema, may be null
+     * @param other the alias from the other schema, may be null
+     * @param otherTakesPrecedence true if {@code other} wins when both are set
+     * @return the chosen alias, or null when both inputs are null
+     */
+    private String mergeAlias(String alias, String other, boolean otherTakesPrecedence) {
+        // 'other' is used when there is no alias of our own, or when it exists
+        // and has been given precedence
+        boolean preferOther = (alias == null) || (other != null && otherTakesPrecedence) ;
+        return preferOther ? other : alias ;
+    }
+    
+    /**
+     * Computes the type two fields can be merged into.
+     *
+     * Identical types merge to themselves. Two number types merge to the one
+     * with the larger type code. A bytearray merges into a chararray or a
+     * number type. Every other combination is rejected.
+     *
+     * @param type1 the type from the initial schema
+     * @param type2 the type from the other schema
+     * @return the merged type, or DataType.ERROR if the types cannot be merged
+     */
+    private byte mergeType(byte type1, byte type2) {
+        boolean bothUsable = DataType.isUsableType(type1) && DataType.isUsableType(type2) ;
+        if (!bothUsable) {
+            return DataType.ERROR ;
+        }
+        
+        if (type1 == type2) {
+            return type1 ;
+        }
+        
+        boolean firstIsNumber = DataType.isNumberType(type1) ;
+        boolean secondIsNumber = DataType.isNumberType(type2) ;
+        
+        // NOTE(review): picks the "bigger" type by comparing raw type codes,
+        // which assumes DataType orders its number codes from narrowest to widest
+        if (firstIsNumber && secondIsNumber) {
+            return (byte) Math.max(type1, type2) ;
+        }
+        
+        // An untyped bytearray adopts the concrete type on the other side
+        boolean firstConcrete = (type1 == DataType.CHARARRAY) || firstIsNumber ;
+        boolean secondConcrete = (type2 == DataType.CHARARRAY) || secondIsNumber ;
+        
+        if (type1 == DataType.BYTEARRAY && secondConcrete) {
+            return type2 ;
+        }
+        if (type2 == DataType.BYTEARRAY && firstConcrete) {
+            return type1 ;
+        }
+        
+        return DataType.ERROR ;
+    }
+    
+    
 }
 
 

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java?rev=654320&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java Wed May  7 16:18:45 2008
@@ -0,0 +1,113 @@
+package org.apache.pig.test;
+
+import java.util.* ;
+
+import org.apache.pig.data.* ;
+import org.apache.pig.impl.logicalLayer.schema.* ;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.junit.* ;
+
+import junit.framework.Assert;
+import junit.framework.TestCase ;
+
+/**
+ * Tests for Schema.equals, FieldSchema.equals and Schema.merge.
+ *
+ * This is a JUnit 3 TestCase, so test methods are discovered by their
+ * "test" name prefix. JUnit 4 @Test annotations would be ignored here.
+ */
+public class TestSchema extends TestCase {
+    
+    public void testSchemaEqual1() {
+        
+        List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+        innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ;
+        innerList1.add(new FieldSchema("11b", DataType.LONG)) ;
+        
+        List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+        innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ;
+        innerList2.add(new FieldSchema("11b", DataType.LONG)) ;
+        
+        Schema innerSchema1 = new Schema(innerList1) ;
+        Schema innerSchema2 = new Schema(innerList2) ;
+                
+        List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+        list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list1.add(new FieldSchema("1b", innerSchema1)) ;
+        list1.add(new FieldSchema("1c", DataType.INTEGER)) ;
+        
+        List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+        list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list2.add(new FieldSchema("1b", innerSchema2)) ;
+        list2.add(new FieldSchema("1c", DataType.INTEGER)) ;
+        
+        Schema schema1 = new Schema(list1) ;
+        Schema schema2 = new Schema(list2) ;
+        
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+        
+        // Inner alias differs: only equal when aliases are relaxed
+        innerList2.get(1).alias = "pi" ;
+        
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ;
+        
+        // Inner type differs: only equal when inner schemas are relaxed
+        innerList2.get(1).alias = "11b" ;
+        innerList2.get(1).type = DataType.BYTEARRAY ;
+        
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, true, false)) ;
+        
+        innerList2.get(1).type = DataType.LONG ;
+        
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+        
+        // Top-level type differs
+        list2.get(0).type = DataType.CHARARRAY ;
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+    }
+    
+    public void testEqualsNull() {
+        List<FieldSchema> list = new ArrayList<FieldSchema>() ;
+        list.add(new FieldSchema("a", DataType.INTEGER)) ;
+        Schema schema = new Schema(list) ;
+        
+        // A null schema or field schema never compares equal
+        Assert.assertFalse(Schema.equals(schema, null, false, false)) ;
+        Assert.assertFalse(Schema.equals(null, schema, false, false)) ;
+        Assert.assertFalse(FieldSchema.equals(list.get(0), null, false, false)) ;
+        Assert.assertFalse(FieldSchema.equals(null, list.get(0), false, false)) ;
+    }
+    
+    public void testMerge1() {
+        
+        // Generate two schemas
+        List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+        innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ; 
+        innerList1.add(new FieldSchema("11b", DataType.FLOAT)) ;
+        
+        List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+        innerList2.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+        innerList2.add(new FieldSchema(null, DataType.LONG)) ;
+        
+        Schema innerSchema1 = new Schema(innerList1) ;
+        Schema innerSchema2 = new Schema(innerList2) ;
+                
+        List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+        list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list1.add(new FieldSchema("1b", innerSchema1)) ;
+        list1.add(new FieldSchema("1c", DataType.LONG)) ;
+        
+        List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+        list2.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+        list2.add(new FieldSchema("2b", innerSchema2)) ;
+        list2.add(new FieldSchema("2c", DataType.INTEGER)) ;
+        
+        Schema schema1 = new Schema(list1) ;
+        Schema schema2 = new Schema(list2) ;
+        
+        // Merge
+        Schema mergedSchema = schema1.merge(schema2, true) ;
+        
+        
+        // Generate expected schema
+        List<FieldSchema> expectedInnerList = new ArrayList<FieldSchema>() ;
+        expectedInnerList.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+        expectedInnerList.add(new FieldSchema("11b", DataType.FLOAT)) ;
+        
+        Schema expectedInner = new Schema(expectedInnerList) ;
+        
+        List<FieldSchema> expectedList = new ArrayList<FieldSchema>() ;
+        expectedList.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+        expectedList.add(new FieldSchema("2b", expectedInner)) ;
+        expectedList.add(new FieldSchema("2c", DataType.LONG)) ;
+        
+        Schema expected = new Schema(expectedList) ;
+        
+        // Compare
+        Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ;
+    }
+    
+    public void testMergeIncompatible() {
+        List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+        list1.add(new FieldSchema("a", DataType.INTEGER)) ;
+        list1.add(new FieldSchema("b", DataType.CHARARRAY)) ;
+        
+        // Different number of fields cannot be merged
+        List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+        list2.add(new FieldSchema("a", DataType.INTEGER)) ;
+        Assert.assertNull(new Schema(list1).merge(new Schema(list2), true)) ;
+        
+        // Same arity, but a chararray cannot be merged with a number
+        List<FieldSchema> list3 = new ArrayList<FieldSchema>() ;
+        list3.add(new FieldSchema("a", DataType.INTEGER)) ;
+        list3.add(new FieldSchema("b", DataType.INTEGER)) ;
+        Assert.assertNull(new Schema(list1).merge(new Schema(list3), true)) ;
+        
+        // Nothing to merge with
+        Assert.assertNull(new Schema(list1).merge(null, true)) ;
+    }
+}