You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 01:18:46 UTC
svn commit: r654320 - in /incubator/pig/branches/types: build.xml
src/org/apache/pig/data/DataType.java
src/org/apache/pig/impl/logicalLayer/schema/Schema.java
test/org/apache/pig/test/TestSchema.java
Author: gates
Date: Wed May 7 16:18:45 2008
New Revision: 654320
URL: http://svn.apache.org/viewvc?rev=654320&view=rev
Log:
Added equals and merge to schema class. Work done by Pi.
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed May 7 16:18:45 2008
@@ -144,6 +144,7 @@
**/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
**/test/TestPODistinct.java, **/test/TestPOSort.java,
+ **/test/TestSchema.java,
**/test/FakeFSInputStream.java, **/test/Util.java,
**/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
@@ -263,6 +264,7 @@
<include name="**/TestForEach.java" />
<include name="**/TestInputOutputFileValidator.java" />
<include name="**/TestTypeCheckingValidator.java" />
+ <include name="**/TestSchema.java" />
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Wed May 7 16:18:45 2008
@@ -572,4 +572,5 @@
default :return true ;
}
}
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=654320&r1=654319&r2=654320&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Wed May 7 16:18:45 2008
@@ -78,6 +78,44 @@
return true;
}
+
+ /***
+ * Compare two field schema for equality
+ * @param fschema
+ * @param fother
+ * @param relaxInner If true, we don't check inner tuple schemas
+ * @param relaxAlias If true, we don't check aliases
+ * @return
+ */
+ public static boolean equals(FieldSchema fschema,
+ FieldSchema fother,
+ boolean relaxInner,
+ boolean relaxAlias) {
+ if (fschema == null) {
+ return false ;
+ }
+
+ if (fother == null) {
+ return false ;
+ }
+
+ if (fschema.type != fother.type) {
+ return false ;
+ }
+
+ if ( (!relaxAlias) && (fschema.alias != fother.alias) ) {
+ return false ;
+ }
+
+ if ( (!relaxInner) && (fschema.type == DataType.TUPLE) ) {
+ // compare recursively using schema
+ if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) {
+ return false ;
+ }
+ }
+
+ return true ;
+ }
}
private List<FieldSchema> mFields;
@@ -181,6 +219,196 @@
}
return true;
}
+
+ /**
+ * Recursively compare two schemas for equality
+ * @param schema
+ * @param other
+ * @param relaxInner if true, inner schemas will not be checked
+ * @return
+ */
+ public static boolean equals(Schema schema,
+ Schema other,
+ boolean relaxInner,
+ boolean relaxAlias) {
+ if (schema == null) {
+ return false ;
+ }
+
+ if (other == null) {
+ return false ;
+ }
+
+ if (schema.size() != other.size()) return false;
+
+ Iterator<FieldSchema> i = schema.mFields.iterator();
+ Iterator<FieldSchema> j = other.mFields.iterator();
+
+ while (i.hasNext()) {
+
+ FieldSchema myFs = i.next() ;
+ FieldSchema otherFs = j.next() ;
+
+ if ( (!relaxAlias) && (myFs.alias != otherFs.alias) ) {
+ return false ;
+ }
+
+ if (myFs.type != otherFs.type) {
+ return false ;
+ }
+
+ if (!relaxInner) {
+ // Compare recursively using field schema
+ if (!FieldSchema.equals(myFs, otherFs, false, relaxAlias)) {
+ return false ;
+ }
+ }
+
+ }
+ return true;
+ }
+
+
+ /***
+ * Merge this schema with the other schema
+ * @param other the other schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * schema take precedence
+ * @return the merged schema, null if they are not compatible
+ */
+ public Schema merge(Schema other, boolean otherTakesAliasPrecedence) {
+ return mergeSchema(this, other, otherTakesAliasPrecedence) ;
+ }
+
+ /***
+ * Recursively merge two schemas
+ * @param schema the initial schema
+ * @param other the other schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * schema take precedence
+ * @return the merged schema, null if they are not compatible
+ */
+ private Schema mergeSchema(Schema schema, Schema other,
+ boolean otherTakesAliasPrecedence) {
+
+ if (other == null) {
+ return null ;
+ }
+
+ if (schema.size() != other.size()) {
+ return null ;
+ }
+
+ List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+
+ Iterator<FieldSchema> mylist = schema.mFields.iterator() ;
+ Iterator<FieldSchema> otherlist = other.mFields.iterator() ;
+
+ while (mylist.hasNext()) {
+
+ FieldSchema myFs = mylist.next() ;
+ FieldSchema otherFs = otherlist.next() ;
+
+ byte mergedType = mergeType(myFs.type, otherFs.type) ;
+ // if the types cannot be merged, the schemas cannot be merged
+ if (mergedType == DataType.ERROR) {
+ return null ;
+ }
+
+ String mergedAlias = mergeAlias(myFs.alias,
+ otherFs.alias,
+ otherTakesAliasPrecedence) ;
+
+ FieldSchema mergedFs = null ;
+ if (mergedType != DataType.TUPLE) {
+ // just normal merge
+ mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+ }
+ else {
+ // merge inner tuple because both sides are tuples
+ Schema mergedSubSchema = mergeSchema(myFs.schema,
+ otherFs.schema,
+ otherTakesAliasPrecedence) ;
+ // return null if they cannot be merged
+ if (mergedSubSchema == null) {
+ return null ;
+ }
+
+ mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ;
+
+ }
+ outputList.add(mergedFs) ;
+ }
+
+ return new Schema(outputList) ;
+ }
+
+ /***
+ * Merge two aliases. If one of aliases is null, return the other.
+ * Otherwise check the precedence condition
+ * @param alias
+ * @param other
+ * @param otherTakesPrecedence
+ * @return
+ */
+ private String mergeAlias(String alias, String other
+ ,boolean otherTakesPrecedence) {
+ if (alias == null) {
+ return other ;
+ }
+ else if (other == null) {
+ return alias ;
+ }
+ else if (otherTakesPrecedence) {
+ return other ;
+ }
+ else {
+ return alias ;
+ }
+ }
+
+ /***
+ * Merge types if possible
+ * @param type1
+ * @param type2
+ * @return the merged type, or DataType.ERROR if not successful
+ */
+ private byte mergeType(byte type1, byte type2) {
+ // Only legal types can be merged
+ if ( (!DataType.isUsableType(type1)) ||
+ (!DataType.isUsableType(type2)) ) {
+ return DataType.ERROR ;
+ }
+
+ // Same type is OK
+ if (type1==type2) {
+ return type1 ;
+ }
+
+ // Both are number so we return the bigger type
+ if ( (DataType.isNumberType(type1)) &&
+ (DataType.isNumberType(type2)) ) {
+ return type1>type2 ? type1:type2 ;
+ }
+
+ // One is bytearray and the other is (number or chararray)
+ if ( (type1 == DataType.BYTEARRAY) &&
+ ( (type2 == DataType.CHARARRAY) || (DataType.isNumberType(type2)) )
+ ) {
+ return type2 ;
+ }
+
+ if ( (type2 == DataType.BYTEARRAY) &&
+ ( (type1 == DataType.CHARARRAY) || (DataType.isNumberType(type1)) )
+ ) {
+ return type1 ;
+ }
+
+ // else return just ERROR
+ return DataType.ERROR ;
+ }
+
+
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java?rev=654320&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java Wed May 7 16:18:45 2008
@@ -0,0 +1,113 @@
+package org.apache.pig.test;
+
+import java.util.* ;
+
+import org.apache.pig.data.* ;
+import org.apache.pig.impl.logicalLayer.schema.* ;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.junit.* ;
+
+import junit.framework.Assert;
+import junit.framework.TestCase ;
+
+public class TestSchema extends TestCase {
+
+ @Test
+ public void testSchemaEqual1() {
+
+ List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+ innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ;
+ innerList1.add(new FieldSchema("11b", DataType.LONG)) ;
+
+ List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+ innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ;
+ innerList2.add(new FieldSchema("11b", DataType.LONG)) ;
+
+ Schema innerSchema1 = new Schema(innerList1) ;
+ Schema innerSchema2 = new Schema(innerList2) ;
+
+ List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+ list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+ list1.add(new FieldSchema("1b", innerSchema1)) ;
+ list1.add(new FieldSchema("1c", DataType.INTEGER)) ;
+
+ List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+ list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+ list2.add(new FieldSchema("1b", innerSchema2)) ;
+ list2.add(new FieldSchema("1c", DataType.INTEGER)) ;
+
+ Schema schema1 = new Schema(list1) ;
+ Schema schema2 = new Schema(list2) ;
+
+ Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+
+ innerList2.get(1).alias = "pi" ;
+
+ Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+ Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ;
+
+ innerList2.get(1).alias = "11b" ;
+ innerList2.get(1).type = DataType.BYTEARRAY ;
+
+ Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+ Assert.assertTrue(Schema.equals(schema1, schema2, true, false)) ;
+
+ innerList2.get(1).type = DataType.LONG ;
+
+ Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+
+ list2.get(0).type = DataType.CHARARRAY ;
+ Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+ }
+
+ @Test
+ public void testMerge1() {
+
+ // Generate two schemas
+ List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+ innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ;
+ innerList1.add(new FieldSchema("11b", DataType.FLOAT)) ;
+
+ List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+ innerList2.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+ innerList2.add(new FieldSchema(null, DataType.LONG)) ;
+
+ Schema innerSchema1 = new Schema(innerList1) ;
+ Schema innerSchema2 = new Schema(innerList2) ;
+
+ List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+ list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+ list1.add(new FieldSchema("1b", innerSchema1)) ;
+ list1.add(new FieldSchema("1c", DataType.LONG)) ;
+
+ List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+ list2.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+ list2.add(new FieldSchema("2b", innerSchema2)) ;
+ list2.add(new FieldSchema("2c", DataType.INTEGER)) ;
+
+ Schema schema1 = new Schema(list1) ;
+ Schema schema2 = new Schema(list2) ;
+
+ // Merge
+ Schema mergedSchema = schema1.merge(schema2, true) ;
+
+
+ // Generate expected schema
+ List<FieldSchema> expectedInnerList = new ArrayList<FieldSchema>() ;
+ expectedInnerList.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+ expectedInnerList.add(new FieldSchema("11b", DataType.FLOAT)) ;
+
+ Schema expectedInner = new Schema(expectedInnerList) ;
+
+ List<FieldSchema> expectedList = new ArrayList<FieldSchema>() ;
+ expectedList.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+ expectedList.add(new FieldSchema("2b", expectedInner)) ;
+ expectedList.add(new FieldSchema("2c", DataType.LONG)) ;
+
+ Schema expected = new Schema(expectedList) ;
+
+ // Compare
+ Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ;
+ }
+}