You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/02 22:36:50 UTC
svn commit: r701235 [1/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/impl/logicalLayer/...
Author: olga
Date: Thu Oct 2 13:36:49 2008
New Revision: 701235
URL: http://svn.apache.org/viewvc?rev=701235&view=rev
Log:
PIG-335: lineage
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Thu Oct 2 13:36:49 2008
@@ -267,3 +267,5 @@
PIG-54: MIN/MAX don't deal with invalid data (pradeepk via olgan)
PIG-470: TextLoader should produce bytearrays (sms via olgan)
+
+ PIG-335: lineage (sms via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Oct 2 13:36:49 2008
@@ -1150,7 +1150,12 @@
ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
.getExpression());
physOp.setResultType(op.getType());
- ((POCast) physOp).setLoadFSpec(load.getClass().getName());
+ LoadFunc lf = op.getLoadFunc();
+ String lfString = null;
+ if(null != lf) {
+ lfString = lf.getClass().getName();
+ ((POCast) physOp).setLoadFSpec(lfString);
+ }
try {
currentPlan.connect(from, physOp);
} catch (PlanException e) {
@@ -1159,6 +1164,7 @@
}
}
+
@Override
public void visit(LOLimit limit) throws VisitorException {
String scope = limit.getOperatorKey().scope;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Thu Oct 2 13:36:49 2008
@@ -44,7 +44,7 @@
* Need the full operator implementation.
*/
public class POCast extends ExpressionOperator {
- private String loadFSpec;
+ private String loadFSpec = null;
transient private LoadFunc load;
private Log log = LogFactory.getLog(getClass());
private boolean castNotNeeded = false;
@@ -63,7 +63,9 @@
private void instantiateFunc() {
if(load!=null) return;
- this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+ if(this.loadFSpec != null) {
+ this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+ }
}
public void setLoadFSpec(String fSpec) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Thu Oct 2 13:36:49 2008
@@ -84,8 +84,7 @@
* if there is already a schema and the existing schema cannot
* be reconciled with this new schema.
*/
- public final void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
- log.debug("Inside setFieldSchema");
+ public void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
mFieldSchema = fs;
setAlias(fs.alias);
setType(fs.type);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java Thu Oct 2 13:36:49 2008
@@ -53,9 +53,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java Thu Oct 2 13:36:49 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.logicalLayer;
+import org.apache.pig.LoadFunc;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
@@ -30,6 +31,7 @@
private static final long serialVersionUID = 2L;
private ExpressionOperator mExpr;
+ private LoadFunc mLoadFunc = null;
/**
*
@@ -66,14 +68,8 @@
@Override
public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- if(DataType.isAtomic(mType)) {
- mFieldSchema = new Schema.FieldSchema(null, mType);
- if (mExpr.getFieldSchema() != null) {
- mFieldSchema.canonicalName =
- mExpr.getFieldSchema().canonicalName;
- }
- mIsFieldSchemaComputed = true;
- }
+ mFieldSchema = new Schema.FieldSchema(null, mType);
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
@@ -88,4 +84,12 @@
return false;
}
+ public LoadFunc getLoadFunc() {
+ return mLoadFunc;
+ }
+
+ public void setLoadFunc(LoadFunc loadFunc) {
+ mLoadFunc = loadFunc;
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Oct 2 13:36:49 2008
@@ -167,6 +167,7 @@
*/
int arity = mGroupByPlans.get(inputs.get(0)).size();
for (int i = 0; i < arity; ++i) {
+ Schema.FieldSchema groupByFs;
Collection<String> cAliases = positionAlias.get(i);
if(null != cAliases) {
Object[] aliases = cAliases.toArray();
@@ -181,11 +182,14 @@
if(!aliasLookup.get(alias)) {
Schema.FieldSchema fs = eOp.getFieldSchema();
if(null != fs) {
- groupByFss.add(new Schema.FieldSchema(alias, fs.schema, fs.type));
+ groupByFs = new Schema.FieldSchema(alias, fs.schema, fs.type);
+ groupByFss.add(groupByFs);
aliasLookup.put(alias, true);
} else {
- groupByFss.add(new Schema.FieldSchema(alias, null, DataType.BYTEARRAY));
+ groupByFs = new Schema.FieldSchema(alias, null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
}
+ setFieldSchemaParent(groupByFs, positionOperators, i);
break;
} else {
if(j < aliases.length) {
@@ -195,10 +199,19 @@
//just add the schema of the expression operator with the null alias
Schema.FieldSchema fs = eOp.getFieldSchema();
if(null != fs) {
- groupByFss.add(new Schema.FieldSchema(null, fs.schema, fs.type));
+ groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+ groupByFss.add(groupByFs);
+ for(ExpressionOperator op: cEops) {
+ Schema.FieldSchema opFs = op.getFieldSchema();
+ if(null != opFs) {
+ groupByFs.setParent(opFs.canonicalName, eOp);
+ }
+ }
} else {
- groupByFss.add(new Schema.FieldSchema(null, null, DataType.BYTEARRAY));
+ groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
}
+ setFieldSchemaParent(groupByFs, positionOperators, i);
break;
}
}
@@ -218,10 +231,10 @@
} else {
//We do not have any alias for this position in the group by columns
//We have positions $1, $2, etc.
- groupByFss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
+ setFieldSchemaParent(groupByFs, positionOperators, i);
}
- //The schema for these columns is the merged schema of the expression operatore
- //This part is handled in the type checker
}
groupBySchema = new Schema(groupByFss);
@@ -229,7 +242,9 @@
if(1 == arity) {
byte groupByType = getAtomicGroupByType();
Schema groupSchema = groupByFss.get(0).schema;
- fss.add(new Schema.FieldSchema("group", groupSchema, groupByType));
+ Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", groupSchema, groupByType);
+ setFieldSchemaParent(groupByFs, positionOperators, 0);
+ fss.add(groupByFs);
} else {
Schema mergedGroupSchema = getTupleGroupBySchema();
if(mergedGroupSchema.size() != groupBySchema.size()) {
@@ -255,9 +270,10 @@
}
for (LogicalOperator op : inputs) {
try {
- Schema cSchema = op.getSchema();
- fss.add(new Schema.FieldSchema(op.getAlias(), op
- .getSchema(), DataType.BAG));
+ Schema.FieldSchema bagFs = new Schema.FieldSchema(op.getAlias(),
+ op.getSchema(), DataType.BAG);
+ fss.add(bagFs);
+ setFieldSchemaParent(bagFs, op);
} catch (FrontendException ioe) {
mIsSchemaComputed = false;
mSchema = null;
@@ -374,12 +390,35 @@
for(int j=0;j < innerPlans.size(); j++) {
byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
- fsList.get(j).type = DataType.mergeType(fsList.get(j).type,
- innerType) ;
+ ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+ Schema.FieldSchema groupFs = fsList.get(j);
+ groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
+ groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
}
}
return new Schema(fsList) ;
}
+ private void setFieldSchemaParent(Schema.FieldSchema fs, MultiMap<Integer, ExpressionOperator> positionOperators, int position) throws FrontendException {
+ for(ExpressionOperator op: positionOperators.get(position)) {
+ Schema.FieldSchema opFs = op.getFieldSchema();
+ if(null != opFs) {
+ fs.setParent(opFs.canonicalName, op);
+ }
+ }
+ }
+
+ private void setFieldSchemaParent(Schema.FieldSchema fs, LogicalOperator op) throws FrontendException {
+ Schema s = op.getSchema();
+ if(null != s) {
+ for(Schema.FieldSchema inputFs: s.getFields()) {
+ if(null != inputFs) {
+ fs.setParent(inputFs.canonicalName, op);
+ }
+ }
+ } else {
+ fs.setParent(null, op);
+ }
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java Thu Oct 2 13:36:49 2008
@@ -67,6 +67,7 @@
for (LogicalOperator op : inputs) {
String opAlias = op.getAlias();
Schema s = op.getSchema();
+ Schema.FieldSchema newFs;
//need to extract the children and create the aliases
//assumption here is that flatten is only for one column
@@ -78,7 +79,7 @@
log.debug("fs.alias: " + fs.alias);
if(null != fs.alias) {
String disambiguatorAlias = opAlias + "::" + fs.alias;
- Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+ newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
fss.add(newFs);
Integer count;
count = aliases.get(fs.alias);
@@ -99,9 +100,10 @@
//we just need to record if its due to
//flattening
} else {
- Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
fss.add(newFs);
}
+ newFs.setParent(fs.canonicalName, op);
}
} else {
mSchema = null;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java Thu Oct 2 13:36:49 2008
@@ -64,7 +64,7 @@
throw new FrontendException("Could not find operator in plan");
}
if(op instanceof ExpressionOperator) {
- Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
+ Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java Thu Oct 2 13:36:49 2008
@@ -53,9 +53,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java Thu Oct 2 13:36:49 2008
@@ -66,7 +66,7 @@
ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
try {
if(input instanceof ExpressionOperator) {
- Schema.FieldSchema fs = ((ExpressionOperator)input).getFieldSchema();
+ Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)input).getFieldSchema());
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Oct 2 13:36:49 2008
@@ -172,6 +172,7 @@
Schema.FieldSchema fs;
try {
fs = new Schema.FieldSchema(s.getField(i));
+ fs.setParent(s.getField(i).canonicalName, op);
} catch (ParseException pe) {
throw new FrontendException(pe.getMessage());
}
@@ -195,13 +196,23 @@
if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
- fss.add(newFs);
+ try {
+ newFs.setParent(s.getField(i).canonicalName, op);
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
+ fss.add(newFs);
updateAliasCount(aliases, disambiguatorAlias);
//it's fine if there are duplicates
//we just need to record if its due to
//flattening
} else {
- newFs = new Schema.FieldSchema(fs.alias, fs.schema, fs.type);
+ newFs = new Schema.FieldSchema(fs);
+ try {
+ newFs.setParent(s.getField(i).canonicalName, op);
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
fss.add(newFs);
}
updateAliasCount(aliases, innerCanonicalAlias);
@@ -225,10 +236,13 @@
}
updateAliasCount(aliases, newFs.alias);
fss.add(newFs);
+ newFs.setParent(null, op);
} else {
for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
QueryParser.SchemaUtils.setFieldSchemaDefaultType(ufs, DataType.BYTEARRAY);
- fss.add(new Schema.FieldSchema(ufs.alias, ufs.schema, ufs.type));
+ newFs = new Schema.FieldSchema(ufs);
+ fss.add(newFs);
+ newFs.setParent(null, op);
updateAliasCount(aliases, ufs.alias);
}
}
@@ -239,6 +253,7 @@
newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
}
fss.add(newFs);
+ newFs.setParent(null, op);
}
}
} else {
@@ -262,14 +277,17 @@
String outerCanonicalAlias = null;
if(null != userDefinedSchema) {
try {
- Schema.FieldSchema userDefinedFieldSchema = userDefinedSchema.getField(0);
+ Schema.FieldSchema userDefinedFieldSchema = new Schema.FieldSchema(userDefinedSchema.getField(0));
fss.add(userDefinedFieldSchema);
+ userDefinedFieldSchema.setParent(null, op);
updateAliasCount(aliases, userDefinedFieldSchema.alias);
} catch (ParseException pe) {
throw new FrontendException(pe.getMessage());
}
} else {
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ fss.add(newFs);
+ newFs.setParent(null, op);
}
}
} catch (FrontendException fee) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Oct 2 13:36:49 2008
@@ -131,7 +131,7 @@
throw new FrontendException("Could not find operator in plan");
}
if(op instanceof ExpressionOperator) {
- fss.add(((ExpressionOperator)op).getFieldSchema());
+ fss.add(new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema()));
mSchema = new Schema(fss);
} else {
mSchema = op.getSchema();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java Thu Oct 2 13:36:49 2008
@@ -51,10 +51,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException {
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+ mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java Thu Oct 2 13:36:49 2008
@@ -108,6 +108,7 @@
} else {
mFieldSchema = new Schema.FieldSchema(null, mValueType);
}
+ mFieldSchema.setParent(mMap.getFieldSchema().canonicalName, mMap);
mIsFieldSchemaComputed = true;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java Thu Oct 2 13:36:49 2008
@@ -53,9 +53,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java Thu Oct 2 13:36:49 2008
@@ -53,9 +53,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java Thu Oct 2 13:36:49 2008
@@ -47,9 +47,10 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
- if(!mIsSchemaComputed) {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operand's getFieldSchema()
+ if(!mIsFieldSchemaComputed) { // previously tested mIsSchemaComputed, which this method never sets
mFieldSchema = new Schema.FieldSchema(null, getOperand().getType());
+ mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand()); // lineage: the negated operand's field
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java Thu Oct 2 13:36:49 2008
@@ -51,10 +51,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operand's getFieldSchema()
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN); // logical NOT yields boolean
+ mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand()); // lineage: the negated operand's field
+ mIsFieldSchemaComputed = true; // cache the result; the previous version never set this flag
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN); // comparison result is boolean
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
+ mIsFieldSchemaComputed = true; // cache the result; the previous version never set this flag
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java Thu Oct 2 13:36:49 2008
@@ -53,10 +53,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN); // logical OR yields boolean
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
+ mIsFieldSchemaComputed = true; // cache the result; the previous version never set this flag
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Thu Oct 2 13:36:49 2008
@@ -194,7 +194,9 @@
+ expressionOperator.getClass().getName() + " " + expressionOperator);
if(!mSentinel) {
//we have an expression operator and hence a list of field shcemas
- mFieldSchema = ((ExpressionOperator)expressionOperator).getFieldSchema();
+ Schema.FieldSchema fs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+ mFieldSchema = new Schema.FieldSchema(fs); // copy so lineage recorded here stays on the projection's own field schema
+ mFieldSchema.setParent(fs.canonicalName, expressionOperator); // lineage: the input expression's field
} else {
//we have a relational operator as input and hence a schema
log.debug("expression operator alias: " + expressionOperator.getAlias());
@@ -204,6 +206,7 @@
//the type of the operator will be unknown. when type checking is in place
//add the type of the operator as a parameter to the fieldschema creation
mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
+ mFieldSchema.setParent(null, expressionOperator); // whole input relation: no single parent column name
//mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
}
} else {
@@ -233,23 +236,30 @@
if(null != expOpFs) {
Schema s = expOpFs.schema;
if(null != s) {
- mFieldSchema = new Schema.FieldSchema(s.getField(mProjection.get(0)));
+ Schema.FieldSchema fs = s.getField(mProjection.get(0));
+ mFieldSchema = new Schema.FieldSchema(fs);
+ mFieldSchema.setParent(fs.canonicalName, expressionOperator); // lineage: the projected input column
} else {
mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ mFieldSchema.setParent(expOpFs.canonicalName, expressionOperator); // no inner schema: parent is the input field itself
}
} else {
mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ mFieldSchema.setParent(null, expressionOperator); // no field schema: parent column name unknown
}
} else {
log.debug("Input is a logical operator");
- Schema s = expressionOperator.getSchema();
+ Schema s = expressionOperator.getSchema();
log.debug("s: " + s);
if(null != s) {
- mFieldSchema = new Schema.FieldSchema(s.getField(mProjection.get(0)));
+ Schema.FieldSchema fs = s.getField(mProjection.get(0));
+ mFieldSchema = new Schema.FieldSchema(fs);
+ mFieldSchema.setParent(fs.canonicalName, expressionOperator); // lineage: the projected input column
log.debug("mFieldSchema alias: " + mFieldSchema.alias);
log.debug("mFieldSchema schema: " + mFieldSchema.schema);
} else {
mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ mFieldSchema.setParent(null, expressionOperator); // no schema: parent column name unknown
}
mType = mFieldSchema.type ;
}
@@ -259,6 +269,7 @@
for (int colNum : mProjection) {
log.debug("Col: " + colNum);
+ Schema.FieldSchema fs;
if(!mSentinel) {
Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
if(null != expOpFs) {
@@ -266,22 +277,36 @@
log.debug("Schema s: " + s);
if(null != s) {
if(colNum < s.size()) {
- fss.add(new Schema.FieldSchema(s.getField(colNum)));
+ Schema.FieldSchema parentFs = s.getField(colNum);
+ fs = new Schema.FieldSchema(parentFs);
+ fss.add(fs);
+ fs.setParent(parentFs.canonicalName, expressionOperator);
} else {
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ fss.add(fs);
+ fs.setParent(expOpFs.canonicalName, expressionOperator);
}
} else {
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ fss.add(fs);
+ fs.setParent(expOpFs.canonicalName, expressionOperator);
}
} else {
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ fss.add(fs); // add the same instance that receives the lineage below, not a separate copy
+ fs.setParent(null, expressionOperator);
}
} else {
Schema s = expressionOperator.getSchema();
if(null != s) {
- fss.add(new Schema.FieldSchema(s.getField(colNum)));
+ Schema.FieldSchema parentFs = s.getField(colNum);
+ fs = new Schema.FieldSchema(parentFs);
+ fss.add(fs);
+ fs.setParent(parentFs.canonicalName, expressionOperator);
} else {
- fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ fss.add(fs);
+ fs.setParent(null, expressionOperator);
}
}
}
@@ -296,6 +321,7 @@
throw new FrontendException(pe.getMessage());
}
mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
+ mFieldSchema.setParent(null, expressionOperator); // tuple of several projected columns: no single parent name
mIsFieldSchemaComputed = true;
log.debug("mIsStar is false, returning computed field schema of expressionOperator");
}
@@ -311,6 +337,7 @@
if(!DataType.isSchemaType(mType)) {
Schema pjSchema = new Schema(mFieldSchema);
mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.TUPLE);
+ mFieldSchema.setParent(null, expressionOperator); // wrapper tuple: parent column name unknown
} else {
mFieldSchema.type = DataType.TUPLE;
}
@@ -322,6 +349,7 @@
if(!DataType.isSchemaType(mType)) {
Schema pjSchema = new Schema(mFieldSchema);
mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
+ mFieldSchema.setParent(null, expressionOperator); // wrapper bag: parent column name unknown
} else {
mFieldSchema.type = DataType.BAG;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java Thu Oct 2 13:36:49 2008
@@ -93,10 +93,12 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema = fs;
+ mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN); // match result is boolean
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
+ mIsFieldSchemaComputed = true; // cache the result; the previous version never set this flag
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Thu Oct 2 13:36:49 2008
@@ -129,7 +129,7 @@
throw new FrontendException("Could not find operator in plan");
}
if(op instanceof ExpressionOperator) {
- Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
+ Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema()); // work on a copy, not the operator's own FieldSchema instance
if(DataType.isSchemaType(fs.type)) {
mSchema = fs.schema;
} else {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java Thu Oct 2 13:36:49 2008
@@ -53,9 +53,11 @@
}
@Override
- public Schema.FieldSchema getFieldSchema() {
+ public Schema.FieldSchema getFieldSchema() throws FrontendException { // now calls the operands' getFieldSchema()
if(!mIsFieldSchemaComputed) {
mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+ mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // lineage: lhs operand's field
+ mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // lineage: rhs operand's field
mIsFieldSchemaComputed = true;
}
return mFieldSchema;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu Oct 2 13:36:49 2008
@@ -68,6 +68,24 @@
mSchema = op.getSchema();
}
}
+ if(null != mSchema) {
+ // Record lineage positionally: output column i of the union is fed by
+ // column i of every input. An input without a schema (or with too few
+ // columns) is recorded with an unknown (null) canonical name.
+ for(int i = 0; i < mSchema.size(); i++) {
+ Schema.FieldSchema fs = mSchema.getFields().get(i);
+ iter = s.iterator();
+ while(iter.hasNext()) {
+ op = iter.next();
+ Schema opSchema = op.getSchema();
+ if(null != opSchema && i < opSchema.size()) {
+ fs.setParent(opSchema.getFields().get(i).canonicalName, op);
+ } else {
+ fs.setParent(null, op);
+ }
+ }
+ }
+ }
mIsSchemaComputed = true;
} catch (FrontendException fe) {
mSchema = null;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Thu Oct 2 13:36:49 2008
@@ -80,35 +80,37 @@
@Override
public Schema.FieldSchema getFieldSchema() throws FrontendException {
- Schema inputSchema = new Schema();
- for(ExpressionOperator op: mArgs) {
- if (!DataType.isUsableType(op.getType())) {
- String msg = "Problem with input: " + op + " of User-defined function: " + this ;
- mFieldSchema = null;
- mIsFieldSchemaComputed = false;
- throw new FrontendException(msg) ;
+ if(!mIsFieldSchemaComputed) { // compute once; later calls return the cached mFieldSchema
+ Schema inputSchema = new Schema(); // collects the argument field schemas handed to the UDF
+ for(ExpressionOperator op: mArgs) {
+ if (!DataType.isUsableType(op.getType())) {
+ String msg = "Problem with input: " + op + " of User-defined function: " + this ;
+ mFieldSchema = null;
+ mIsFieldSchemaComputed = false; // leave the cache unset so a later call recomputes
+ throw new FrontendException(msg) ;
+ }
+ inputSchema.add(op.getFieldSchema());
}
- inputSchema.add(op.getFieldSchema());
- }
-
- EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
- Schema udfSchema = ef.outputSchema(inputSchema);
-
- if (null != udfSchema) {
- Schema.FieldSchema fs;
- try {
- fs = new Schema.FieldSchema(udfSchema.getField(0));
- } catch (ParseException pe) {
- throw new FrontendException(pe.getMessage());
+
+ EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
+ Schema udfSchema = ef.outputSchema(inputSchema); // ask the UDF for its output schema given the argument schemas
+
+ if (null != udfSchema) {
+ Schema.FieldSchema fs;
+ try {
+ fs = new Schema.FieldSchema(udfSchema.getField(0)); // copy of the first field of the UDF's output schema
+ } catch (ParseException pe) {
+ throw new FrontendException(pe.getMessage());
+ }
+ setType(fs.type);
+ mFieldSchema = fs;
+ mIsFieldSchemaComputed = true;
+ } else {
+ byte returnType = DataType.findType(ef.getReturnType()); // no output schema: fall back to the UDF's return type
+ setType(returnType);
+ mFieldSchema = new Schema.FieldSchema(null, null, returnType);
+ mIsFieldSchemaComputed = true;
}
- setType(fs.type);
- mFieldSchema = fs;
- mIsFieldSchemaComputed = true;
- } else {
- byte returnType = DataType.findType(ef.getReturnType());
- setType(returnType);
- mFieldSchema = new Schema.FieldSchema(null, null, returnType);
- mIsFieldSchemaComputed = true;
}
return mFieldSchema;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
Review note: this hunk removes the guard that threw a RuntimeException when a field already had a canonical name; setCanonicalNames() now overwrites any existing fs.canonicalName unconditionally.
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Thu Oct 2 13:36:49 2008
@@ -167,11 +167,6 @@
*/
public void setCanonicalNames() {
for (Schema.FieldSchema fs : mSchema.getFields()) {
- if (fs.canonicalName != null) {
- throw new RuntimeException("Attempt to rename field " +
- fs.alias + " in operator " + name() + " that " +
- "already has canonical name " + fs.canonicalName);
- }
fs.canonicalName = CanonicalNamer.getNewName();
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Thu Oct 2 13:36:49 2008
@@ -36,6 +36,11 @@
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
/**
* A visitor to discover if any schema has been specified for a file being
@@ -129,6 +134,17 @@
p.connect(proj, cast);
cast.setFieldSchema(fs.clone());
+ LoadFunc loadFunc = null;
+ if(lo instanceof LOLoad) { // a load supplies its LoadFunc directly
+ loadFunc = ((LOLoad)lo).getLoadFunc();
+ } else if (lo instanceof LOStream) { // a stream: instantiate the function named by its output spec
+ StreamingCommand command = ((LOStream)lo).getStreamingCommand();
+ HandleSpec streamOutputSpec = command.getOutputSpec();
+ loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+ } else { // only LOLoad and LOStream are expected here
+ throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
+ }
+ cast.setLoadFunc(loadFunc); // attach the load/stream's LoadFunc to the inserted cast
typeChanges.put(fs.canonicalName, fs.type);
// Reset the loads field schema to byte array so that it
// will reflect reality.
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
Review note: both hunks only drop op.setCanonicalNames() after the LOAD ... AS / STREAM ... AS schema is set; in this commit Schema.FieldSchema constructors assign a canonical name when a field schema is created.
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Oct 2 13:36:49 2008
@@ -913,7 +913,6 @@
{
SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
op.setSchema(schema);
- op.setCanonicalNames();
log.debug("Load as schema" + schema);
}
| fs = AtomSchema()
@@ -949,7 +948,6 @@
{
SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
op.setSchema(schema);
- op.setCanonicalNames();
log.debug("Stream as schema()"+ schema);
}
| fs = AtomSchema()
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct 2 13:36:49 2008
@@ -32,9 +32,25 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.CanonicalNamer;
+/**
+ * The Schema class encapsulates the notion of a schema for a relational operator.
+ * A schema is a list of columns that describe the output of a relational operator.
+ * Each column in the relation is represented as a FieldSchema, a static class inside
+ * the Schema. A column by definition has an alias, a type and a possible schema (if the
+ * column is a bag or a tuple). In addition, each column in the schema has a unique
+ * auto generated name used for tracking the lineage of the column in a sequence of
+ * statements.
+ *
+ * The lineage of the column is tracked using a map of the predecessors' columns to
+ * the operators that generate the predecessor columns. The predecessor columns are the
+ * columns required in order to generate the column under consideration. Similarly, a
+ * reverse lookup of operators that generate the predecessor column to the predecessor
+ * column is maintained.
+ */
public class Schema implements Serializable, Cloneable {
@@ -71,7 +87,7 @@
*/
public String canonicalName = null;
- /**
+ /**
* Map of canonical names used for this field in other sections of the
* plan. It can occur that a single field will have different
* canonical names in different branches of a plan. For example,
@@ -79,10 +95,25 @@
* column will have canonical name, say, of 'r'. But in branches
* above the cogroup it may have been known as 's' in the A branch and
* 't' in the B branch. This map preserves that. The key is a
- * logical operator's key, and the value is the canonical name
- * associated with the field for that operator.
+ * predecessor field's canonical name, and the value is the logical
+ * operator that produces that predecessor field.
*/
- public Map<OperatorKey, String> canonicalMap = null;
+ private Map<String, LogicalOperator> canonicalMap = null;
+
+ /**
+ * A reverse lookup of logical operators to the canonical names of
+ * the predecessor fields they produce. The reverse lookup serves cases
+ * where the canonical name of the predecessor cannot be determined (it is
+ * recorded as null); in such cases the keys can be used to navigate the plan.
+ */
+ private MultiMap<LogicalOperator, String> reverseCanonicalMap = null;
+
+ /**
+ * Canonical namer object to generate new canonical names on
+ * request. To keep names unique and consistent across all field
+ * schema objects, a single static instance is shared.
+ */
+ public static CanonicalNamer canonicalNamer = new CanonicalNamer();
private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
@@ -99,6 +130,9 @@
alias = a;
type = t;
schema = null;
+ canonicalName = canonicalNamer.getNewName();
+ canonicalMap = new HashMap<String, LogicalOperator>();
+ reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
}
/**
@@ -113,6 +147,9 @@
alias = a;
type = DataType.TUPLE;
schema = s;
+ canonicalName = canonicalNamer.getNewName();
+ canonicalMap = new HashMap<String, LogicalOperator>();
+ reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
}
/**
@@ -131,11 +168,14 @@
alias = a;
schema = s;
log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
- if ((null != s) && (t != DataType.BAG) && (t != DataType.TUPLE)) {
+ if ((null != s) && !(DataType.isSchemaType(t))) {
throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
+ DataType.findTypeName(t));
}
type = t;
+ canonicalName = canonicalNamer.getNewName();
+ canonicalMap = new HashMap<String, LogicalOperator>();
+ reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
}
/**
@@ -159,9 +199,36 @@
schema = null;
type = DataType.UNKNOWN;
}
+ canonicalName = canonicalNamer.getNewName();
+ canonicalMap = new HashMap<String, LogicalOperator>();
+ reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
}
- /***
+ /**
+ * Records a predecessor of this field for lineage tracking. The operator
+ * is always added to the reverse lookup; the canonical name is added to
+ * the forward map only when it is non-null.
+ * @param parentCanonicalName canonical name of the predecessor field, or null if unknown
+ * @param parent logical operator that generates the predecessor field
+ */
+ public void setParent(String parentCanonicalName, LogicalOperator parent) {
+ if(null != parentCanonicalName) {
+ canonicalMap.put(parentCanonicalName, parent);
+ }
+ reverseCanonicalMap.put(parent, parentCanonicalName);
+ }
+
+ /** @return the live map of predecessor canonical names to the operators that produce them */
+ public Map<String, LogicalOperator> getCanonicalMap() {
+ return canonicalMap;
+ }
+
+ /** @return the live map of operators to predecessor canonical names (names may be null) */
+ public MultiMap<LogicalOperator, String> getReverseCanonicalMap() {
+ return reverseCanonicalMap;
+ }
+
+ /**
* Two field schemas are equal if types and schemas
* are equal in all levels.
*
@@ -350,7 +417,15 @@
fs.canonicalName = canonicalName;
if (canonicalMap != null) {
fs.canonicalMap =
- new HashMap<OperatorKey, String>(canonicalMap);
+ new HashMap<String, LogicalOperator>(canonicalMap);
}
+ if (reverseCanonicalMap != null) { // copy reverse lineage too, like canonicalMap above
+ fs.reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
+ for (LogicalOperator op : reverseCanonicalMap.keySet()) {
+ for (String name : reverseCanonicalMap.get(op)) {
+ fs.reverseCanonicalMap.put(op, name);
+ }
+ }
+ }
return fs;
} catch (FrontendException fe) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Oct 2 13:36:49 2008
@@ -25,9 +25,11 @@
import java.util.List;
import java.util.Map;
import java.util.Stack;
+import java.util.HashSet;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
import org.apache.pig.Algebraic;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -44,6 +46,9 @@
import org.apache.pig.impl.plan.*;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.data.DataType ;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -359,7 +364,7 @@
}
}
- private void insertCastForRegexp(LORegexp rg) {
+ private void insertCastForRegexp(LORegexp rg) throws VisitorException {
LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
collectCastWarning(rg, DataType.BYTEARRAY, DataType.CHARARRAY) ;
OperatorKey newKey = genNewOperatorKey(rg) ;
@@ -376,6 +381,7 @@
throw err ;
}
rg.setOperand(cast) ;
+ this.visit(cast); // visit the newly inserted cast
}
public void visit(LOAnd binOp) throws VisitorException {
@@ -1055,7 +1061,7 @@
}
private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
- byte toType ) {
+ byte toType ) throws VisitorException {
LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
collectCastWarning(binOp,
binOp.getLhsOperand().getType(),
@@ -1075,10 +1081,11 @@
throw err ;
}
binOp.setLhsOperand(cast) ;
+ this.visit(cast); // visit the newly inserted cast
}
private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
- byte toType ) {
+ byte toType ) throws VisitorException {
LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
collectCastWarning(binOp,
binOp.getRhsOperand().getType(),
@@ -1098,6 +1105,7 @@
throw err ;
}
binOp.setRhsOperand(cast) ;
+ this.visit(cast); // visit the newly inserted cast
}
/**
@@ -1139,7 +1147,7 @@
}
- private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
+ private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) throws VisitorException {
collectCastWarning(uniOp,
uniOp.getOperand().getType(),
toType) ;
@@ -1164,6 +1172,8 @@
throw err ;
}
+ this.visit(cast); // visit the newly inserted cast
+
}
// Currently there is no input type information support in UserFunc
@@ -1507,7 +1517,7 @@
}
- private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) {
+ private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) throws VisitorException {
LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
collectCastWarning(binCond,
@@ -1528,10 +1538,11 @@
throw err ;
}
binCond.setLhsOp(cast) ;
+ this.visit(cast); // visit the newly inserted cast
}
- private void insertRightCastForBinCond(LOBinCond binCond, byte toType) {
+ private void insertRightCastForBinCond(LOBinCond binCond, byte toType) throws VisitorException {
LogicalPlan currentPlan = (LogicalPlan) mCurrentWalker.getPlan() ;
collectCastWarning(binCond,
@@ -1552,6 +1563,7 @@
throw err ;
}
binCond.setRhsOp(cast) ;
+ this.visit(cast); // visit the newly inserted cast
}
@@ -1600,6 +1612,17 @@
// cast.getType() already returns the correct type so don't have to
// set here. This is a special case where output type is not
// automatically determined.
+
+ if(inputType == DataType.BYTEARRAY) { // casts from bytearray need the LoadFunc that produced the bytes
+ try {
+ LoadFunc loadFunc = getLoadFunc(cast.getExpression());
+ cast.setLoadFunc(loadFunc);
+ } catch (FrontendException fee) { // rethrow with fee attached as the cause
+ throw (VisitorException) new VisitorException("Cannot resolve load function to use for casting from " +
+ DataType.findTypeName(inputType) + " to " +
+ DataType.findTypeName(expectedType) + ". " + fee.getMessage()).initCause(fee);
+ }
+ }
}
@@ -1751,7 +1774,6 @@
LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
- // LOSplitOutput can only have 1 input
try {
// Compute the schema
op.getSchema() ;
@@ -1766,6 +1788,22 @@
}
}
+ @Override
+ protected void visit(LOLimit op) throws VisitorException {
+ try {
+ // Compute the schema
+ op.regenerateSchema() ;
+ }
+ catch (FrontendException fe) {
+ String msg = "Problem while reading"
+ + " schemas from inputs of LOLimit" ; // name this operator (text was copied from LODistinct)
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
+ }
+
/***
* Return concatenated of all fields from all input operators
* If one of the inputs have no schema then we cannot construct
@@ -1853,6 +1891,19 @@
checkInnerPlan(comparisonPlan) ;
+
+ /*
+ try {
+ System.err.println("Filter inner plan typechecked");
+ LOPrinter lv = new LOPrinter(System.err, comparisonPlan);
+ lv.visit();
+ System.err.println();
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ */
+
byte innerCondType = comparisonPlan.getLeaves().get(0).getType() ;
if (innerCondType != DataType.BOOLEAN) {
String msg = "Filter's condition must evaluate to boolean. Found: " + DataType.findTypeName(innerCondType);
@@ -1890,7 +1941,7 @@
try {
// Compute the schema
- split.getSchema() ;
+ split.regenerateSchema() ;
}
catch (FrontendException ioe) {
String msg = "Problem while reconciling output schema of LOSplit" ;
@@ -2040,7 +2091,7 @@
// as a new leave of the plan
private void insertAtomicCastForCOGroupInnerPlan(LogicalPlan innerPlan,
LOCogroup cg,
- byte toType) {
+ byte toType) throws VisitorException {
if(!DataType.isUsableType(toType)) {
throw new AssertionError("Cannot cast to type " + DataType.findTypeName(toType));
}
@@ -2063,6 +2114,7 @@
err.initCause(ioe) ;
throw err ;
}
+ this.visit(cast); // visit the newly inserted cast
}
/**
@@ -2508,4 +2560,155 @@
return new OperatorKey(scope, newId) ;
}
+    /**
+     * Determines the load function to use when casting the result of an
+     * expression from bytearray. It traces the expression's lineage through
+     * the canonical maps of its field schema.
+     *
+     * @param exOp expression whose lineage is traced
+     * @return the single load function found in the lineage, or {@code null}
+     *         if the expression has no field schema or no load function was found
+     * @throws FrontendException if the lineage reaches a user defined function,
+     *         or if load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(ExpressionOperator exOp) throws FrontendException {
+        Schema.FieldSchema fs = exOp.getFieldSchema();
+        if (null == fs) {
+            return null;
+        }
+
+        Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
+        MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
+        MultiMap<String, LoadFunc> loadFuncMap = new MultiMap<String, LoadFunc>();
+
+        if (canonicalMap.keySet().size() > 0) {
+            // NOTE(review): the ExpressionOperator branch of
+            // getLoadFunc(LogicalOperator, String) ignores the canonical name it
+            // is given, so each iteration recomputes the same result. Merging by
+            // class name makes the repetition redundant but harmless.
+            for (String parentCanonicalName : canonicalMap.keySet()) {
+                collectLoadFunc(loadFuncMap, getLoadFunc(exOp, parentCanonicalName));
+            }
+        } else {
+            for (LogicalOperator op : reverseCanonicalMap.keySet()) {
+                for (String parentCanonicalName : reverseCanonicalMap.get(op)) {
+                    collectLoadFunc(loadFuncMap, getLoadFunc(op, parentCanonicalName));
+                }
+            }
+        }
+        return uniqueLoadFunc(loadFuncMap);
+    }
+
+    /**
+     * Traces the load function for a field back through {@code op}.
+     * <ul>
+     * <li>Expression operators follow their field schema's canonical maps.
+     *     A user defined function stops the trace with an error.</li>
+     * <li>LOLoad returns its own load function.</li>
+     * <li>LOStream instantiates the loader named by its output spec.</li>
+     * <li>Filter, distinct, sort, split, split-output and limit pass through to
+     *     their first predecessor.</li>
+     * <li>Any other relational operator with a schema follows the field whose
+     *     canonical name matches {@code parentCanonicalName}, or every field
+     *     when it is {@code null}.</li>
+     * <li>An operator without a schema searches all of its predecessors.</li>
+     * </ul>
+     *
+     * @param op operator to start tracing from
+     * @param parentCanonicalName canonical name of the field being traced, or {@code null}
+     * @return the single load function found, or {@code null} if none was found
+     * @throws FrontendException if a user defined function is reached, or if
+     *         load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(LogicalOperator op, String parentCanonicalName) throws FrontendException {
+        MultiMap<String, LoadFunc> loadFuncMap = new MultiMap<String, LoadFunc>();
+        if (op instanceof ExpressionOperator) {
+            if (op instanceof LOUserFunc) {
+                throw new FrontendException("Found a user defined function. Cannot determine the load function to use");
+            }
+
+            Schema.FieldSchema fs = ((ExpressionOperator) op).getFieldSchema();
+            if (null == fs) {
+                // No field schema means there is no lineage to follow. This
+                // matches the other overloads; the old code dereferenced null here.
+                return null;
+            }
+            Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
+            MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
+
+            if (canonicalMap.keySet().size() > 0) {
+                for (String canonicalName : canonicalMap.keySet()) {
+                    collectLoadFunc(loadFuncMap, getLoadFunc(fs, canonicalName));
+                }
+            } else {
+                for (LogicalOperator lop : reverseCanonicalMap.keySet()) {
+                    for (String canonicalName : reverseCanonicalMap.get(lop)) {
+                        collectLoadFunc(loadFuncMap, getLoadFunc(fs, canonicalName));
+                    }
+                }
+            }
+        } else {
+            if (op instanceof LOLoad) {
+                return ((LOLoad) op).getLoadFunc();
+            } else if (op instanceof LOStream) {
+                StreamingCommand command = ((LOStream) op).getStreamingCommand();
+                HandleSpec streamOutputSpec = command.getOutputSpec();
+                return (LoadFunc) PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+            } else if ((op instanceof LOFilter)
+                    || (op instanceof LODistinct)
+                    || (op instanceof LOSort)
+                    || (op instanceof LOSplit)
+                    || (op instanceof LOSplitOutput)
+                    || (op instanceof LOLimit)) {
+                // These operators do not change field lineage; consult the input.
+                LogicalPlan lp = op.getPlan();
+                return getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);
+            }
+
+            Schema s = op.getSchema();
+            if (null != s) {
+                for (Schema.FieldSchema fs : s.getFields()) {
+                    if (null != parentCanonicalName && (parentCanonicalName.equals(fs.canonicalName))) {
+                        if (fs.getCanonicalMap().keySet().size() > 0) {
+                            for (String canonicalName : fs.getCanonicalMap().keySet()) {
+                                collectLoadFunc(loadFuncMap, getLoadFunc(fs, canonicalName));
+                            }
+                        } else {
+                            collectLoadFunc(loadFuncMap, getLoadFunc(fs, null));
+                        }
+                    } else if (null == parentCanonicalName) {
+                        collectLoadFunc(loadFuncMap, getLoadFunc(fs, null));
+                    }
+                }
+            } else {
+                // Without a schema we cannot pick a field, so search every input.
+                LogicalPlan lp = op.getPlan();
+                for (LogicalOperator pred : lp.getPredecessors(op)) {
+                    collectLoadFunc(loadFuncMap, getLoadFunc(pred, parentCanonicalName));
+                }
+            }
+        }
+        return uniqueLoadFunc(loadFuncMap);
+    }
+
+    /**
+     * Traces the load function through the operators recorded in a field
+     * schema's canonical map. If that map is empty, the reverse canonical map
+     * is used instead.
+     *
+     * @param fs field schema whose lineage is followed; {@code null} yields {@code null}
+     * @param parentCanonicalName only entries with this canonical name are
+     *        followed, or all entries when {@code null}
+     * @return the single load function found, or {@code null} if none was found
+     * @throws FrontendException if load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(Schema.FieldSchema fs, String parentCanonicalName) throws FrontendException {
+        if (null == fs) {
+            return null;
+        }
+        Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
+        MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
+        MultiMap<String, LoadFunc> loadFuncMap = new MultiMap<String, LoadFunc>();
+
+        if (canonicalMap.keySet().size() > 0) {
+            for (String canonicalName : canonicalMap.keySet()) {
+                if ((null == parentCanonicalName) || (parentCanonicalName.equals(canonicalName))) {
+                    collectLoadFunc(loadFuncMap, getLoadFunc(canonicalMap.get(canonicalName), parentCanonicalName));
+                }
+            }
+        } else {
+            for (LogicalOperator op : reverseCanonicalMap.keySet()) {
+                for (String canonicalName : reverseCanonicalMap.get(op)) {
+                    if ((null == parentCanonicalName) || (parentCanonicalName.equals(canonicalName))) {
+                        collectLoadFunc(loadFuncMap, getLoadFunc(op, parentCanonicalName));
+                    }
+                }
+            }
+        }
+        return uniqueLoadFunc(loadFuncMap);
+    }
+
+    /**
+     * Records {@code lf} in {@code loadFuncMap} under its class name, ignoring
+     * {@code null}. Keying by class name treats two instances of the same loader
+     * class as one candidate.
+     */
+    private static void collectLoadFunc(MultiMap<String, LoadFunc> loadFuncMap, LoadFunc lf) {
+        if (null != lf) {
+            loadFuncMap.put(lf.getClass().getName(), lf);
+        }
+    }
+
+    /**
+     * Reduces the collected candidates to a single load function.
+     *
+     * @return {@code null} if no candidate was collected, otherwise the first
+     *         instance of the only loader class found
+     * @throws FrontendException if candidates of more than one class were collected
+     */
+    private static LoadFunc uniqueLoadFunc(MultiMap<String, LoadFunc> loadFuncMap) throws FrontendException {
+        if (loadFuncMap.keySet().isEmpty()) {
+            return null;
+        }
+        if (loadFuncMap.keySet().size() == 1) {
+            String lfString = loadFuncMap.keySet().iterator().next();
+            return (LoadFunc) (loadFuncMap.get(lfString).iterator().next());
+        }
+        throw new FrontendException("Found more than one load function to use: " + loadFuncMap.keySet());
+    }
+
}