You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/08/29 01:44:13 UTC
svn commit: r690049 - in /incubator/pig/branches/types:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logi...
Author: gates
Date: Thu Aug 28 16:44:13 2008
New Revision: 690049
URL: http://svn.apache.org/viewvc?rev=690049&view=rev
Log:
PIG-400 Fix issues with flatten and schema naming.
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Aug 28 16:44:13 2008
@@ -72,7 +72,6 @@
@Override
public Result getNext(Tuple t) throws ExecException {
- log.info("inputsAccumulated: " + inputsAccumulated);
if (!inputsAccumulated) {
Result in = processInput();
distinctBag = BagFactory.getInstance().newDistinctBag();
@@ -84,11 +83,9 @@
continue;
}
distinctBag.add((Tuple) in.result);
- log.info("Added tuple" + in.result + " to the distinct bag");
in = processInput();
}
inputsAccumulated = true;
- log.info("Distinct bag: " + distinctBag);
}
if (it == null) {
it = distinctBag.iterator();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Aug 28 16:44:13 2008
@@ -25,10 +25,12 @@
import java.util.Iterator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.data.DataType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,6 +48,7 @@
private ArrayList<LogicalPlan> mForEachPlans;
private ArrayList<Boolean> mFlatten;
+ private ArrayList<Schema> mUserDefinedSchema = null;
private static Log log = LogFactory.getLog(LOForEach.class);
/**
@@ -65,6 +68,16 @@
mFlatten = flattenList;
}
+ public LOForEach(LogicalPlan plan, OperatorKey k,
+ ArrayList<LogicalPlan> foreachPlans, ArrayList<Boolean> flattenList,
+ ArrayList<Schema> userDefinedSchemaList) {
+
+ super(plan, k);
+ mForEachPlans = foreachPlans;
+ mFlatten = flattenList;
+ mUserDefinedSchema = userDefinedSchemaList;
+ }
+
public ArrayList<LogicalPlan> getForEachPlans() {
return mForEachPlans;
}
@@ -92,6 +105,16 @@
return DataType.BAG ;
}
+ private void updateAliasCount(Map<String, Integer> aliases, String alias) {
+ if((null == aliases) || (null == alias)) return;
+ Integer count = aliases.get(alias);
+ if(null == count) {
+ aliases.put(alias, 1);
+ } else {
+ aliases.put(alias, ++count);
+ }
+ }
+
@Override
public Schema getSchema() throws FrontendException {
log.debug("Entering getSchema");
@@ -124,6 +147,10 @@
try {
planFs = ((ExpressionOperator)op).getFieldSchema();
log.debug("planFs: " + planFs);
+ Schema userDefinedSchema = null;
+ if(null != mUserDefinedSchema) {
+ userDefinedSchema = mUserDefinedSchema.get(planCtr);
+ }
if(null != planFs) {
String outerCanonicalAlias = op.getAlias();
if(null == outerCanonicalAlias) {
@@ -137,56 +164,108 @@
//flatten(B.(x,y,z))
Schema s = planFs.schema;
if(null != s) {
- for(Schema.FieldSchema fs: s.getFields()) {
+ for(int i = 0; i < s.size(); ++i) {
+ Schema.FieldSchema fs;
+ try {
+ fs = s.getField(i);
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
log.debug("fs: " + fs);
- log.debug("fs.alias: " + fs.alias);
+ if(null != userDefinedSchema) {
+ Schema.FieldSchema userDefinedFieldSchema;
+ try {
+ if(i < userDefinedSchema.size()) {
+ userDefinedFieldSchema = userDefinedSchema.getField(i);
+ fs = fs.mergePrefixFieldSchema(userDefinedFieldSchema);
+ }
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ } catch (SchemaMergeException sme) {
+ throw new FrontendException(sme.getMessage());
+ }
+ outerCanonicalAlias = null;
+ }
String innerCanonicalAlias = fs.alias;
+ Schema.FieldSchema newFs;
if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
- Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+ newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
fss.add(newFs);
- Integer count;
- count = aliases.get(innerCanonicalAlias);
- if(null == count) {
- aliases.put(innerCanonicalAlias, 1);
- } else {
- aliases.put(innerCanonicalAlias, ++count);
- }
- count = aliases.get(disambiguatorAlias);
- if(null == count) {
- aliases.put(disambiguatorAlias, 1);
- } else {
- aliases.put(disambiguatorAlias, ++count);
- }
- flattenAlias.put(newFs, innerCanonicalAlias);
- inverseFlattenAlias.put(innerCanonicalAlias, true);
+ updateAliasCount(aliases, disambiguatorAlias);
//it's fine if there are duplicates
//we just need to record if its due to
//flattening
} else {
- Schema.FieldSchema newFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+ newFs = new Schema.FieldSchema(fs.alias, fs.schema, fs.type);
fss.add(newFs);
}
+ updateAliasCount(aliases, innerCanonicalAlias);
+ flattenAlias.put(newFs, innerCanonicalAlias);
+ inverseFlattenAlias.put(innerCanonicalAlias, true);
}
} else {
- Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
- fss.add(newFs);
+ Schema.FieldSchema newFs;
+ if(null != userDefinedSchema) {
+ if(!DataType.isSchemaType(planFs.type)) {
+ if(userDefinedSchema.size() > 1) {
+ throw new FrontendException("Schema mismatch. A basic type on flattening cannot have more than one column. User defined schema: " + userDefinedSchema);
+ }
+ newFs = new Schema.FieldSchema(null, planFs.type);
+ try {
+ newFs = newFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));
+ } catch (SchemaMergeException sme) {
+ throw new FrontendException(sme.getMessage());
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
+ updateAliasCount(aliases, newFs.alias);
+ fss.add(newFs);
+ } else {
+ for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
+ fss.add(new Schema.FieldSchema(ufs.alias, ufs.schema, ufs.type));
+ updateAliasCount(aliases, ufs.alias);
+ }
+ }
+ } else {
+ if(!DataType.isSchemaType(planFs.type)) {
+ newFs = new Schema.FieldSchema(null, planFs.type);
+ } else {
+ newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ }
+ fss.add(newFs);
+ }
}
} else {
//just populate the schema with the field schema of the expression operator
+ //check if the user has defined a schema for the operator; compare the schema
+ //with that of the expression operator field schema and then add it to the list
+ if(null != userDefinedSchema) {
+ try {
+ planFs = planFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));
+ updateAliasCount(aliases, planFs.alias);
+ } catch (SchemaMergeException sme) {
+ throw new FrontendException(sme.getMessage());
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
+ }
fss.add(planFs);
- if(null != outerCanonicalAlias) {
- Integer count = aliases.get(outerCanonicalAlias);
- if(null == count) {
- aliases.put(outerCanonicalAlias, 1);
- } else {
- aliases.put(outerCanonicalAlias, ++count);
- }
- }
}
} else {
//did not get a valid list of field schemas
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ String outerCanonicalAlias = null;
+ if(null != userDefinedSchema) {
+ try {
+ Schema.FieldSchema userDefinedFieldSchema = userDefinedSchema.getField(0);
+ fss.add(userDefinedFieldSchema);
+ updateAliasCount(aliases, userDefinedFieldSchema.alias);
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
+ } else {
+ fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ }
}
} catch (FrontendException fee) {
mSchema = null;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Aug 28 16:44:13 2008
@@ -43,6 +43,7 @@
//private ArrayList<ExpressionOperator> mProjections;
private ArrayList<LogicalPlan> mGeneratePlans;
private ArrayList<Boolean> mFlatten;
+ private ArrayList<Schema> mUserDefinedSchema = null;
private static Log log = LogFactory.getLog(LOGenerate.class);
/**
@@ -64,6 +65,16 @@
mFlatten = flatten;
}
+ public LOGenerate(LogicalPlan plan, OperatorKey key,
+ ArrayList<LogicalPlan> generatePlans, ArrayList<Boolean> flatten,
+ ArrayList<Schema> userDefinedSchemaList) {
+ super(plan, key);
+ mGeneratePlans = generatePlans;
+ mFlatten = flatten;
+ mUserDefinedSchema = userDefinedSchemaList;
+ }
+
+
/**
*
* @param plan
@@ -94,6 +105,10 @@
return mFlatten;
}
+ public List<Schema> getUserDefinedSchema() {
+ return mUserDefinedSchema;
+ }
+
@Override
public String name() {
return "Generate " + mKey.scope + "-" + mKey.id;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Aug 28 16:44:13 2008
@@ -391,6 +391,26 @@
log.trace("Exiting attachPlan");
}
+ public static class SchemaUtils {
+ public static void setSchemaDefaultType(Schema s, byte t) {
+ if(null == s) return;
+ for(Schema.FieldSchema fs: s.getFields()) {
+ setFieldSchemaDefaultType(fs, t);
+ }
+ }
+
+ public static void setFieldSchemaDefaultType(Schema.FieldSchema fs, byte t) {
+ if(null == fs) return;
+ if(DataType.NULL == fs.type) {
+ fs.type = t;
+ }
+ if(DataType.isSchemaType(fs.type)) {
+ setSchemaDefaultType(fs.schema, t);
+ }
+ }
+ }
+
+
}
@@ -478,7 +498,6 @@
}
}
-
PARSER_END(QueryParser)
// Skip all the new lines, tabs and spaces
@@ -670,12 +689,12 @@
LogicalOperator Expr(LogicalPlan lp) :
{
LogicalOperator op;
- Schema schema;
+ Schema schema = null;
log.trace("Entering Expr");
}
{
(
- ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {op.setSchema(schema);} ] )
+ ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); op.setSchema(schema);} ] )
| op = BaseExpr(lp)
)
{log.trace("Exiting Expr"); return op;}
@@ -736,7 +755,25 @@
(
(
(<DEFINE> op = DefineClause(lp))
-| (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema); op.setCanonicalNames(); log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
+| (<LOAD> op = LoadClause(lp)
+ [ <AS>
+ (
+ LOOKAHEAD(2) "(" schema = TupleSchema() ")"
+ {
+ SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
+ op.setSchema(schema);
+ op.setCanonicalNames();
+ log.debug("Load as schema" + schema);
+ }
+ | fs = AtomSchema()
+ {
+ schema = new Schema(fs);
+ op.setSchema(schema);
+ log.debug("Load as atomschema" + schema);
+ }
+ )
+ ]
+ )
| ((<GROUP> | <COGROUP>) op = CogroupClause(lp))
| (<FILTER> op = FilterClause(lp))
| (<LIMIT> op = LimitClause(lp))
@@ -1149,6 +1186,7 @@
ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>();
LogicalPlan groupByPlan;
ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+ ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
log.trace("Entering GroupItem");
log.debug("LogicalPlan: " + lp);
}
@@ -1159,16 +1197,16 @@
( <BY>
(
LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
- ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
{listPlans.add(groupByPlan);}
(
- "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
{listPlans.add(groupByPlan);}
)*
")"
)
| (
- es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
{listPlans.add(groupByPlan);}
)
)
@@ -1412,6 +1450,7 @@
LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1);
List<LogicalPlan> generatePlans = generate.getGeneratePlans();
List<Boolean> flattenList = generate.getFlatten();
+ List<Schema> userDefinedSchemaList = generate.getUserDefinedSchema();
/*
Generate's nested plans will be translated to foreach's nested plan
If generate contains an expression that does not require generate's
@@ -1461,7 +1500,7 @@
}
resetGenerateState();
- foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList);
+ foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList, (ArrayList) userDefinedSchemaList);
try {
lp.add(foreach);
log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
@@ -1802,17 +1841,18 @@
{
ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+ ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
ExpressionOperator item;
LogicalPlan generatePlan;
log.trace("Entering FlattenedGenerateItemList");
}
{
(
- item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList)
+ item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList)
{
generatePlans.add(generatePlan);
}
- ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList)
+ ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList)
{
generatePlans.add(generatePlan);
}
@@ -1820,7 +1860,7 @@
)
{
- LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+ LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList, userDefinedSchemaList);
lp.add(generate);
log.debug("Added operator " + generate.getClass().getName() + " to the logical plan");
log.trace("Exiting FlattenedGenerateItemList");
@@ -1829,7 +1869,7 @@
}
-ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList):
+ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList, ArrayList<Schema> userDefinedSchemaList):
{
ExpressionOperator item;
Schema schema = null;
@@ -1841,13 +1881,16 @@
{
(
(
- (
<FLATTEN> "(" item = InfixExpr(over,specs,lp,input) ")"
{
flatten = true;
}
+ [ <AS> "(" schema = TupleSchema() ")" ]
)
-| (item = InfixExpr(over,specs,lp,input))
+|
+ (
+ (
+ (item = InfixExpr(over,specs,lp,input))
| ( <STAR>
{
LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1);
@@ -1861,15 +1904,17 @@
log.debug("FGItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
}
)
+ )
+ [ <AS> fs = FieldSchema() ]
)
- [ <AS> (fs = FieldSchema()) ]
)
{
log.debug("item: " + item.getClass().getName());
if(null != fs) {
- item.setFieldSchema(fs);
+ schema = new Schema(fs);
}
- flattenList.add(flatten);
+ flattenList.add(flatten);
+ userDefinedSchemaList.add(schema);
log.trace("Exiting FlattenedGenerateItem");
return item;
}
@@ -2289,7 +2334,7 @@
Schema.FieldSchema AtomSchema() :
{
Token t1 = null;
- byte type = DataType.BYTEARRAY;
+ byte type = DataType.NULL;
Schema.FieldSchema fs;
log.trace("Entering AtomSchema");
}
@@ -2316,7 +2361,7 @@
log.trace("Entering SchemaMap");
}
{
- ( t1 = <IDENTIFIER> ) [":" <MAP>] "[" "]"
+ ( t1 = <IDENTIFIER> ) [LOOKAHEAD(2) ":" <MAP>| ":"] "[" "]"
{
if (null != t1) {
log.debug("MAP alias " + t1.image);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Aug 28 16:44:13 2008
@@ -335,6 +335,83 @@
}
}
+ /***
+ * Recursively prefix merge two field schemas
+ * @param otherFs the other field schema to be merged with
+ * @return the prefix merged field schema, or this field schema unchanged if
+ * otherFs is null
+ *
+ * @throws SchemaMergeException if they cannot be merged
+ */
+
+ public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException {
+ return mergePrefixFieldSchema(otherFs, true);
+ }
+
+ /***
+ * Recursively prefix merge two field schemas
+ * @param otherFs the other field schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * field schema take precedence
+ * @return the prefix merged field schema, or this field schema unchanged if
+ * otherFs is null
+ *
+ * @throws SchemaMergeException if they cannot be merged
+ */
+
+ public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
+ boolean otherTakesAliasPrecedence)
+ throws SchemaMergeException {
+ Schema.FieldSchema myFs = this;
+ Schema.FieldSchema mergedFs = null;
+ byte mergedType = DataType.NULL;
+
+ if(null == otherFs) {
+ return myFs;
+ }
+
+ if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
+ && (otherFs.type == DataType.NULL || otherFs.type == DataType.UNKNOWN)) {
+ throw new SchemaMergeException("Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+ } else if(myFs.type == otherFs.type) {
+ mergedType = myFs.type;
+ } else if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
+ && (otherFs.type == DataType.NULL)) {
+ mergedType = DataType.BYTEARRAY;
+ } else if ((myFs.type != DataType.NULL && myFs.type != DataType.UNKNOWN)
+ && (otherFs.type == DataType.NULL)) {
+ mergedType = myFs.type;
+ } else {
+ throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+ }
+
+ String mergedAlias = mergeAlias(myFs.alias,
+ otherFs.alias,
+ otherTakesAliasPrecedence) ;
+
+ if (!DataType.isSchemaType(mergedType)) {
+ // just normal merge
+ mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+ }
+ else {
+ Schema mergedSubSchema = null;
+ // merge inner schemas if this side has one; otherwise take the other side's schema as is
+ if(null != myFs.schema) {
+ mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema,
+ otherTakesAliasPrecedence);
+ } else {
+ mergedSubSchema = otherFs.schema;
+ }
+ // create the merged field
+ try {
+ mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ;
+ } catch (FrontendException fee) {
+ throw new SchemaMergeException(fee.getMessage());
+ }
+ }
+ return mergedFs;
+ }
+
}
private List<FieldSchema> mFields;
@@ -1030,6 +1107,70 @@
return new Schema(outerSchema);
}
+ /***
+ * Recursively prefix merge two schemas
+ * @param other the other schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * schema take precedence
+ * @return the prefix merged schema, or this schema unchanged if other is
+ * null
+ *
+ * @throws SchemaMergeException if they cannot be merged
+ */
+
+ public Schema mergePrefixSchema(Schema other,
+ boolean otherTakesAliasPrecedence)
+ throws SchemaMergeException {
+ Schema schema = this;
+
+ if (other == null) {
+ return this ;
+ }
+
+ if (schema.size() < other.size()) {
+ throw new SchemaMergeException("Schema size mismatch. Other schema size greater than schema size. Schema: " + this + ". Other schema: " + other);
+ }
+
+ List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+
+ List<FieldSchema> mylist = schema.mFields ;
+ List<FieldSchema> otherlist = other.mFields ;
+
+ // We iterate up to the smaller one's size
+ int iterateLimit = other.mFields.size();
+
+ int idx = 0;
+ for (; idx< iterateLimit ; idx ++) {
+
+ // Just for readability
+ FieldSchema myFs = mylist.get(idx) ;
+ FieldSchema otherFs = otherlist.get(idx) ;
+
+ FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence);
+ outputList.add(mergedFs) ;
+ }
+ // if the first schema has leftover, then append the rest
+ for(int i=idx; i < mylist.size(); i++) {
+
+ FieldSchema fs = mylist.get(i) ;
+
+ // for non-schema types
+ if (!DataType.isSchemaType(fs.type)) {
+ outputList.add(new FieldSchema(fs.alias, fs.type)) ;
+ }
+ // for TUPLE & BAG
+ else {
+ try {
+ FieldSchema tmp = new FieldSchema(fs.alias, fs.schema, fs.type) ;
+ outputList.add(tmp) ;
+ } catch (FrontendException fee) {
+ throw new SchemaMergeException(fee.getMessage());
+ }
+ }
+ }
+
+ return new Schema(outputList) ;
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Aug 28 16:44:13 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -56,6 +57,8 @@
import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
+import org.apache.pig.impl.logicalLayer.parser.ParseException ;
public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -863,7 +866,7 @@
@Test
public void testQuery70() {
buildPlan(" a = load 'input1';");
- buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap;");
+ buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap:map[];");
buildPlan(" c = foreach b generate mymap#10L;");
}
@@ -1157,6 +1160,112 @@
}
}
+ @Test
+ public void testQuery90() throws FrontendException, ParseException {
+ LogicalPlan lp;
+ LOForEach foreach;
+
+ buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
+ buildPlan("b = group a by (name, age);");
+
+ //the first element in group, i.e., name is renamed as myname
+ lp = buildPlan("c = foreach b generate flatten(group) as (myname), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, age: int, mycount: long")));
+
+ //the first and second elements in group, i.e., name and age are renamed as myname and myage
+ lp = buildPlan("c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, myage: int, mycount: long")));
+
+ //the schema of group is unchanged
+ lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("name: chararray, age: int, mycount: long")));
+
+ //group is renamed as mygroup
+ lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+
+ //group is renamed as mygroup and the first element is renamed as myname
+ lp = buildPlan("c = foreach b generate group as mygroup:(myname), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long")));
+
+ //group is renamed as mygroup and the elements are renamed as myname and myage
+ lp = buildPlan("c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long")));
+
+ //group is renamed to mygroup as the tuple schema is empty
+ lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) as mycount;");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+ assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+
+ /*
+ //forcing an error by having more elements in the schema
+ lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage, mygpa), COUNT(A) as mycount;");
+ lp = buildPlan("c = foreach B generate group as mygroup:(myname: int, myage), COUNT(A) as mycount;");
+ lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage: chararray), COUNT(A) as mycount;");
+ lp = buildPlan("c = foreach B generate group as mygroup:{t: (myname, myage)}, COUNT(A) as mycount;");
+ lp = buildPlan("c = foreach B generate flatten(group) as (myname, myage, mygpa), COUNT(A) as mycount;");
+
+ foreach = (LOForEach) lp.getLeaves().get(0);
+
+ assertTrue(foreach.getSchema().equals(getSchemaFromString()));
+ */
+
+ }
+
+ @Test
+ public void testQueryFail90() throws FrontendException, ParseException {
+ buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
+ buildPlan("b = group a by (name, age);");
+
+ try {
+ buildPlan("c = foreach b generate group as mygroup:(myname, myage, mygpa), COUNT(a) as mycount;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Schema size mismatch"));
+ }
+
+ try {
+ buildPlan("c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Type mismatch"));
+ }
+
+ try {
+ buildPlan("c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Type mismatch"));
+ }
+
+ try {
+ buildPlan("c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Type mismatch"));
+ }
+
+ try {
+ buildPlan("c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Schema size mismatch"));
+ }
+ }
+
+ private Schema getSchemaFromString(String schemaString) throws ParseException {
+ return getSchemaFromString(schemaString, DataType.BYTEARRAY);
+ }
+
+ private Schema getSchemaFromString(String schemaString, byte defaultType) throws ParseException {
+ ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ;
+ QueryParser queryParser = new QueryParser(stream) ;
+ Schema schema = queryParser.TupleSchema() ;
+ QueryParser.SchemaUtils.setSchemaDefaultType(schema, defaultType);
+ return schema;
+ }
+
private void printPlan(LogicalPlan lp) {
LOPrinter graphPrinter = new LOPrinter(System.err, lp);
System.err.println("Printing the logical plan");
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Aug 28 16:44:13 2008
@@ -2418,7 +2418,7 @@
// check outer schema
Schema endResultSchema = foreach1.getSchema() ;
- assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ;
assertEquals(endResultSchema.getField(1).type, DataType.LONG) ;
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java Thu Aug 28 16:44:13 2008
@@ -989,7 +989,7 @@
// check outer schema
Schema endResultSchema = foreach1.getSchema() ;
- assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+ assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ;
assertEquals(endResultSchema.getField(1).type, DataType.DOUBLE) ;
}
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Thu Aug 28 16:44:13 2008
@@ -180,6 +180,7 @@
Schema schema = null ;
try {
schema = queryParser.TupleSchema() ;
+ QueryParser.SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
// set all the [NoAlias] to null
for(int i=0; i < dummyAliasCounter; i++) {