You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/11 01:06:16 UTC
svn commit: r703598 - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/logicalLayer/schema/
src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/...
Author: olga
Date: Fri Oct 10 16:06:16 2008
New Revision: 703598
URL: http://svn.apache.org/viewvc?rev=703598&view=rev
Log:
PIG-489: (*) handling
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Oct 10 16:06:16 2008
@@ -278,3 +278,5 @@
PIG-465: performance improvement - removing keys from the value (pradeepk
via olgan)
+
+ PIG-489: (*) processing (sms via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Fri Oct 10 16:06:16 2008
@@ -83,6 +83,10 @@
return mIsInner;
}
+    /**
+     * Replaces the inner flags of this cogroup.
+     *
+     * @param inner the new flags; the array is stored by reference, not copied
+     */
+    public void setInner(boolean[] inner) {
+        this.mIsInner = inner;
+    }
+
@Override
public String name() {
return "CoGroup " + mKey.scope + "-" + mKey.id;
@@ -192,7 +196,7 @@
setFieldSchemaParent(groupByFs, positionOperators, i);
break;
} else {
- if(j < aliases.length) {
+ if((j + 1) < aliases.length) {
continue;
} else {
//we have seen this alias before
@@ -205,6 +209,8 @@
Schema.FieldSchema opFs = op.getFieldSchema();
if(null != opFs) {
groupByFs.setParent(opFs.canonicalName, eOp);
+ } else {
+ groupByFs.setParent(null, eOp);
}
}
} else {
@@ -231,8 +237,26 @@
} else {
//We do not have any alias for this position in the group by columns
//We have positions $1, $2, etc.
- groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
- groupByFss.add(groupByFs);
+ Collection<ExpressionOperator> cEops = positionOperators.get(i);
+ if(null != cEops) {
+ ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0];
+ if(null != eOp) {
+ Schema.FieldSchema fs = eOp.getFieldSchema();
+ if(null != fs) {
+ groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+ groupByFss.add(groupByFs);
+ } else {
+ groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
+ }
+ } else {
+ groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
+ }
+ } else {
+ groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ groupByFss.add(groupByFs);
+ }
setFieldSchemaParent(groupByFs, positionOperators, i);
}
}
@@ -266,7 +290,11 @@
}
}
- fss.add(new Schema.FieldSchema("group", mergedGroupSchema));
+ Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", mergedGroupSchema);
+ fss.add(groupByFs);
+ for(int i = 0; i < arity; ++i) {
+ setFieldSchemaParent(groupByFs, positionOperators, i);
+ }
}
for (LogicalOperator op : inputs) {
try {
@@ -341,6 +369,7 @@
throw new FrontendException("getAtomicGroupByType is used only when"
+ " dealing with atomic group col") ;
}
+
byte groupType = DataType.BYTEARRAY ;
// merge all the inner plan outputs so we know what type
// our group column should be
@@ -388,13 +417,32 @@
List<LogicalPlan> innerPlans
= new ArrayList<LogicalPlan>(getGroupByPlans().get(input)) ;
+ boolean seenProjectStar = false;
for(int j=0;j < innerPlans.size(); j++) {
byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+
+ if(eOp instanceof LOProject) {
+ if(((LOProject)eOp).isStar()) {
+ seenProjectStar = true;
+ }
+ }
+
Schema.FieldSchema groupFs = fsList.get(j);
groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
- groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+ Schema.FieldSchema fs = eOp.getFieldSchema();
+ if(null != fs) {
+ groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+ } else {
+ groupFs.setParent(null, eOp);
+ }
}
+
+ if(seenProjectStar) {
+ throw new FrontendException("Grouping attributes can either be star (*) or a list of expressions, but not both.");
+
+ }
+
}
return new Schema(fsList) ;
@@ -405,6 +453,8 @@
Schema.FieldSchema opFs = op.getFieldSchema();
if(null != opFs) {
fs.setParent(opFs.canonicalName, op);
+ } else {
+ fs.setParent(null, op);
}
}
}
@@ -415,6 +465,8 @@
for(Schema.FieldSchema inputFs: s.getFields()) {
if(null != inputFs) {
fs.setParent(inputFs.canonicalName, op);
+ } else {
+ fs.setParent(null, op);
}
}
} else {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Fri Oct 10 16:06:16 2008
@@ -86,10 +86,26 @@
return mForEachPlans;
}
+    /**
+     * Replaces the nested plans of this foreach.
+     *
+     * @param foreachPlans the new nested plans; stored by reference, not copied
+     */
+    public void setForEachPlans(ArrayList<LogicalPlan> foreachPlans) {
+        this.mForEachPlans = foreachPlans;
+    }
+
public List<Boolean> getFlatten() {
return mFlatten;
}
+    /**
+     * Replaces the per-plan flatten flags.
+     *
+     * @param flattenList the new flags, one per nested plan; stored by reference
+     */
+    public void setFlatten(ArrayList<Boolean> flattenList) {
+        this.mFlatten = flattenList;
+    }
+
+    /**
+     * @return the user-declared schemas of the generated columns; individual
+     *         entries may be null when no schema was declared for a column
+     */
+    public List<Schema> getUserDefinedSchema() {
+        return this.mUserDefinedSchema;
+    }
+
+    /**
+     * Replaces the user-declared schemas of the generated columns.
+     *
+     * @param userDefinedSchema the new schemas; stored by reference, not copied
+     */
+    public void setUserDefinedSchema(ArrayList<Schema> userDefinedSchema) {
+        this.mUserDefinedSchema = userDefinedSchema;
+    }
+
@Override
public String name() {
return "ForEach " + mKey.scope + "-" + mKey.id;
@@ -148,6 +164,24 @@
log.debug("Flatten: " + mFlatten.get(planCtr));
Schema.FieldSchema planFs;
+ if(op instanceof LOProject) {
+ //the check for the type is required for statements like
+ //foreach cogroup {
+ // a1 = order a by *;
+ // generate a1;
+ //}
+ //In the above script, the generate a1, will translate to
+ //project(a1) -> project(*) and will not be translated to a sequence of projects
+ //As a result the project(*) will remain but the return type is a bag
+ //project(*) with a data type set to tuple indicates a project(*) from an input
+ //that has no schema
+ if( (((LOProject)op).isStar() ) && (((LOProject)op).getType() == DataType.TUPLE) ) {
+ mSchema = null;
+ mIsSchemaComputed = true;
+ return mSchema;
+ }
+ }
+
try {
planFs = ((ExpressionOperator)op).getFieldSchema();
log.debug("planFs: " + planFs);
@@ -285,9 +319,9 @@
throw new FrontendException(pe.getMessage());
}
} else {
- Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
- fss.add(newFs);
- newFs.setParent(null, op);
+ mSchema = null;
+ mIsSchemaComputed = true;
+ return mSchema;
}
}
} catch (FrontendException fee) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Fri Oct 10 16:06:16 2008
@@ -205,7 +205,7 @@
//TODO
//the type of the operator will be unknown. when type checking is in place
//add the type of the operator as a parameter to the fieldschema creation
- mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
+ mFieldSchema = new Schema.FieldSchema(null, expressionOperator.getSchema(), DataType.TUPLE);
mFieldSchema.setParent(null, expressionOperator);
//mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
}
@@ -218,9 +218,6 @@
mIsFieldSchemaComputed = false;
throw fee;
}
- log.debug("mIsStar is true, returning schema of expressionOperator");
- log.debug("Exiting getSchema()");
- return mFieldSchema;
} else {
//its n list of columns to project including a single column
List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(mProjection.size());
@@ -323,8 +320,8 @@
mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
mFieldSchema.setParent(null, expressionOperator);
mIsFieldSchemaComputed = true;
- log.debug("mIsStar is false, returning computed field schema of expressionOperator");
}
+
}
if(null != mFieldSchema) {
@@ -339,7 +336,9 @@
mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.TUPLE);
mFieldSchema.setParent(null, expressionOperator);
} else {
- mFieldSchema.type = DataType.TUPLE;
+ if(null != mFieldSchema) {
+ mFieldSchema.type = DataType.TUPLE;
+ }
}
setOverloaded(true);
setType(DataType.TUPLE);
@@ -351,7 +350,9 @@
mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
mFieldSchema.setParent(null, expressionOperator);
} else {
- mFieldSchema.type = DataType.BAG;
+ if(null != mFieldSchema) {
+ mFieldSchema.type = DataType.BAG;
+ }
}
setType(DataType.BAG);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Fri Oct 10 16:06:16 2008
@@ -77,10 +77,18 @@
return mSortColPlans;
}
+    /**
+     * Replaces the sort column plans.
+     *
+     * @param sortPlans the new plans, one per sort key; stored by reference
+     */
+    public void setSortColPlans(List<LogicalPlan> sortPlans) {
+        this.mSortColPlans = sortPlans;
+    }
+
public List<Boolean> getAscendingCols() {
return mAscCols;
}
+    /**
+     * Replaces the ascending/descending flags of the sort keys.
+     *
+     * @param ascCols the new flags, one per sort key; stored by reference
+     */
+    public void setAscendingCols(List<Boolean> ascCols) {
+        this.mAscCols = ascCols;
+    }
+
public FuncSpec getUserFunc() {
return mSortFunc;
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=703598&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Fri Oct 10 16:06:16 2008
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.data.DataType;
+
+/**
+ * A visitor that walks the operators holding nested plans (cogroup, foreach
+ * and sort) and expands every leaf project( * ) of those plans into one plan
+ * per column of the projected input, i.e.,
+ * project( * ) -> project(0), project(1), ... project(n-2), project(n-1)
+ * <p>
+ * A project( * ) of type BAG is not expanded. When the projected input has no
+ * schema the column count is unknown and the project( * ) is kept as is.
+ */
+public class ProjectStarTranslator extends LOVisitor {
+
+    public ProjectStarTranslator(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * Expands project( * ) in the group-by plans of every cogroup input.
+     *
+     * @param cg
+     *            the logical cogroup operator that has to be visited
+     * @throws VisitorException
+     *             if a plan cannot be translated
+     */
+    protected void visit(LOCogroup cg) throws VisitorException {
+        // the group-by plans are the cogroup attribute modified by the translation
+        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
+
+        for (LogicalOperator op : cg.getInputs()) {
+            ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
+            for (LogicalPlan lp : mapGByPlans.get(op)) {
+                if (checkPlanForProjectStar(lp)) {
+                    newGByPlans.addAll(translateProjectStarInPlan(lp));
+                } else {
+                    newGByPlans.add(lp);
+                }
+            }
+            mapGByPlans.removeKey(op);
+            mapGByPlans.put(op, newGByPlans);
+        }
+    }
+
+    /**
+     * Expands project( * ) in the generate plans of a foreach. The flatten flag
+     * and the user-declared schema (AS clause) of an expanded plan are
+     * distributed over the plans it expands into.
+     *
+     * @param forEach
+     *            the logical foreach operator that has to be visited
+     * @throws VisitorException
+     *             if a plan cannot be translated
+     */
+    protected void visit(LOForEach forEach) throws VisitorException {
+        // delegate to the base visitor before rewriting this operator's attributes
+        super.visit(forEach);
+
+        ArrayList<LogicalPlan> foreachPlans = forEach.getForEachPlans();
+        List<Boolean> flattenList = forEach.getFlatten();
+        // user-specified schemas from the AS clause; the list itself may be null
+        List<Schema> userDefinedSchemaList = forEach.getUserDefinedSchema();
+
+        ArrayList<LogicalPlan> newForeachPlans = new ArrayList<LogicalPlan>();
+        ArrayList<Boolean> newFlattenList = new ArrayList<Boolean>();
+        ArrayList<Schema> newUserDefinedSchemaList = new ArrayList<Schema>();
+
+        for (int i = 0; i < foreachPlans.size(); ++i) {
+            LogicalPlan lp = foreachPlans.get(i);
+            Boolean flatten = flattenList.get(i);
+            // guard the lookup in both branches: a missing list must not cause an NPE
+            Schema s = (null == userDefinedSchemaList) ? null : userDefinedSchemaList.get(i);
+
+            if (checkPlanForProjectStar(lp)) {
+                ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+                for (int j = 0; j < translatedPlans.size(); ++j) {
+                    newForeachPlans.add(translatedPlans.get(j));
+                    newFlattenList.add(flatten);
+                    newUserDefinedSchemaList.add(getUserSchemaForColumn(s, j));
+                }
+            } else {
+                newForeachPlans.add(lp);
+                newFlattenList.add(flatten);
+                newUserDefinedSchemaList.add(s);
+            }
+        }
+
+        forEach.setForEachPlans(newForeachPlans);
+        forEach.setFlatten(newFlattenList);
+        forEach.setUserDefinedSchema(newUserDefinedSchemaList);
+    }
+
+    /**
+     * Expands project( * ) in the sort column plans. Every plan produced by an
+     * expansion keeps the sort order of the plan it was expanded from.
+     *
+     * @param s
+     *            the logical sort operator that has to be visited
+     * @throws VisitorException
+     *             if a plan cannot be translated
+     */
+    protected void visit(LOSort s) throws VisitorException {
+        List<LogicalPlan> sortPlans = s.getSortColPlans();
+        List<Boolean> sortOrder = s.getAscendingCols();
+
+        ArrayList<LogicalPlan> newSortPlans = new ArrayList<LogicalPlan>();
+        ArrayList<Boolean> newSortOrder = new ArrayList<Boolean>();
+
+        for (int i = 0; i < sortPlans.size(); ++i) {
+            LogicalPlan lp = sortPlans.get(i);
+            Boolean ascending = sortOrder.get(i);
+            if (checkPlanForProjectStar(lp)) {
+                for (LogicalPlan translatedPlan : translateProjectStarInPlan(lp)) {
+                    newSortPlans.add(translatedPlan);
+                    newSortOrder.add(ascending);
+                }
+            } else {
+                newSortPlans.add(lp);
+                newSortOrder.add(ascending);
+            }
+        }
+
+        s.setSortColPlans(newSortPlans);
+        s.setAscendingCols(newSortOrder);
+    }
+
+    /**
+     * Returns the user-declared schema for column {@code column} of an expanded
+     * project( * ), or null if no schema was declared or it has no such field.
+     */
+    private Schema getUserSchemaForColumn(Schema s, int column) throws VisitorException {
+        if (null == s || column >= s.size()) {
+            return null;
+        }
+        try {
+            return new Schema(s.getField(column));
+        } catch (ParseException pe) {
+            throw new VisitorException(pe.getMessage(), pe);
+        }
+    }
+
+    /** True for a project( * ) that this visitor expands, i.e. one not of type BAG. */
+    private static boolean isTranslatableProjectStar(LogicalOperator op) {
+        return (op instanceof LOProject)
+                && ((LOProject) op).isStar()
+                && ((LOProject) op).getType() != DataType.BAG;
+    }
+
+    /** True if some leaf of {@code lp} is a project( * ) that should be expanded. */
+    private boolean checkPlanForProjectStar(LogicalPlan lp) {
+        return null != getProjectStarFromPlan(lp);
+    }
+
+    /**
+     * Returns the first leaf of {@code lp} that is an expandable project( * ),
+     * or null. Uses the same predicate as {@link #checkPlanForProjectStar}.
+     */
+    private LOProject getProjectStarFromPlan(LogicalPlan lp) {
+        for (LogicalOperator op : lp.getLeaves()) {
+            if (isTranslatableProjectStar(op)) {
+                return (LOProject) op;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Translates the project( * ) of {@code lp} into one plan per column of the
+     * projected input. If the input has no schema, a single replica of the plan
+     * with the project( * ) left in place is returned.
+     */
+    private ArrayList<LogicalPlan> translateProjectStarInPlan(LogicalPlan lp) throws VisitorException {
+        LOProject projectStar = getProjectStarFromPlan(lp);
+        LogicalOperator projectInput = projectStar.getExpression();
+        ArrayList<LogicalPlan> translatedPlans = new ArrayList<LogicalPlan>();
+
+        // relational inputs expose a schema; expression inputs expose a field schema
+        Schema s = null;
+        try {
+            if (!(projectInput instanceof ExpressionOperator)) {
+                s = projectInput.getSchema();
+            } else {
+                Schema.FieldSchema fs = ((ExpressionOperator) projectInput).getFieldSchema();
+                if (null != fs) {
+                    s = fs.schema;
+                }
+            }
+        } catch (FrontendException fee) {
+            throw new VisitorException(fee.getMessage(), fee);
+        }
+
+        if (null == s) {
+            // the column count is unknown, so the project( * ) is kept as is
+            translatedPlans.add(replicatePlan(lp));
+            return translatedPlans;
+        }
+
+        for (int i = 0; i < s.size(); ++i) {
+            LogicalPlan replicatedPlan = replicatePlan(lp);
+            replaceProjectStar(replicatedPlan, projectStar, i);
+            translatedPlans.add(replicatedPlan);
+        }
+        return translatedPlans;
+    }
+
+    /**
+     * Builds a new plan with the operators and edges reachable from the roots
+     * of {@code lp}. The operator instances themselves are shared, not cloned.
+     */
+    private LogicalPlan replicatePlan(LogicalPlan lp) throws VisitorException {
+        LogicalPlan replicatedPlan = new LogicalPlan();
+
+        for (LogicalOperator root : lp.getRoots()) {
+            replicatedPlan.add(root);
+            addSuccessors(lp, replicatedPlan, root);
+        }
+
+        return replicatedPlan;
+    }
+
+    /** Recursively copies the successors of {@code root} and their edges into {@code replicatedPlan}. */
+    private void addSuccessors(LogicalPlan lp, LogicalPlan replicatedPlan, LogicalOperator root) throws VisitorException {
+        List<LogicalOperator> successors = lp.getSuccessors(root);
+        if (null == successors) {
+            return;
+        }
+        for (LogicalOperator succ : successors) {
+            replicatedPlan.add(succ);
+            try {
+                replicatedPlan.connect(root, succ);
+            } catch (PlanException pe) {
+                throw new VisitorException(pe.getMessage(), pe);
+            }
+            addSuccessors(lp, replicatedPlan, succ);
+        }
+    }
+
+    /**
+     * Replaces {@code projectStar} in {@code lp} with a projection of the single
+     * column {@code column} over the same input.
+     */
+    private void replaceProjectStar(LogicalPlan lp, LOProject projectStar, int column) throws VisitorException {
+        String scope = projectStar.getOperatorKey().getScope();
+        LogicalOperator projectInput = projectStar.getExpression();
+        // NOTE(review): the replacement is created against the plan the original
+        // project( * ) belongs to, not against lp; confirm this is intended.
+        LogicalPlan projectPlan = projectStar.getPlan();
+        LOProject replacementProject = new LOProject(projectPlan, OperatorKey.genOpKey(scope), projectInput, column);
+        try {
+            lp.replace(projectStar, replacementProject);
+        } catch (PlanException pe) {
+            throw new VisitorException(pe.getMessage(), pe);
+        }
+    }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Oct 10 16:06:16 2008
@@ -55,6 +55,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.logicalLayer.ProjectStarTranslator;
public class QueryParser {
@@ -764,7 +765,13 @@
{
if(null != root) {
log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+
+ //Translate all the project(*) leaves in the plan to a sequence of projections
+ ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+ translate.visit();
+
addLogicalPlan(root, lp);
+
try {
log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
} catch(FrontendException fee) {
@@ -2479,7 +2486,9 @@
(
{
Schema.FieldSchema fs = item.getFieldSchema();
- subSchema = fs.schema;
+ if(null != fs) {
+ subSchema = fs.schema;
+ }
log.debug("subSchema: " + subSchema);
}
(
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Oct 10 16:06:16 2008
@@ -390,6 +390,7 @@
sb.append(" cn: ");
sb.append(canonicalName);
}
+
return sb.toString();
}
@@ -515,9 +516,9 @@
mAliases = new HashMap<String, FieldSchema>(fields.size());
mFieldSchemas = new MultiMap<String, String>();
for (FieldSchema fs : fields) {
- if (fs.alias != null) {
- mAliases.put(fs.alias, fs);
- if(null != fs) {
+ if(null != fs) {
+ if (fs.alias != null) {
+ mAliases.put(fs.alias, fs);
mFieldSchemas.put(fs.canonicalName, fs.alias);
}
}
@@ -533,9 +534,9 @@
mFields.add(fieldSchema);
mAliases = new HashMap<String, FieldSchema>(1);
mFieldSchemas = new MultiMap<String, String>();
- if (fieldSchema.alias != null) {
- mAliases.put(fieldSchema.alias, fieldSchema);
- if(null != fieldSchema) {
+ if(null != fieldSchema) {
+ if (fieldSchema.alias != null) {
+ mAliases.put(fieldSchema.alias, fieldSchema);
mFieldSchemas.put(fieldSchema.canonicalName, fieldSchema.alias);
}
}
@@ -554,13 +555,11 @@
try {
for (int i = 0; i < s.size(); ++i) {
FieldSchema fs = new FieldSchema(s.getField(i));
+ mFields.add(fs);
if(null != fs) {
- mFields.add(fs);
if (fs.alias != null) {
mAliases.put(fs.alias, fs);
- if(null != fs) {
- mFieldSchemas.put(fs.canonicalName, fs.alias);
- }
+ mFieldSchemas.put(fs.canonicalName, fs.alias);
}
}
}
@@ -850,6 +849,11 @@
FieldSchema fs = schema.getField(i) ;
+ if(fs == null) {
+ sb.append("null");
+ continue;
+ }
+
if (fs.alias != null) {
sb.append(fs.alias);
sb.append(": ");
@@ -870,8 +874,9 @@
}
} else if (fs.type == DataType.MAP) {
sb.append(DataType.findTypeName(fs.type) + "[ ]") ;
+ } else {
+ sb.append(DataType.findTypeName(fs.type)) ;
}
- // TODO: Support Map
}
}
@@ -886,8 +891,11 @@
public void add(FieldSchema f) {
mFields.add(f);
- if (null != f.alias) {
- mAliases.put(f.alias, f);
+ if(null != f) {
+ mFieldSchemas.put(f.canonicalName, f.alias);
+ if (null != f.alias) {
+ mAliases.put(f.alias, f);
+ }
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Fri Oct 10 16:06:16 2008
@@ -1963,8 +1963,7 @@
} catch (FrontendException fe) {
String msg = "Cannot resolve COGroup output schema" ;
msgCollector.collect(msg, MessageType.Error) ;
- VisitorException vse = new VisitorException(msg) ;
- vse.initCause(fe) ;
+ VisitorException vse = new VisitorException(msg, fe) ;
throw vse ;
}
MultiMap<LogicalOperator, LogicalPlan> groupByPlans
@@ -2034,7 +2033,7 @@
byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
byte expectedType = DataType.BYTEARRAY ;
- if (!DataType.isAtomic(innerType)) {
+ if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
String msg = "Sorry, group by complex types"
+ " will be supported soon" ;
msgCollector.collect(msg, MessageType.Error) ;
@@ -2065,8 +2064,7 @@
catch (FrontendException fe) {
String msg = "Cannot resolve COGroup output schema" ;
msgCollector.collect(msg, MessageType.Error) ;
- VisitorException vse = new VisitorException(msg) ;
- vse.initCause(fe) ;
+ VisitorException vse = new VisitorException(msg, fe) ;
throw vse ;
}
@@ -2633,9 +2631,7 @@
|| (op instanceof LOSplitOutput)
|| (op instanceof LOLimit)) {
LogicalPlan lp = op.getPlan();
- LoadFunc lf = getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);
- return lf;
- //return getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);
+ return getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);
}
Schema s = op.getSchema();
@@ -2711,4 +2707,5 @@
throw new FrontendException("Found more than one load function to use: " + loadFuncMap.keySet());
}
+
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Oct 10 16:06:16 2008
@@ -44,7 +44,6 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.ExecType;
-//import org.apache.pig.impl.builtin.ShellBagEvalFunc;
import org.apache.pig.impl.builtin.GFAny;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.plan.OperatorKey;
@@ -60,6 +59,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
import org.apache.pig.impl.logicalLayer.parser.ParseException ;
+import org.apache.pig.impl.util.MultiMap;
public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -290,6 +290,16 @@
}
@Test
+ public void testQuery22Fail() {
+ buildPlan("A = load 'a';");
+ try {
+ buildPlan("B = group A by (*, $0);");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+ }
+ }
+
+ @Test
public void testQuery23() {
buildPlan("A = load 'a';");
buildPlan("B = load 'b';");
@@ -312,6 +322,17 @@
}
@Test
+ public void testQuery23Fail() {
+ buildPlan("A = load 'a';");
+ buildPlan("B = load 'b';");
+ try {
+ buildPlan("C = group A by (*, $0), B by ($0, $1);");
+ } catch (AssertionFailedError e) {
+ assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+ }
+ }
+
+ @Test
public void testQuery24() {
buildPlan("a = load 'a';");
@@ -1525,12 +1546,12 @@
// by unambiguous free form alias, fully qualified alias
// and partially qualified unambiguous alias
String query = "a = load 'st10k' as (name, age, gpa);" +
-"b = group a by name;" +
-"c = foreach b generate flatten(a);" +
-"d = filter c by name != 'fred';" +
-"e = group d by name;" +
-"f = foreach e generate flatten(d);" +
-"g = foreach f generate name, d::a::name, a::name;";
+ "b = group a by name;" +
+ "c = foreach b generate flatten(a);" +
+ "d = filter c by name != 'fred';" +
+ "e = group d by name;" +
+ "f = foreach e generate flatten(d);" +
+ "g = foreach f generate name, d::a::name, a::name;";
buildPlan(query);
}
@@ -1539,12 +1560,145 @@
// test that the alias "group" can be used
// after a flatten(group)
String query = "a = load 'st10k' as (name, age, gpa);" +
-"b = group a by name;" +
-"c = foreach b generate flatten(group), COUNT(a) as cnt;" +
-"d = foreach c generate group;";
+ "b = group a by name;" +
+ "c = foreach b generate flatten(group), COUNT(a) as cnt;" +
+ "d = foreach c generate group;";
buildPlan(query);
}
+    @Test
+    public void testQuery106() throws FrontendException, ParseException {
+        // generate * over an input with a declared schema yields that schema
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        LogicalPlan lp = buildPlan("b = foreach a generate *;");
+        LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), false, true));
+    }
+
+    @Test
+    public void testQuery107() throws FrontendException, ParseException {
+        // generate * over an input without a schema keeps the project(*)
+        buildPlan("a = load 'one';");
+
+        LogicalPlan lp = buildPlan("b = foreach a generate *;");
+        LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        assertTrue(checkPlanForProjectStar(foreachPlan));
+    }
+
+    @Test
+    public void testQuery108() throws FrontendException, ParseException {
+        // group by * yields a tuple group column holding all input columns
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        LogicalPlan lp = buildPlan("b = group a by *;");
+        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
+        Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
+        Schema expectedSchema = new Schema(groupFs);
+        expectedSchema.add(bagAFs);
+        assertTrue(Schema.equals(cogroup.getSchema(), expectedSchema, false, true));
+    }
+
+    @Test
+    public void testQuery109() throws FrontendException, ParseException {
+        // cogroup by * on two inputs; the group column takes the first input's aliases
+        buildPlan("a = load 'one' as (name, age, gpa);");
+        buildPlan("b = load 'two' as (first_name, enrol_age, high_school_gpa);");
+
+        LogicalPlan lp = buildPlan("c = group a by *, b by *;");
+        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagBSchema = getSchemaFromString("first_name: bytearray, enrol_age: bytearray, high_school_gpa: bytearray");
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
+        Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
+        Schema.FieldSchema bagBFs = new Schema.FieldSchema("b", bagBSchema, DataType.BAG);
+        Schema expectedSchema = new Schema(groupFs);
+        expectedSchema.add(bagAFs);
+        expectedSchema.add(bagBFs);
+        assertTrue(Schema.equals(cogroup.getSchema(), expectedSchema, false, true));
+    }
+
+    @Test
+    public void testQuery110() throws FrontendException, ParseException {
+        // "b by *" over a schemaless input keeps the project(*) in b's group-by plan
+        buildPlan("a = load 'one' as (name, age, gpa);");
+        LogicalPlan lp = buildPlan("b = load 'two';");
+        LOLoad load = (LOLoad) lp.getLeaves().get(0);
+
+        lp = buildPlan("c = cogroup a by $0, b by *;");
+        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
+        LogicalPlan cogroupPlan = (LogicalPlan) (mapGByPlans.get(load).toArray())[0];
+        assertTrue(checkPlanForProjectStar(cogroupPlan));
+    }
+
+    @Test
+    public void testQuery111() throws FrontendException, ParseException {
+        // order by * over an input with a schema is expanded into per-column projections
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        LogicalPlan lp = buildPlan("b = order a by *;");
+        LOSort sort = (LOSort) lp.getLeaves().get(0);
+
+        for (LogicalPlan sortPlan : sort.getSortColPlans()) {
+            assertFalse(checkPlanForProjectStar(sortPlan));
+        }
+    }
+
+    @Test
+    public void testQuery112() throws FrontendException, ParseException {
+        // In a nested block, "generate a1" over "a1 = order a by *" keeps the
+        // project(*) both in the generate plan and in the nested sort plans.
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        buildPlan("b = group a by *;");
+        LogicalPlan lp = buildPlan("c = foreach b {a1 = order a by *; generate a1;};");
+        LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
+
+        for (LogicalPlan foreachPlan : foreach.getForEachPlans()) {
+            assertTrue(checkPlanForProjectStar(foreachPlan));
+        }
+
+        LogicalPlan firstForeachPlan = foreach.getForEachPlans().get(0);
+        LOSort sort = (LOSort) firstForeachPlan.getPredecessors(firstForeachPlan.getLeaves().get(0)).get(0);
+
+        for (LogicalPlan sortPlan : sort.getSortColPlans()) {
+            assertTrue(checkPlanForProjectStar(sortPlan));
+        }
+    }
+
/**
 * Parses a schema string by delegating to the two-argument overload with
 * DataType.BYTEARRAY as the second argument (presumably the type given to
 * fields declared without an explicit type — confirm against the overload).
 */
private Schema getSchemaFromString(String schemaString) throws ParseException {
return getSchemaFromString(schemaString, DataType.BYTEARRAY);
}
@@ -1568,6 +1722,20 @@
System.err.println();
}
+ /**
+ * Reports whether any leaf of the given plan is a star projection.
+ * Only leaves are examined; interior operators are not visited.
+ */
+ private boolean checkPlanForProjectStar(LogicalPlan lp) {
+ boolean starFound = false;
+ for(LogicalOperator leaf: lp.getLeaves()) {
+ if(leaf instanceof LOProject && ((LOProject) leaf).isStar()) {
+ starFound = true;
+ break;
+ }
+ }
+ return starFound;
+ }
+
// Helper Functions
// Helper Functions
@@ -1598,24 +1766,6 @@
//System.err.println("Query: " + query);
- //Just the top level roots and their children
- //Need a recursive one to travel down the tree
- /*
- for(LogicalOperator op: lp.getRoots()) {
- System.err.println("Logical Plan Root: " + op.getClass().getName() + " object " + op);
-
- List<LogicalOperator> listOp = lp.getSuccessors(op);
-
- if(null != listOp) {
- Iterator<LogicalOperator> iter = listOp.iterator();
- while(iter.hasNext()) {
- LogicalOperator lop = iter.next();
- System.err.println("Successor: " + lop.getClass().getName() + " object " + lop);
- }
- }
- }
- */
-
assertNotNull(lp != null);
return lp;
} catch (IOException e) {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Fri Oct 10 16:06:16 2008
@@ -3009,6 +3009,72 @@
}
@Test
+ public void testGroupLineageStar() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (name, age, gpa);") ;
+ planTester.buildPlan("b = group a by *;") ;
+ planTester.buildPlan("c = foreach b generate flatten(group);") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
+ public void testGroupLineageStarNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = group a by *;") ;
+ planTester.buildPlan("c = foreach b generate flatten(group);") ;
+ LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ }
+
+ @Test
public void testCogroupLineage() throws Throwable {
planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
@@ -3090,6 +3156,385 @@
}
@Test
+ public void testCogroupStarLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupStarLineageFail() throws Throwable {
+ // Like testCogroupStarLineage, but generates "group + 1"; validation is
+ // expected to throw and to record an error in the collector.
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, field1 + 1, field4 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // expected: this plan must be rejected (presumably because of "group + 1")
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testCogroupStarLineage1() throws Throwable {
+ planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+ planTester.buildPlan("d = foreach c generate flatten(group), flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate $0 + 1, a::field1 + 1, field4 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(1);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupStarLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupStarLineageNoSchemaFail() throws Throwable {
+ // Schema-less variant that generates "group + 1"; validation is expected
+ // to throw and to record an error in the collector.
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ fail("Exception expected") ;
+ }
+ catch (PlanValidationException pve) {
+ // expected: this plan must be rejected (presumably because of "group + 1")
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (!collector.hasError()) {
+ throw new AssertionError("Expect error") ;
+ }
+
+ }
+
+ @Test
+ public void testCogroupMultiColumnProjectLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, a.(field1, field2), b.(field4);") ;
+ planTester.buildPlan("e = foreach d generate group, flatten($1), flatten($2);") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupProjectStarLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate * ;") ;
+ planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupProjectStarLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate * ;") ;
+ planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
+ public void testCogroupProjectStarLineageMixSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by field1, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate * ;") ;
+ planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b) ;") ;
+ LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, $4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ try {
+ typeValidator.validate(plan, collector) ;
+ }
+ catch (PlanValidationException pve) {
+ //not good
+ }
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+ }
+
+ @Test
public void testCogroupLineageFail() throws Throwable {
planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
@@ -4168,6 +4613,88 @@
}
@Test
+ public void testCogroupSortStarLineage() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+ planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by * desc;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
+ public void testCogroupSortStarLineageNoSchema() throws Throwable {
+ planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+ planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+ planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+ planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b) ;") ;
+ planTester.buildPlan("e = order d by * desc;") ;
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0 ;") ;
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+ LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+ LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+ foreachPlan = foreach.getForEachPlans().get(2);
+ exOp = foreachPlan.getRoots().get(0);
+ if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+ cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+ assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+ }
+
+ @Test
public void testCrossLineage() throws Throwable {
planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;