You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/24 22:52:01 UTC
svn commit: r1074311 - in /pig/trunk: ./ src/org/apache/pig/data/
src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/parser/
test/org/apache/pig/test/
Author: daijy
Date: Thu Feb 24 21:52:00 2011
New Revision: 1074311
URL: http://svn.apache.org/viewvc?rev=1074311&view=rev
Log:
PIG-1536: use same logic for merging inner schemas in "default union" and "union onschema"
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/data/DataType.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
pig/trunk/test/org/apache/pig/test/TestSchema.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb 24 21:52:00 2011
@@ -100,6 +100,9 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-1536: use same logic for merging inner schemas in "default union" and
+"union onschema" (daijy)
+
PIG-1304: Fail underlying M/R jobs when concatenated gzip and bz2 files are provided as input (laukik via rding)
PIG-1852: Packaging antlr jar with pig.jar (rding via daijy)
Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Thu Feb 24 21:52:00 2011
@@ -1077,6 +1077,44 @@ public class DataType {
}
/**
+ * Test whether a value of type inputType can be cast to castType.
+ * @param castType data type of the cast type
+ * @param inputType data type of the input
+ * @return true or false
+ */
+ public static boolean castable(byte castType, byte inputType) {
+ // Both types must be usable types; otherwise no cast is possible
+ if ( (!DataType.isUsableType(castType)) ||
+ (!DataType.isUsableType(inputType)) ) {
+ return false;
+ }
+
+ // Same type is castable
+ if (castType==inputType) {
+ return true;
+ }
+
+ // Any numeric type can be cast to any other numeric type
+ if ( (DataType.isNumberType(castType)) &&
+ (DataType.isNumberType(inputType)) ) {
+ return true;
+ }
+
+ // bytearray can be cast to any usable type
+ if (inputType == DataType.BYTEARRAY) {
+ return true;
+ }
+
+ // Casting a numeric type to chararray, or chararray to a numeric type, is valid
+ if (DataType.isNumberType(inputType)&&castType==DataType.CHARARRAY ||
+ DataType.isNumberType(castType)&&inputType==DataType.CHARARRAY)
+ return true;
+
+ // else return false
+ return false;
+ }
+
+ /**
* Merge types if possible. Merging types means finding a type that one
* or both types can be upcast to.
* @param type1
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Thu Feb 24 21:52:00 2011
@@ -146,7 +146,7 @@ public class LOGenerate extends LogicalR
}
else {
// Merge uid with the exp field schema
- mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema);
+ mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema, LogicalSchema.MergeMode.LoadForEach);
if (mergedSchema==null) {
throw new FrontendException("Cannot merge (" + expSchema.toString(false) +
") with user defined schema (" + mUserDefinedSchemaCopy.toString(false) + ")", 1117);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Thu Feb 24 21:52:00 2011
@@ -109,7 +109,7 @@ public class LOLoad extends LogicalRelat
}
if (scriptSchema != null && determinedSchema != null) {
- originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema);
+ originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema, LogicalSchema.MergeMode.LoadForEach);
} else if (scriptSchema != null) originalSchema = scriptSchema;
else if (determinedSchema != null) originalSchema = determinedSchema;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Thu Feb 24 21:52:00 2011
@@ -85,7 +85,7 @@ public class LOUnion extends LogicalRela
mergedSchema = createMergedSchemaOnAlias( inputs );
} else {
LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
- mergedSchema = LogicalSchema.merge(s0, s1);
+ mergedSchema = LogicalSchema.merge(s0, s1, LogicalSchema.MergeMode.Union);
if (mergedSchema==null)
return null;
@@ -94,7 +94,7 @@ public class LOUnion extends LogicalRela
LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
if (mergedSchema==null || otherSchema==null)
return null;
- mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+ mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema, LogicalSchema.MergeMode.Union);
if (mergedSchema == null)
return null;
}
@@ -147,7 +147,7 @@ public class LOUnion extends LogicalRela
LogicalSchema mergedSchema = null;
try {
mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
- } catch(SchemaMergeException e) {
+ } catch(FrontendException e) {
String msg = "Error merging schemas for union operator : "
+ e.getMessage();
throw new FrontendException(msg, 1116, PigException.INPUT, e);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Thu Feb 24 21:52:00 2011
@@ -25,10 +25,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
@@ -216,7 +214,115 @@ public class LogicalSchema {
return true ;
}
+
+ /**
+ * Merge two LogicalFieldSchemas; the merge behavior depends on mode.
+ * If mode==MergeMode.LoadForEach or MergeMode.LoadForEachInner, take the left (declared) side if compatible; otherwise, throw an exception.
+ * If mode==MergeMode.UnionInner, throw an exception if the types differ (a Union-level merge catches it and ends up with a null inner schema).
+ * If mode==MergeMode.Union, take the more specific type.
+ * @param fs1 In Load/Foreach, fs1 is the user-declared schema; in Union, fs1 is the left side
+ * @param fs2 In Load/Foreach, fs2 is the inferred schema; in Union, fs2 is the right side
+ * @param mode merge mode
+ */
+ public static LogicalFieldSchema merge(LogicalFieldSchema fs1, LogicalFieldSchema fs2, MergeMode mode) throws FrontendException {
+ // deal with null schema
+ if (mode==MergeMode.LoadForEach) {
+ if (fs1==null) throw new FrontendException("We cannot cast into null", 1031);
+ if (fs2==null) return fs1.deepCopy();
+ } else if (mode==MergeMode.LoadForEachInner) {
+ if (fs1==null)
+ return null;
+ if (fs2==null)
+ return fs1.deepCopy();
+ } else { // Union/UnionInner
+ if(fs1==null||fs2==null)
+ return null;
+ }
+
+ String mergedAlias;
+ byte mergedType = DataType.UNKNOWN;
+ LogicalSchema mergedSubSchema = null;
+
+ // Infer merged data type
+ if (mode==MergeMode.UnionInner) {
+ if (fs1.type!=fs2.type)
+ // We don't merge inner schema of different type for union, throw exception
+ throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
+ else
+ mergedType = fs1.type;
+ }
+ else if (mode==MergeMode.LoadForEach||mode==MergeMode.LoadForEachInner) {
+ if (fs1.type==DataType.NULL||fs1.type==DataType.BYTEARRAY) // If declared schema does not have type part
+ mergedType = fs2.type;
+ else if (!DataType.castable(fs1.type, fs2.type))
+ throw new FrontendException("Incompatable field schema: declared is \"" + fs1.toString(false) + "\", infered is \"" + fs2.toString(false) + "\"", 1031);
+ else mergedType = fs1.type; // If compatible type, we take the declared type
+ }
+ else {
+ // Union schema
+ if (fs1.type==DataType.BYTEARRAY) {
+ mergedType=fs2.type;
+ } else if (fs2.type==DataType.BYTEARRAY) {
+ mergedType = fs1.type;
+ }
+ else {
+ // Take the more specific type
+ mergedType = DataType.mergeType(fs1.type, fs2.type);
+ if (mergedType == DataType.ERROR) {
+ // Otherwise, if one side is numeric and the other is chararray, take chararray
+ if (DataType.isNumberType(fs1.type) && fs2.type==DataType.CHARARRAY ||
+ DataType.isNumberType(fs2.type) && fs1.type==DataType.CHARARRAY)
+ mergedType = DataType.CHARARRAY;
+ else {
+ // Truly incompatible types; fall back to bytearray
+ mergedType = DataType.BYTEARRAY;
+ }
+ }
+ }
+ }
+
+ if (fs1.alias==null)
+ mergedAlias = fs2.alias;
+ else {
+ mergedAlias = fs1.alias; // If both schema have alias, the first one win
+ }
+ if (DataType.isSchemaType(mergedType)) {
+ if (mode==MergeMode.Union) {
+ try {
+ if (fs1.type==DataType.BYTEARRAY) {
+ if (fs2.schema!=null)
+ mergedSubSchema = fs2.schema.deepCopy();
+ }
+ else if (fs2.type==DataType.BYTEARRAY) {
+ if (fs1.schema!=null)
+ mergedSubSchema = fs1.schema.deepCopy();
+ }
+ else {
+ mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
+ }
+ } catch (FrontendException e) {
+ // Inner schemas are incompatible; leave mergedSubSchema null
+ }
+ }
+ else {
+ if (mode==MergeMode.UnionInner)
+ mergedSubSchema = LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.UnionInner);
+ else {
+ // LoadForEach/LoadForEachInner
+ try {
+ // Only check compatibility
+ LogicalSchema.merge(fs1.schema, fs2.schema, MergeMode.LoadForEachInner);
+ mergedSubSchema = fs1.schema;
+ } catch (FrontendException e) {
+ throw new FrontendException("Incompatable field schema: left is \"" + fs1.toString(false) + "\", right is \"" + fs2.toString(false) + "\"", 1031);
+ }
+ }
+ }
+ }
+ LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
+ return mergedFS;
+ }
}
private List<LogicalFieldSchema> fields;
@@ -353,44 +459,48 @@ public class LogicalSchema {
return -1;
}
+ public static enum MergeMode {
+ LoadForEach,
+ LoadForEachInner,
+ Union,
+ UnionInner
+ }
/**
* Merge two schemas.
- * @param s1
- * @param s2
+ * @param s1 In Load/ForEach, s1 is the user-declared schema; in Union, s1 is the left side.
+ * @param s2 In Load/ForEach, s2 is the inferred schema; in Union, s2 is the right side.
+ * @param mode Where the merge happens (Load/ForEach or Union). In Load/ForEach, s1 (the user-declared schema) is always taken if compatible;
+ * in Union, the more specific type is taken (between numeric and chararray, chararray wins). Where the types of s1 and s2 still mismatch,
+ * TypeCheckingVisitor is expected to fill the gap later.
* @return a merged schema, or null if the merge fails
*/
- public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
+ public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2, MergeMode mode) throws FrontendException {
// If any of the schema is null, take the other party
if (s1==null || s2==null) {
- if (s1!=null) return s1.deepCopy();
- else if (s2!=null) return s2.deepCopy();
- else return null;
+ if (mode==MergeMode.LoadForEach||mode==MergeMode.LoadForEachInner) {
+ if (s1!=null) return s1.deepCopy();
+ else if (s2!=null) return s2.deepCopy();
+ else return null;
+ }
+ else // Union/UnionInner, take null
+ return null;
}
- if (s1.size()!=s2.size()) return null;
+
+ if (s1.size()!=s2.size()) {
+ if (mode==MergeMode.Union) // In Union, a schema size mismatch results in a null schema
+ return null;
+ else
+ throw new FrontendException("Incompatable schema: left is \"" + s1.toString(false) + "\", right is \"" + s2.toString(false) + "\"", 1031);
+ }
+
LogicalSchema mergedSchema = new LogicalSchema();
for (int i=0;i<s1.size();i++) {
- String mergedAlias;
- byte mergedType;
- LogicalSchema mergedSubSchema = null;
LogicalFieldSchema fs1 = s1.getField(i);
LogicalFieldSchema fs2 = s2.getField(i);
- if (fs1.alias==null)
- mergedAlias = fs2.alias;
- else {
- mergedAlias = fs1.alias; // If both schema have alias, the first one win
- }
- if (fs1.type==DataType.NULL)
- mergedType = fs2.type;
- else
- mergedType = fs1.type;
-
- if (DataType.isSchemaType(mergedType)) {
- mergedSubSchema = merge(fs1.schema, fs2.schema);
- }
- LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
+ LogicalFieldSchema mergedFS = LogicalFieldSchema.merge(fs1, fs2, mode);
mergedSchema.addField(mergedFS);
}
return mergedSchema;
@@ -449,7 +559,7 @@ public class LogicalSchema {
* @return merged schema
*/
public static LogicalSchema mergeSchemasByAlias(List<LogicalSchema> schemas)
- throws SchemaMergeException{
+ throws FrontendException{
LogicalSchema mergedSchema = null;
// list of schemas that have currently been merged, used in error message
@@ -463,11 +573,11 @@ public class LogicalSchema {
try{
mergedSchema = mergeSchemaByAlias( mergedSchema, sch );
mergedSchemas.add(sch);
- }catch(SchemaMergeException e){
+ }catch(FrontendException e){
String msg = "Error merging schema: (" + sch + ") with "
+ "merged schema: (" + mergedSchema + ")" + " of schemas : "
+ mergedSchemas;
- throw new SchemaMergeException(msg, e);
+ throw new FrontendException(msg, e);
}
}
return mergedSchema;
@@ -481,7 +591,7 @@ public class LogicalSchema {
* For Tuples and Bags, SubSchemas have to be equal be considered compatible
*/
public static LogicalSchema mergeSchemaByAlias(LogicalSchema schema1, LogicalSchema schema2)
- throws SchemaMergeException{
+ throws FrontendException {
LogicalSchema mergedSchema = new LogicalSchema();
HashSet<LogicalFieldSchema> schema2colsAdded = new HashSet<LogicalFieldSchema>();
// add/merge fields present in first schema
@@ -496,9 +606,12 @@ public class LogicalSchema {
schema1.getField( fs2.alias );
}
schema2colsAdded.add(fs2);
+ LogicalFieldSchema mergedFs = LogicalFieldSchema.merge(fs1,fs2, MergeMode.Union);
+ mergedFs.alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
+ mergedSchema.addField(mergedFs);
}
- LogicalFieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
- mergedSchema.addField( mergedFs );
+ else
+ mergedSchema.addField(new LogicalFieldSchema(fs1));
}
//add schemas from 2nd schema, that are not already present in
@@ -513,9 +626,9 @@ public class LogicalSchema {
}
private static void checkNullAlias(LogicalFieldSchema fs, LogicalSchema schema)
- throws SchemaMergeException {
+ throws FrontendException {
if(fs.alias == null){
- throw new SchemaMergeException(
+ throw new FrontendException(
"Schema having field with null alias cannot be merged " +
"using alias. Schema :" + schema
);
@@ -523,61 +636,11 @@ public class LogicalSchema {
}
/**
- * Schema will not be merged if types are incompatible,
- * as per DataType.mergeType(..)
- * For Tuples and Bags, SubSchemas have to be equal be considered compatible
- * Aliases are assumed to be same for both
- */
- private static LogicalFieldSchema mergeFieldSchemaFirstLevelSameAlias(LogicalFieldSchema fs1,
- LogicalFieldSchema fs2)
- throws SchemaMergeException {
- if(fs1 == null)
- return fs2;
- if(fs2 == null)
- return fs1;
-
- LogicalSchema innerSchema = null;
-
- String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
-
- byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
-
- // If the types cannot be merged
- if (mergedType == DataType.ERROR) {
- int errCode = 1031;
- String msg = "Incompatible types for merging schemas. Field schema: "
- + fs1 + " Other field schema: " + fs2;
- throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
- }
- if(DataType.isSchemaType(mergedType)) {
- // if one of them is a bytearray, pick inner schema of other one
- if( fs1.type == DataType.BYTEARRAY ){
- innerSchema = fs2.schema;
- }else if(fs2.type == DataType.BYTEARRAY){
- innerSchema = fs1.schema;
- }
- else {
- //in case of types with inner schema such as bags and tuples
- // the inner schema has to be same
- if(!equals(fs1.schema, fs2.schema, false, false)){
- int errCode = 1032;
- String msg = "Incompatible types for merging inner schemas of " +
- " Field schema type: " + fs1 + " Other field schema type: " + fs2;
- throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
- }
- innerSchema = fs1.schema;
- }
- }
-
- return new LogicalFieldSchema(alias, innerSchema, mergedType) ;
- }
-
- /**
* If one of the aliases is of form 'nm::str1', and other is of the form
* 'str1', this returns str1
*/
private static String mergeNameSpacedAlias(String alias1, String alias2)
- throws SchemaMergeException {
+ throws FrontendException {
if(alias1.equals(alias2)){
return alias1;
}
Modified: pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestUnionOnSchemaSetter.java Thu Feb 24 21:52:00 2011
@@ -75,22 +75,19 @@ public class TestUnionOnSchemaSetter {
}
@Test
- public void testNegative1() {
+ public void testMergeCompatibleSchema() throws FrontendException {
String query = "A = load 'x' as ( u:int, v:long, w:chararray); " +
"B = load 'y' as ( u:int, v:long, w:long); " +
"C = union onschema A, B; " +
"D = store C into 'output';";
LogicalPlan plan = generateLogicalPlan( query );
if( plan != null ) {
- UnionOnSchemaSetter visitor;
- try {
- visitor = new UnionOnSchemaSetter( plan );
- visitor.visit();
- } catch (FrontendException e) {
- return; // Expect an exception.
- }
+ int nc = plan.size();
+ UnionOnSchemaSetter visitor = new UnionOnSchemaSetter( plan );
+ visitor.visit();
+ System.out.println( "Plan after setter: " + plan.toString() );
+ Assert.assertEquals( nc+1, plan.size() ); // ForEach inserted before union
}
- Assert.fail( "Test case shouldn't pass!" );
}
private LogicalPlan generateLogicalPlan(String query) {
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Thu Feb 24 21:52:00 2011
@@ -902,8 +902,8 @@ public class TestEvalPipeline2 extends T
pigServer.openIterator("c");
} catch (Exception e) {
PigException pe = LogUtils.getPigException(e);
- assertTrue(pe.getErrorCode()==1117);
- assertTrue(pe.getMessage().contains("Cannot merge"));
+ assertTrue(pe.getErrorCode()==1031);
+ assertTrue(pe.getMessage().contains("Incompatable schema"));
return;
}
fail();
Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1074311&r1=1074310&r2=1074311&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Thu Feb 24 21:52:00 2011
@@ -48,12 +48,14 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.MergeMode;
import org.junit.Test;
public class TestSchema extends TestCase {
@@ -693,7 +695,172 @@ public class TestSchema extends TestCase
Schema s2 = Utils.getSchemaFromString("b:{t:(b0:int, b1:int)}");
LogicalSchema ls1 = org.apache.pig.newplan.logical.Util.translateSchema(s1);
LogicalSchema ls2 = org.apache.pig.newplan.logical.Util.translateSchema(s2);
- LogicalSchema ls3 = LogicalSchema.merge(ls1, ls2);
+ LogicalSchema ls3 = LogicalSchema.merge(ls1, ls2, MergeMode.LoadForEach);
assertTrue(org.apache.pig.newplan.logical.Util.translateSchema(ls3).toString().equals("{a: {t: (a0: int,a1: int)}}"));
}
+
+ @Test
+ public void testNewNormalNestedMerge1() throws Exception {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ mergedSchema = LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+ expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+ }
+
+ @Test
+ public void testNewNormalNestedMerge2() throws Exception {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:(a11:chararray, a12:float), b1:(b11:chararray, b12:float), c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:chararray"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:(a11:chararray, a12:float), b1:(), c1:chararray"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:(a11:chararray, a12:float), b1:(b11:chararray, b12:float), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ try {
+ LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+ }
+
+ @Test
+ public void testNewMergeNullSchemas() throws Throwable {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(), c2:int"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(b11:int, b12:float), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ mergedSchema = LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+ expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(), c2:int"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+ }
+
+ @Test
+ public void testNewMergeDifferentSize1() throws Throwable {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:long, c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:long"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ assertTrue(mergedSchema==null);
+
+ try {
+ LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+ }
+
+ @Test
+ public void testNewMergeDifferentSize2() throws Throwable {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(b11:int, b12:float, b13:float), c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:bytearray, b1:(), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ try {
+ LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+ }
+
+
+ @Test
+ public void testNewMergeMismatchType1() throws Throwable {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:chararray, b1:long, c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:chararray, b1:bytearray, c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ try {
+ LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+
+ try {
+ LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+ }
+
+
+ @Test
+ public void testNewMergeMismatchType2() throws Throwable {
+ LogicalSchema a = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:chararray, b1:(b11:double, b12:(b121:int)), c1:long"));
+ LogicalSchema b = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a2:bytearray, b2:(b21:double, b22:long), c2:int"));
+
+ LogicalSchema mergedSchema = LogicalSchema.merge(a, b, LogicalSchema.MergeMode.Union);
+ LogicalSchema expected = org.apache.pig.newplan.logical.Util.translateSchema(Util.getSchemaFromString(
+ "a1:chararray, b1:(), c1:long"));
+ Assert.assertTrue(LogicalSchema.equals(mergedSchema, expected, false, false));
+
+ try {
+ LogicalSchema.merge(a, b, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+
+ try {
+ LogicalSchema.merge(b, a, LogicalSchema.MergeMode.LoadForEach);
+ fail();
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode()==1031);
+ }
+ }
}