You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/30 19:15:55 UTC
svn commit: r990868 - in /hadoop/pig/trunk: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/schema/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/
Author: thejas
Date: Mon Aug 30 17:15:54 2010
New Revision: 990868
URL: http://svn.apache.org/viewvc?rev=990868&view=rev
Log:
PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via thejas)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 17:15:54 2010
@@ -171,6 +171,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via thejas)
+
PIG-1579: Intermittent unit test failure for TestScriptUDF.testPythonScriptUDFNullInputOutput (daijy)
PIG-1557: couple of issue mapping aliases to jobs (rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Mon Aug 30 17:15:54 2010
@@ -69,20 +69,16 @@ public class LODistinct extends Relation
throw new FrontendException(msg, errCode, PigException.BUG, false, null);
}
if(op instanceof ExpressionOperator) {
- Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
+ Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op);
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
fss.add(fs);
mSchema = new Schema(fss);
- for (int i=0;i<getInput().getSchema().size();i++)
- mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
}
} else {
if (op.getSchema()!=null) {
- mSchema = new Schema(op.getSchema());
- for (int i=0;i<op.getSchema().size();i++)
- mSchema.getField(i).setParent(op.getSchema().getField(i).canonicalName, op);
+ mSchema = Schema.copyAndLink(op.getSchema(), op);
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Mon Aug 30 17:15:54 2010
@@ -75,7 +75,7 @@ public class LOFilter extends Relational
ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
try {
if(input instanceof ExpressionOperator) {
- Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)input).getFieldSchema());
+ Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)input).getFieldSchema(), input);
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
@@ -84,9 +84,7 @@ public class LOFilter extends Relational
}
} else {
if (getInput().getSchema()!=null) {
- mSchema = new Schema(input.getSchema());
- for (int i=0;i<getInput().getSchema().size();i++)
- mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+ mSchema = Schema.copyAndLink( input.getSchema(), input );
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Aug 30 17:15:54 2010
@@ -242,8 +242,7 @@ public class LOForEach extends Relationa
if(null != s && s.size()!=0) {
for(int i = 0; i < s.size(); ++i) {
Schema.FieldSchema fs;
- fs = new Schema.FieldSchema(s.getField(i));
- fs.setParent(s.getField(i).canonicalName, op);
+ fs = Schema.FieldSchema.copyAndLink(s.getField(i), op);
log.debug("fs: " + fs);
if(null != userDefinedSchema) {
Schema.FieldSchema userDefinedFieldSchema;
@@ -301,7 +300,7 @@ public class LOForEach extends Relationa
updateAliasCount(aliases, newFs.alias);
fss.add(newFs);
mSchemaPlanMapping.add(plan);
- newFs.setParent(null, op);
+ newFs.setParent(planFs.canonicalName, op);
} else {
for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
Schema.FieldSchema.setFieldSchemaDefaultType(ufs, DataType.BYTEARRAY);
@@ -320,14 +319,14 @@ public class LOForEach extends Relationa
}
fss.add(newFs);
mSchemaPlanMapping.add(plan);
- newFs.setParent(null, op);
+ newFs.setParent( planFs.canonicalName, op );
}
}
} else {
//just populate the schema with the field schema of the expression operator
//check if the user has defined a schema for the operator; compare the schema
//with that of the expression operator field schema and then add it to the list
- Schema.FieldSchema newFs = new Schema.FieldSchema(planFs);
+ Schema.FieldSchema newFs = Schema.FieldSchema.copyAndLink(planFs, op);
if(null != userDefinedSchema) {
try {
newFs = newFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Mon Aug 30 17:15:54 2010
@@ -134,7 +134,7 @@ public class LOGenerate extends LogicalO
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
if(op instanceof ExpressionOperator) {
- fss.add(new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema()));
+ fss.add( Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op));
mSchema = new Schema(fss);
} else {
mSchema = op.getSchema();
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Mon Aug 30 17:15:54 2010
@@ -64,9 +64,7 @@ public class LOLimit extends RelationalO
if (!mIsSchemaComputed) {
try {
if (getInput().getSchema()!=null) {
- mSchema = new Schema(getInput().getSchema());
- for (int i=0;i<getInput().getSchema().size();i++)
- mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+ mSchema = Schema.copyAndLink(getInput().getSchema(), getInput());
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon Aug 30 17:15:54 2010
@@ -173,6 +173,8 @@ public class LOLoad extends RelationalOp
mSchema = null;
throw fee;
}
+ // Set the parent of all fields in the schema as this (LOLoad instance) with parent canonicalName as null.
+ setParent( mSchema );
}
return mSchema;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon Aug 30 17:15:54 2010
@@ -31,6 +31,7 @@ import org.apache.pig.impl.plan.PlanVisi
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -203,8 +204,7 @@ public class LOProject extends Expressio
if(!mSentinel) {
//we have an expression operator and hence a list of field shcemas
Schema.FieldSchema fs = ((ExpressionOperator)expressionOperator).getFieldSchema();
- mFieldSchema = new Schema.FieldSchema(fs);
- mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+ mFieldSchema = Schema.FieldSchema.copyAndLink(fs, expressionOperator);
} else {
//we have a relational operator as input and hence a schema
log.debug("expression operator alias: " + expressionOperator.getAlias());
@@ -271,8 +271,7 @@ public class LOProject extends Expressio
// normal single level access
fs = s.getField(mProjection.get(0));
}
- mFieldSchema = new Schema.FieldSchema(fs);
- mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+ mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
} else {
mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
mFieldSchema.setParent(expOpFs.canonicalName, expressionOperator);
@@ -287,8 +286,7 @@ public class LOProject extends Expressio
log.debug("s: " + s);
if(null != s) {
Schema.FieldSchema fs = s.getField(mProjection.get(0));
- mFieldSchema = new Schema.FieldSchema(fs);
- mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+ mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
log.debug("mFieldSchema alias: " + mFieldSchema.alias);
log.debug("mFieldSchema schema: " + mFieldSchema.schema);
} else {
@@ -312,9 +310,8 @@ public class LOProject extends Expressio
if(null != s) {
if(colNum < s.size()) {
Schema.FieldSchema parentFs = s.getField(colNum);
- fs = new Schema.FieldSchema(parentFs);
+ fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator );
fss.add(fs);
- fs.setParent(parentFs.canonicalName, expressionOperator);
} else {
fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
fss.add(fs);
@@ -334,9 +331,8 @@ public class LOProject extends Expressio
Schema s = expressionOperator.getSchema();
if(null != s) {
Schema.FieldSchema parentFs = s.getField(colNum);
- fs = new Schema.FieldSchema(parentFs);
+ fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator);
fss.add(fs);
- fs.setParent(parentFs.canonicalName, expressionOperator);
} else {
fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
fss.add(fs);
@@ -351,7 +347,8 @@ public class LOProject extends Expressio
// throw new FrontendException(pe.getMessage());
//}
mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
- mFieldSchema.setParent(null, expressionOperator);
+ Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+ mFieldSchema.setParent( expOpFs.canonicalName, expressionOperator );
mIsFieldSchemaComputed = true;
}
@@ -381,7 +378,7 @@ public class LOProject extends Expressio
if(!DataType.isSchemaType(mType)) {
Schema pjSchema = new Schema(mFieldSchema);
mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
- mFieldSchema.setParent(null, expressionOperator);
+ mFieldSchema.setParent( ((LOProject)expressionOperator).mFieldSchema.canonicalName, expressionOperator );
} else {
if(null != mFieldSchema) {
mFieldSchema.type = DataType.BAG;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Mon Aug 30 17:15:54 2010
@@ -147,7 +147,7 @@ public class LOSort extends RelationalOp
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
if(op instanceof ExpressionOperator) {
- Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
+ Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op);
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
@@ -156,9 +156,7 @@ public class LOSort extends RelationalOp
}
} else {
if (getInput().getSchema()!=null) {
- mSchema = new Schema(op.getSchema());
- for (int i=0;i<getInput().getSchema().size();i++)
- mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+ mSchema = Schema.copyAndLink( op.getSchema(), op );
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java Mon Aug 30 17:15:54 2010
@@ -88,9 +88,7 @@ public class LOSplit extends RelationalO
}
LogicalOperator input = s.iterator().next();
if (input.getSchema()!=null) {
- mSchema = new Schema(input.getSchema());
- for (int i=0;i<input.getSchema().size();i++)
- mSchema.getField(i).setParent(input.getSchema().getField(i).canonicalName, input);
+ mSchema = Schema.copyAndLink(input.getSchema(), input);
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Mon Aug 30 17:15:54 2010
@@ -90,9 +90,7 @@ public class LOSplitOutput extends Relat
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
if (input.getSchema()!=null) {
- mSchema = new Schema(input.getSchema());
- for (int i=0;i<input.getSchema().size();i++)
- mSchema.getField(i).setParent(input.getSchema().getField(i).canonicalName, input);
+ mSchema = Schema.copyAndLink(input.getSchema(), input);
}
else
mSchema = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java Mon Aug 30 17:15:54 2010
@@ -56,6 +56,9 @@ public class LOStream extends Relational
// Stream Operator this operator represents
private StreamingCommand command;
transient private ExecutableManager executableManager;
+
+ private boolean isParentSet = false;
+
/**
* Create a new <code>LOStream</code> with the given command.
*
@@ -87,24 +90,15 @@ public class LOStream extends Relational
*/
@Override
public Schema getSchema() throws FrontendException {
- return mSchema;
- /*
- if (!mIsSchemaComputed) {
- /*
- LogicalOperator input = mPlan.getPredecessors(this).get(0);
- ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
- try {
- mSchema = input.getSchema();
- mIsSchemaComputed = true;
- } catch (FrontendException ioe) {
- mSchema = null;
- mIsSchemaComputed = false;
- throw ioe;
- }
+ if( mSchema == null )
+ return null;
+
+ if( !isParentSet ) {
+ setParent( mSchema );
+ isParentSet = true;
}
+
return mSchema;
- */
-
}
/**
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Mon Aug 30 17:15:54 2010
@@ -162,6 +162,22 @@ abstract public class LogicalOperator ex
mSchema.reconcile(schema);
}
}
+
+ /**
+ * Recursively set this operator as the parent (with a null parent canonical name) of every
+ * field in the schema hierarchy. Currently only used by LOStream and LOLoad.
+ *
+ * @param schema the schema instance to set parent for
+ */
+ protected final void setParent(Schema schema) {
+ if( schema == null )
+ return;
+
+ for( Schema.FieldSchema fs : schema.getFields() ) {
+ fs.setParent( null, this );
+ setParent( fs.schema );
+ }
+ }
/**
* Directly force the schema without reconcilation
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Mon Aug 30 17:15:54 2010
@@ -177,11 +177,13 @@ public class Schema implements Serializa
alias = a;
schema = s;
log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
+
if ((null != s) && !(DataType.isSchemaType(t))) {
int errCode = 1020;
throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
+ DataType.findTypeName(t), errCode, PigException.INPUT);
}
+
type = t;
canonicalName = canonicalNamer.getNewName();
canonicalMap = new HashMap<String, LogicalOperator>();
@@ -214,6 +216,35 @@ public class Schema implements Serializa
reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
}
+ /**
+ * Make a copy of the FieldSchema instance and link the new one to the old one with canonical map.
+ *
+ * @param fs FieldSchema instance to be copied.
+ * @param op The operator to which the old FieldSchema instance belongs.
+ * @return a new copy
+ */
+ public static FieldSchema copyAndLink(FieldSchema fs, LogicalOperator op) {
+ String alias = null;
+ Schema schema = null;
+ byte type = DataType.UNKNOWN;
+ if( null != fs ) {
+ alias = fs.alias;
+ if( null != fs.schema ) {
+ schema = Schema.copyAndLink( fs.schema, op );
+ } else {
+ schema = null;
+ }
+ type = fs.type;
+ }
+
+ FieldSchema fieldSchema = new FieldSchema( alias, schema );
+ fieldSchema.type = type;
+ fieldSchema.setParent( fs == null ? null : fs.canonicalName, op );
+
+ return fieldSchema;
+ }
+
public void setParent(String parentCanonicalName, LogicalOperator parent) {
if(null != parentCanonicalName) {
canonicalMap.put(parentCanonicalName, parent);
@@ -569,6 +600,22 @@ public class Schema implements Serializa
return (fs.type == DataType.NULL || fs.type == DataType.UNKNOWN);
}
+ /**
+ * Find a field schema instance in this FieldSchema hierarchy (including "this")
+ * that matches the given canonical name.
+ *
+ * @param canonicalName canonical name
+ * @return the FieldSchema instance found
+ */
+ public FieldSchema findFieldSchema(String canonicalName) {
+ if( this.canonicalName.equals(canonicalName) ) {
+ return this;
+ }
+ if( this.schema != null )
+ return schema.findFieldSchema( canonicalName );
+ return null;
+ }
+
}
private List<FieldSchema> mFields;
@@ -673,6 +720,42 @@ public class Schema implements Serializa
}
/**
+ * Make a copy of the given schema object and link the original with the copy using
+ * canonical name map.
+ *
+ * @param s The original schema
+ * @param op The operator to which the original belongs
+ * @return a new copy
+ */
+ public static Schema copyAndLink(Schema s, LogicalOperator op) {
+ Schema result = new Schema();
+ if(null != s) {
+ result.twoLevelAccessRequired = s.twoLevelAccessRequired;
+ try {
+ for( int i = 0; i < s.size(); ++i ) {
+ FieldSchema fs = FieldSchema.copyAndLink( s.getField(i), op );
+ result.mFields.add(fs);
+ if(null != fs) {
+ if (fs.alias != null) {
+ result.mAliases.put(fs.alias, fs);
+ result.mFieldSchemas.put(fs.canonicalName, fs.alias);
+ }
+ }
+ }
+ } catch (FrontendException pe) {
+ result.mFields = new ArrayList<FieldSchema>();
+ result.mAliases = new HashMap<String, FieldSchema>();
+ result.mFieldSchemas = new MultiMap<String, String>();
+ }
+ } else {
+ result.mFields = new ArrayList<FieldSchema>();
+ result.mAliases = new HashMap<String, FieldSchema>();
+ result.mFieldSchemas = new MultiMap<String, String>();
+ }
+ return result;
+ }
+
+ /**
* Given an alias name, find the associated FieldSchema.
* @param alias Alias to look up.
* @return FieldSchema, or null if no such alias is in this tuple.
@@ -1808,6 +1891,24 @@ public class Schema implements Serializa
}
return new Schema(fsList);
}
+
+ /**
+ * Look for a FieldSchema instance in the schema hierarchy which has the given canonical name.
+ * @param canonicalName canonical name
+ * @return the FieldSchema instance found
+ */
+ public FieldSchema findFieldSchema(String canonicalName) {
+ for( FieldSchema fs : mFields ) {
+ if( fs.canonicalName.equals( canonicalName ) )
+ return fs;
+ if( fs.schema != null ) {
+ FieldSchema result = fs.schema.findFieldSchema( canonicalName );
+ if( result != null )
+ return result;
+ }
+ }
+ return null;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Aug 30 17:15:54 2010
@@ -1015,6 +1015,18 @@ public class TypeCheckingVisitor extends
this.visit(cast);
}
+ /**
+ * The cast insertion for UDF is slightly different in that we need to link the FieldSchema
+ * in the cast with its parent. This is because we don't call its getFieldSchema() when
+ * looking for loadFuncSpec. See getLoadFuncSpec(LogicalOperator op, String parentCanonicalName)
+ * for more information.
+ */
+ private void insertCastForUDF(LOUserFunc udf,
+ FieldSchema fromFS, FieldSchema toFs, ExpressionOperator predecessor)
+ throws VisitorException {
+ toFs.setParent( fromFS.canonicalName, predecessor );
+ insertCast( udf, fromFS.type, toFs, predecessor );
+ }
/**
@@ -1593,7 +1605,7 @@ public class TypeCheckingVisitor extends
if(fFSch.type==tFSch.type) {
continue;
}
- insertCast(udf, tFSch.type, tFSch, args.get(i));
+ insertCastForUDF(udf, fFSch, tFSch, args.get(i));
}
}
@@ -1764,8 +1776,11 @@ public class TypeCheckingVisitor extends
if(inputType == DataType.BYTEARRAY) {
try {
- FuncSpec loadFuncSpec = getLoadFuncSpec(cast.getExpression());
- cast.setLoadFuncSpec(loadFuncSpec);
+ Map<String, LogicalOperator> canonicalMap = cast.getFieldSchema().getCanonicalMap();
+ for( Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) {
+ FuncSpec loadFuncSpec = getLoadFuncSpec( entry.getValue(), entry.getKey() );
+ cast.setLoadFuncSpec( loadFuncSpec );
+ }
} catch (FrontendException fee) {
int errCode = 1053;
String msg = "Cannot resolve load function to use for casting from " +
@@ -3049,44 +3064,6 @@ public class TypeCheckingVisitor extends
return new OperatorKey(scope, newId) ;
}
- private FuncSpec getLoadFuncSpec(ExpressionOperator exOp) throws FrontendException {
- Schema.FieldSchema fs = exOp.getFieldSchema();
- if(null == fs) {
- return null;
- }
-
- Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
- MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
- MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
-
- if(canonicalMap.keySet().size() > 0) {
- for(String parentCanonicalName: canonicalMap.keySet()) {
- FuncSpec lfSpec = getLoadFuncSpec(exOp, parentCanonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- } else {
- for(LogicalOperator op: reverseCanonicalMap.keySet()) {
- for(String parentCanonicalName: reverseCanonicalMap.get(op)) {
- FuncSpec lfSpec = getLoadFuncSpec(op, parentCanonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- }
- }
- if(loadFuncSpecMap.keySet().size() == 0) {
- return null;
- }
- if(loadFuncSpecMap.keySet().size() == 1) {
- String lfString = loadFuncSpecMap.keySet().iterator().next();
- return loadFuncSpecMap.get(lfString).iterator().next();
- }
-
- {
- int errCode = 1065;
- String msg = "Found more than one load function to use: " + loadFuncSpecMap.keySet();
- throw new FrontendException(msg, errCode, PigException.INPUT);
- }
- }
-
private FuncSpec getLoadFuncSpec(LogicalOperator op, String parentCanonicalName) throws FrontendException {
MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
if(op instanceof ExpressionOperator) {
@@ -3095,22 +3072,10 @@ public class TypeCheckingVisitor extends
}
Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
- Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
- MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
-
- if(canonicalMap.keySet().size() > 0) {
- for(String canonicalName: canonicalMap.keySet()) {
- FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- } else {
- for(LogicalOperator lop: reverseCanonicalMap.keySet()) {
- for(String canonicalName: reverseCanonicalMap.get(lop)) {
- FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- }
+ if( parentCanonicalName != null ) {
+ fs = fs.findFieldSchema( parentCanonicalName );
}
+ getLoadFuncSpec( fs, loadFuncSpecMap );
} else {
if(op instanceof LOLoad) {
return ((LOLoad)op).getInputFile().getFuncSpec();
@@ -3123,22 +3088,8 @@ public class TypeCheckingVisitor extends
Schema s = op.getSchema();
if(null != s) {
- for(Schema.FieldSchema fs: s.getFields()) {
- if(null != parentCanonicalName && (parentCanonicalName.equals(fs.canonicalName))) {
- if(fs.getCanonicalMap().keySet().size() > 0) {
- for(String canonicalName: fs.getCanonicalMap().keySet()) {
- FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- } else {
- FuncSpec lfSpec = getLoadFuncSpec(fs, null);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- } else if (null == parentCanonicalName) {
- FuncSpec lfSpec = getLoadFuncSpec(fs, null);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- }
+ FieldSchema fieldSchema = s.findFieldSchema( parentCanonicalName );
+ getLoadFuncSpec( fieldSchema, loadFuncSpecMap );
} else {
LogicalPlan lp = op.getPlan();
for(LogicalOperator pred: lp.getPredecessors(op)) {
@@ -3161,44 +3112,29 @@ public class TypeCheckingVisitor extends
throw new FrontendException(msg, errCode, PigException.INPUT);
}
}
-
- private FuncSpec getLoadFuncSpec(Schema.FieldSchema fs, String parentCanonicalName) throws FrontendException {
- if(null == fs) {
- return null;
- }
- Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
- MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
- MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
-
- if(canonicalMap.keySet().size() > 0) {
- for(Map.Entry<String, LogicalOperator> e: canonicalMap.entrySet()) {
- if((null == parentCanonicalName) || (parentCanonicalName.equals(e.getKey()))) {
- FuncSpec lfSpec = getLoadFuncSpec(e.getValue(), parentCanonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- }
+
+ /**
+ * Results are stored in the input param, loadFuncSpecMap
+ */
+ private void getLoadFuncSpec(FieldSchema fieldSchema, MultiMap<String, FuncSpec> loadFuncSpecMap)
+ throws FrontendException {
+ if( fieldSchema == null )
+ return;
+
+ Map<String, LogicalOperator> canonicalMap = fieldSchema.getCanonicalMap();
+ if( canonicalMap.size() > 0 ) {
+ for(Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) {
+ FuncSpec lfSpec = getLoadFuncSpec( entry.getValue(), entry.getKey() );
+ if( null != lfSpec )
+ loadFuncSpecMap.put( lfSpec.getClassName(), lfSpec );
+ }
} else {
- for(LogicalOperator op: reverseCanonicalMap.keySet()) {
- for(String canonicalName: reverseCanonicalMap.get(op)) {
- if((null == parentCanonicalName) || (parentCanonicalName.equals(canonicalName))) {
- FuncSpec lfSpec = getLoadFuncSpec(op, parentCanonicalName);
- if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
- }
- }
- }
- }
- if(loadFuncSpecMap.keySet().size() == 0) {
- return null;
- }
- if(loadFuncSpecMap.keySet().size() == 1) {
- String lfString = loadFuncSpecMap.keySet().iterator().next();
- return loadFuncSpecMap.get(lfString).iterator().next();
- }
-
- {
- int errCode = 1065;
- String msg = "Found more than one load function to use: " + loadFuncSpecMap.keySet();
- throw new FrontendException(msg, errCode, PigException.INPUT);
+ MultiMap<LogicalOperator, String> reverseCanonicalMap = fieldSchema.getReverseCanonicalMap();
+ for( LogicalOperator lop : reverseCanonicalMap.keySet() ) {
+ FuncSpec lfSpec = getLoadFuncSpec( lop, null );
+ if( null != lfSpec )
+ loadFuncSpecMap.put( lfSpec.getClassName(), lfSpec );
+ }
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Mon Aug 30 17:15:54 2010
@@ -5777,5 +5777,163 @@ public class TestTypeCheckingValidator e
}
}
+
+ @Test
+ public void testLineageMultipleLoader1() throws FrontendException {
+ planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w);" ) ;
+ planTester.buildPlan( "B = LOAD 'data2' USING TextLoader() AS (x, y);" ) ;
+ planTester.buildPlan("C = JOIN A BY u, B BY x USING 'replicated';") ;
+ planTester.buildPlan("D = GROUP C BY (u, x);");
+ LogicalPlan plan = planTester.buildPlan( "E = FOREACH D GENERATE (chararray)group.u, (int)group.x;" );
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // Check group.u
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+ LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+ LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+
+ // Check group.x
+ foreachPlan = foreach.getForEachPlans().get( 1 );
+ exOp = foreachPlan.getRoots().get(0);
+ proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+ cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("TextLoader"));
+ }
+
+ /**
+ * From JIRA 1482: map lookups on PigStorage-loaded data (v1) are joined with a
+ * TextLoader-loaded field (a) and grouped on both. The cast of group.v1 to chararray
+ * must still resolve to PigStorage despite the second loader in the plan.
+ * @throws FrontendException propagated from plan building or validation
+ */
+ @Test
+ public void testLineageMultipleLoader2() throws FrontendException {
+ planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (s, m, l);" ) ;
+ planTester.buildPlan( "B = FOREACH A GENERATE s#'k1' as v1, m#'k2' as v2, l#'k3' as v3;" ) ;
+ planTester.buildPlan( "C = FOREACH B GENERATE v1, (v2 == 'v2' ? 1L : 0L) as v2:long, (v3 == 'v3' ? 1 :0) as v3:int;" ) ;
+ planTester.buildPlan( "D = LOAD 'data2' USING TextLoader() AS (a);");
+ planTester.buildPlan( "E = JOIN C BY v1, D BY a USING 'replicated';" );
+ planTester.buildPlan( "F = GROUP E BY (v1, a);" );
+ LogicalPlan plan = planTester.buildPlan( "G = FOREACH F GENERATE (chararray)group.v1, group.a;" );
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // Check group.v1 (first generate plan): its cast must be attributed to PigStorage.
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+ LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+ LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+ // Fail with a clear message rather than a NullPointerException when lineage is lost.
+ assertTrue("No load function resolved for group.v1", cast.getLoadFuncSpec() != null);
+ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+ }
+
+ /**
+ * A special invalid case.
+ */
+ @Test
+ public void testLineageMultipleLoader3() throws FrontendException {
+ planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w);" ) ;
+ planTester.buildPlan( "B = LOAD 'data2' USING TextLoader() AS (x, y);" ) ;
+ planTester.buildPlan("C = COGROUP A BY u, B by x;");
+ LogicalPlan plan = planTester.buildPlan( "D = FOREACH C GENERATE (chararray)group;" );
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ } catch(PlanValidationException ex) {
+ assertTrue( ex.getCause().toString().contains( "Cannot resolve load function to use for casting from bytearray to chararray." ) );
+ return;
+ }
+ assertTrue( "Validation failure is expected.", false );
+ }
+
+ /**
+ * In case of filter with tuple type
+ */
+ @Test
+ public void testLineageFilterWithTuple() throws FrontendException {
+ planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w:tuple(a,b));" ) ;
+ planTester.buildPlan( "B = FOREACH A generate v, w;");
+ planTester.buildPlan( "C = FILTER B by v < 50;" ) ;
+ LogicalPlan plan = planTester.buildPlan("D = FOREACH C generate (int)w.a;");
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+ LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+ LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+ }
+
+ /**
+ * Implicit cast in an arithmetic expression: in u + v, v has no declared type, so the
+ * type checker is expected to insert a cast whose load function is PigStorage.
+ * @throws FrontendException propagated from plan building or validation
+ */
+ @Test
+ public void testLineageExpressionCasting() throws FrontendException {
+ planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u:int, v);" ) ;
+ planTester.buildPlan( "B = FILTER A by u < 50;" ) ;
+ LogicalPlan plan = planTester.buildPlan("C = FOREACH B generate u + v;");
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ // The order of the two projection roots (u and v) is not fixed, so search every root
+ // for the one whose first successor is the inserted cast.
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+ LOCast cast = null;
+ for( LogicalOperator root : foreachPlan.getRoots() ) {
+ List<LogicalOperator> succs = foreachPlan.getSuccessors( root );
+ if( succs != null && !succs.isEmpty() && succs.get( 0 ) instanceof LOCast ) {
+ cast = (LOCast)succs.get( 0 );
+ break;
+ }
+ }
+ assertTrue( "Expected a cast on one of the projected fields", cast != null );
+ assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+ }
+
}