You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/28 23:04:53 UTC
svn commit: r680494 - in /incubator/pig/branches/types:
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/
Author: gates
Date: Mon Jul 28 14:04:53 2008
New Revision: 680494
URL: http://svn.apache.org/viewvc?rev=680494&view=rev
Log:
PIG-320: Santhosh's patch to make the type checker use the schema of the UDF's return value when the UDF provides one.
Modified:
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Mon Jul 28 14:04:53 2008
@@ -30,6 +30,7 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -280,4 +281,14 @@
mGroupByPlans.put(newOp, innerPlans);
}
+    /**
+     * Resets the schema state held by this cogroup.
+     * Each group-by plan registered for each input is walked with a
+     * {@link SchemaRemover}; afterwards {@code unsetSchema()} is called on
+     * this operator itself.
+     *
+     * @throws VisitorException propagated from the calls made while resetting
+     */
+    public void resetSchema() throws VisitorException {
+        for (LogicalOperator predecessor : getInputs()) {
+            for (LogicalPlan groupByPlan : mGroupByPlans.get(predecessor)) {
+                // Inner plans are handled first; this operator's own schema is dropped last.
+                new SchemaRemover(groupByPlan).visit();
+            }
+        }
+        unsetSchema();
+    }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Jul 28 14:04:53 2008
@@ -25,6 +25,7 @@
import java.util.Iterator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
@@ -247,4 +248,13 @@
log.debug("Exiting getSchema");
return mSchema;
}
+
+    /**
+     * Resets the schema state held by this foreach.
+     * Each plan in {@code mForEachPlans} is walked with a
+     * {@link SchemaRemover}; afterwards {@code unsetSchema()} is called on
+     * this operator itself.
+     *
+     * @throws VisitorException propagated from the calls made while resetting
+     */
+    public void resetSchema() throws VisitorException {
+        for (LogicalPlan innerPlan : mForEachPlans) {
+            // Inner plans are handled first; this operator's own schema is dropped last.
+            new SchemaRemover(innerPlan).visit();
+        }
+        unsetSchema();
+    }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Jul 28 14:04:53 2008
@@ -1370,9 +1370,10 @@
} else {
func.setFuncSpec(matchingSpec);
ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(matchingSpec);
- func.setType(DataType.findType(ef.getReturnType()));
+ setUdfSchema(func, ef, s);
}
-
+ } else {
+ setUdfSchema(func, ef, s);
}
/*
while (iterator.hasNext()) {
@@ -1381,6 +1382,26 @@
*/
}
+    /**
+     * Sets the result type of a UDF call, preferring the schema the UDF declares.
+     * If {@code ef.outputSchema(inputSchema)} returns a schema, its first field
+     * supplies both the type and the field schema of {@code func}. Otherwise the
+     * type is derived from the UDF's return type via {@code DataType.findType}.
+     *
+     * @param func        the user-function operator being typed
+     * @param ef          the instantiated UDF behind {@code func}
+     * @param inputSchema the schema passed to the UDF's {@code outputSchema}
+     * @throws VisitorException if the first field cannot be read from the declared
+     *                          schema or cannot be set on {@code func}; the
+     *                          underlying exception is kept as the cause
+     */
+    private void setUdfSchema(LOUserFunc func, EvalFunc<?> ef, Schema inputSchema) throws VisitorException {
+        Schema udfSchema = ef.outputSchema(inputSchema);
+        if (null == udfSchema) {
+            // The UDF declares no schema: fall back to its return type.
+            func.setType(DataType.findType(ef.getReturnType()));
+            return;
+        }
+
+        Schema.FieldSchema fs;
+        try {
+            fs = udfSchema.getField(0);
+        } catch (ParseException pe) {
+            // Chain the cause so the original stack trace is not lost.
+            VisitorException vse = new VisitorException(pe.getMessage());
+            vse.initCause(pe);
+            throw vse;
+        }
+        func.setType(fs.type);
+        try {
+            func.setFieldSchema(fs);
+        } catch (FrontendException fe) {
+            VisitorException vse = new VisitorException(fe.getMessage());
+            vse.initCause(fe);
+            throw vse;
+        }
+    }
+
/**
* For Bincond, lhsOp and rhsOp must have the same output type
* or both sides have to be number
@@ -1570,6 +1591,7 @@
@Override
protected void visit(LOUnion u) throws VisitorException {
+ u.unsetSchema();
// Have to make a copy, because as we insert operators, this list will
// change under us.
List<LogicalOperator> inputs =
@@ -1690,6 +1712,7 @@
@Override
protected void visit(LODistinct op) throws VisitorException {
+ op.unsetSchema();
LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
@@ -1716,6 +1739,7 @@
* @throws VisitorException
*/
protected void visit(LOCross cs) throws VisitorException {
+ cs.unsetSchema();
List<LogicalOperator> inputs = cs.getInputs() ;
List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
@@ -1753,6 +1777,7 @@
*/
protected void visit(LOSort s) throws VisitorException {
+ s.unsetSchema();
LogicalOperator input = s.getInput() ;
// Type checking internal plans.
@@ -1794,6 +1819,7 @@
@Override
protected void visit(LOFilter filter) throws VisitorException {
+ filter.unsetSchema();
LogicalOperator input = filter.getInput() ;
LogicalPlan comparisonPlan = filter.getComparisonPlan() ;
@@ -1860,6 +1886,16 @@
* same type
*/
protected void visit(LOCogroup cg) throws VisitorException {
+ cg.resetSchema();
+ try {
+ cg.getSchema();
+ } catch (FrontendException fe) {
+ String msg = "Cannot resolve COGroup output schema" ;
+ msgCollector.collect(msg, MessageType.Error) ;
+ VisitorException vse = new VisitorException(msg) ;
+ vse.initCause(fe) ;
+ throw vse ;
+ }
MultiMap<LogicalOperator, LogicalPlan> groupByPlans
= cg.getGroupByPlans() ;
List<LogicalOperator> inputs = cg.getInputs() ;
@@ -2156,6 +2192,7 @@
List<LogicalPlan> plans = f.getForEachPlans() ;
List<Boolean> flattens = f.getFlatten() ;
+ f.resetSchema();
try {
// Have to resolve all inner plans before calling getSchema
@@ -2193,7 +2230,6 @@
}
- f.setSchemaComputed(false);
f.getSchema();
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Mon Jul 28 14:04:53 2008
@@ -44,6 +44,8 @@
import org.apache.pig.data.*;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import junit.framework.TestCase;
@@ -173,7 +175,6 @@
assertFalse(iter.hasNext());
}
-
static public class TitleNGrams extends EvalFunc<DataBag> {
@Override
@@ -232,6 +233,16 @@
}
return sb.toString();
}
+
+        /**
+         * Declares the output as a schema with a single BAG-typed field whose
+         * nested schema has one CHARARRAY field. The first constructor argument
+         * of both field schemas is null (presumably the alias; confirm against
+         * Schema.FieldSchema).
+         *
+         * @param input schema of the UDF arguments; not consulted
+         * @return the declared schema, or null if constructing it fails
+         */
+        @Override
+        public Schema outputSchema(Schema input) {
+            try {
+                Schema stringSchema = new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
+                Schema.FieldSchema bagField = new Schema.FieldSchema(null, stringSchema, DataType.BAG);
+                return new Schema(bagField);
+            } catch (Exception ignored) {
+                // Deliberately best effort: returning null means no schema is declared.
+                return null;
+            }
+        }
}
@@ -240,6 +251,10 @@
public Tuple exec(Tuple input) throws IOException {
return input;
}
+
+        /**
+         * Reports the input schema unchanged as the output schema.
+         *
+         * @param input schema of the UDF arguments
+         * @return {@code input}, as given
+         */
+        @Override
+        public Schema outputSchema(Schema input) {
+            return input;
+        }
}