You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/05/12 23:14:01 UTC
svn commit: r1102461 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logic...
Author: thejas
Date: Thu May 12 21:14:00 2011
New Revision: 1102461
URL: http://svn.apache.org/viewvc?rev=1102461&view=rev
Log:
PIG-1938: support project-range as udf argument
Added:
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java
pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/builtin/TOBAG.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
pig/trunk/src/org/apache/pig/parser/AstPrinter.g
pig/trunk/src/org/apache/pig/parser/AstValidator.g
pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
pig/trunk/src/org/apache/pig/parser/QueryParser.g
pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
pig/trunk/test/org/apache/pig/test/TestProjectRange.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu May 12 21:14:00 2011
@@ -42,6 +42,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
BUG FIXES
+PIG-1938: support project-range as udf argument (thejas)
+
PIG-2048: Add zookeeper to pig jar (gbowyer via gates)
PIG-2008: Cache outputFormat in HBaseStorage (thedatachef via gates)
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Thu May 12 21:14:00 2011
@@ -90,6 +90,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
+import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
@@ -1665,6 +1666,7 @@ public class PigServer {
}
private void compile(LogicalPlan lp) throws FrontendException {
+ new ProjStarInUdfExpander(lp).visit();
new ColumnAliasConversionVisitor( lp ).visit();
new SchemaAliasVisitor( lp ).visit();
new ScalarVisitor( lp, pigContext ).visit();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu May 12 21:14:00 2011
@@ -164,7 +164,7 @@ public class POUserFunc extends Expressi
if(op instanceof POProject &&
op.getResultType() == DataType.TUPLE){
POProject projOp = (POProject)op;
- if(projOp.isStar()){
+ if(projOp.isProjectToEnd()){
Tuple trslt = (Tuple) temp.result;
Tuple rslt = (Tuple) res.result;
for(int i=0;i<trslt.size();i++) {
Modified: pig/trunk/src/org/apache/pig/builtin/TOBAG.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TOBAG.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TOBAG.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TOBAG.java Thu May 12 21:14:00 2011
@@ -99,21 +99,22 @@ public class TOBAG extends EvalFunc<Data
*
*/
@Override
- public Schema outputSchema(Schema input) {
+ public Schema outputSchema(Schema inputSch) {
byte type = DataType.ERROR;
Schema innerSchema = null;
-
- for(FieldSchema fs : input.getFields()){
- if(type == DataType.ERROR){
- type = fs.type;
- innerSchema = fs.schema;
- }else{
- if( type != fs.type || !nullEquals(innerSchema, fs.schema)){
- // invalidate the type
- type = DataType.ERROR;
- break;
- }
- }
+ if(inputSch != null){
+ for(FieldSchema fs : inputSch.getFields()){
+ if(type == DataType.ERROR){
+ type = fs.type;
+ innerSchema = fs.schema;
+ }else{
+ if( type != fs.type || !nullEquals(innerSchema, fs.schema)){
+ // invalidate the type
+ type = DataType.ERROR;
+ break;
+ }
+ }
+ }
}
try {
if(type == DataType.ERROR){
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java Thu May 12 21:14:00 2011
@@ -56,8 +56,10 @@ public class ProjectionPatcher implement
@Override
public void visit(ProjectExpression p) throws FrontendException {
- // if projection is for everything, just return
- if (p.isProjectStar()) {
+ // if project is a project-star or range, ie it could not be expanded
+ // then its not possible to determine the matching input columns
+ // before runtime
+ if (p.isRangeOrStarProject()) {
return;
}
Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java Thu May 12 21:14:00 2011
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.PigException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+/**
+ * Expand project-star or project-range when used as udf argument.
+ * This is different from {@link ProjectStarExpander} because in those
+ * cases, the project star gets expanded as new {@link LogicalExpressionPlan}.
+ * In case of project-star or project-range within udf, it should get expanded
+ * only as multiple inputs to this udf, no additional {@link LogicalExpressionPlan}s
+ * are created.
+ * The expansion happens only if input schema is not null
+ */
+public class ProjStarInUdfExpander extends AllExpressionVisitor {
+
+    public ProjStarInUdfExpander(OperatorPlan plan) throws FrontendException {
+        super( plan, new DependencyOrderWalker( plan ) );
+    }
+
+    /**
+     * Returns the visitor applied to expression plans of all operators
+     * other than foreach; foreach is handled by {@link #visit(LOForEach)}.
+     */
+    @Override
+    protected LogicalExpressionVisitor getVisitor(final LogicalExpressionPlan exprPlan)
+            throws FrontendException {
+        return new ProjExpanderForNonForeach(exprPlan);
+    }
+
+    /**
+     * LOForEach needs special handling because the ProjectExpression held by
+     * an LOInnerLoad is the one that gets expanded. Expanded LOInnerLoads are
+     * replaced by one LOInnerLoad per column, and the input numbers of all
+     * projects in the LOGenerate output plans are then recomputed.
+     *
+     * @throws FrontendException if the inner plan does not have exactly one
+     *  LOGenerate sink, or if expansion fails
+     */
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException{
+        LogicalPlan innerPlan = foreach.getInnerPlan();
+
+        // visit the operators of the inner plan first
+        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(newWalker);
+        currentWalker.walk(this);
+        popWalker();
+
+        LOGenerate gen = findGenerate(foreach, innerPlan);
+
+        List<Operator> loGenPreds = innerPlan.getPredecessors(gen);
+
+        if(loGenPreds == null){
+            // there are no LOInnerLoads, must be working on just constants
+            // no project-star expansion to be done
+            return;
+        }
+
+        //get mapping of LOGenerate predecessor current position to object
+        Map<Integer, LogicalRelationalOperator> oldPos2Rel =
+            new HashMap<Integer, LogicalRelationalOperator>();
+
+        for(int i=0; i<loGenPreds.size(); i++){
+            oldPos2Rel.put(i, (LogicalRelationalOperator) loGenPreds.get(i));
+        }
+
+        //store mapping between the projections in inner plans
+        // of LOGenerate to the input relation object
+        Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
+            new HashMap<ProjectExpression, LogicalRelationalOperator>();
+
+        List<LOInnerLoad> expandedInLoads = new ArrayList<LOInnerLoad>();
+
+        //visit each expression plan, and expand the projects in the udf
+        for( OperatorPlan plan : gen.getOutputPlans()){
+            ProjExpanderForForeach projExpander = new ProjExpanderForForeach(
+                    plan,
+                    gen,
+                    oldPos2Rel,
+                    proj2InpRel,
+                    foreach,
+                    expandedInLoads
+            );
+            projExpander.visit();
+        }
+
+        //remove the LOInnerLoads that have been expanded
+        for(LOInnerLoad inLoad : expandedInLoads){
+            innerPlan.disconnect(inLoad, gen);
+            innerPlan.remove(inLoad);
+        }
+
+        //reset the input relation position in the projects
+        //get mapping of LOGenerate input relation to current position
+        Map<LogicalRelationalOperator, Integer> rel2pos = new HashMap<LogicalRelationalOperator, Integer>();
+        List<Operator> newGenPreds = innerPlan.getPredecessors(gen);
+        int numNewGenPreds = 0;
+        if(newGenPreds != null)
+            numNewGenPreds = newGenPreds.size();
+
+        for(int i=0; i<numNewGenPreds; i++){
+            rel2pos.put((LogicalRelationalOperator) newGenPreds.get(i),i);
+        }
+
+        //correct the input num for projects
+        for(Entry<ProjectExpression, LogicalRelationalOperator> projAndInp : proj2InpRel.entrySet()){
+            ProjectExpression proj = projAndInp.getKey();
+            LogicalRelationalOperator rel = projAndInp.getValue();
+            proj.setInputNum(rel2pos.get(rel));
+        }
+    }
+
+    /**
+     * Finds the LOGenerate sink of a foreach inner plan.
+     *
+     * @param foreach the foreach owning innerPlan, used for error reporting
+     * @param innerPlan the inner plan of foreach
+     * @return the single LOGenerate sink
+     * @throws FrontendException (a BUG VisitorException, code 2266) if the
+     *  inner plan has no LOGenerate sink or more than one
+     */
+    private LOGenerate findGenerate(LOForEach foreach, LogicalPlan innerPlan)
+            throws FrontendException {
+        String msg = "Expected single LOGenerate output in innerplan of foreach";
+        LOGenerate gen = null;
+        List<Operator> feOutputs = innerPlan.getSinks();
+        if(feOutputs != null){
+            for( Operator op : feOutputs){
+                if(op instanceof LOGenerate){
+                    if(gen != null){
+                        throw new VisitorException(foreach,
+                                msg,
+                                2266,
+                                PigException.BUG
+                        );
+                    }
+                    gen = (LOGenerate) op;
+                }
+            }
+        }
+        if(gen == null){
+            // without a generate there is nothing to expand; report the
+            // malformed plan instead of continuing with a null operator
+            throw new VisitorException(foreach,
+                    msg,
+                    2266,
+                    PigException.BUG
+            );
+        }
+        return gen;
+    }
+
+    /**
+     * Intentionally empty: the LOGenerate output plans are processed from
+     * {@link #visit(LOForEach)} with ProjExpanderForForeach, so the generic
+     * per-operator expression visiting is suppressed here.
+     */
+    @Override
+    public void visit(LOGenerate gen) throws FrontendException{
+
+    }
+}
+
+/**
+ * Expression visitor run over each output plan of an LOGenerate.
+ * A udf argument that is a project-star over an LOInnerLoad whose own
+ * projection is a project-star/range gets replaced by one project per
+ * column, each reading from a newly created LOInnerLoad.
+ * Expanded LOInnerLoads are only recorded here; the caller removes them.
+ */
+class ProjExpanderForForeach extends LogicalExpressionVisitor{
+
+    private final LOGenerate loGen;
+    private final LogicalPlan innerRelPlan;
+    private final Map<Integer, LogicalRelationalOperator> oldPos2Rel;
+    private final Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel;
+    private final LOForEach foreach;
+    private final List<LOInnerLoad> expandedInLoads;
+
+    protected ProjExpanderForForeach(
+            OperatorPlan p,
+            LOGenerate loGen,
+            Map<Integer, LogicalRelationalOperator> oldPos2Rel,
+            Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel,
+            LOForEach foreach,
+            List<LOInnerLoad> expandedInLoads
+    )
+            throws FrontendException {
+        super(p, new ReverseDependencyOrderWalker(p));
+        this.loGen = loGen;
+        this.innerRelPlan = (LogicalPlan) loGen.getPlan();
+        this.oldPos2Rel = oldPos2Rel;
+        this.proj2InpRel = proj2InpRel;
+        this.foreach = foreach;
+        this.expandedInLoads = expandedInLoads;
+    }
+
+    /**
+     * Replaces expandable project-star arguments of the udf by their
+     * per-column projects, keeping the argument order.
+     */
+    @Override
+    public void visit(UserFuncExpression func) throws FrontendException{
+        List<Operator> successors = plan.getSuccessors(func);
+        if(successors == null){
+            // udf takes no arguments
+            return;
+        }
+        List<Operator> oldArgs = new ArrayList<Operator>(successors);
+        List<Operator> replacedProjs = new ArrayList<Operator>();
+        List<Operator> addedProjs = new ArrayList<Operator>();
+        List<Operator> newArgs = new ArrayList<Operator>();
+
+        for(Operator arg : oldArgs){
+            List<Operator> expansion = tryExpand(arg);
+            if(expansion == null){
+                newArgs.add(arg);
+                continue;
+            }
+            replacedProjs.add(arg);
+            newArgs.addAll(expansion);
+            addedProjs.addAll(expansion);
+        }
+
+        if(replacedProjs.isEmpty()){
+            // nothing expanded, plan stays as it is
+            return;
+        }
+
+        // detach every current argument
+        for(Operator arg : oldArgs){
+            plan.disconnect(func, arg);
+        }
+        // drop the expanded project-stars from plan and from the mapping
+        for(Operator stale : replacedProjs){
+            plan.remove(stale);
+            proj2InpRel.remove(stale);
+        }
+        for(Operator fresh : addedProjs){
+            plan.add(fresh);
+        }
+        // reattach in the new order
+        for(Operator arg : newArgs){
+            plan.connect(func, arg);
+        }
+    }
+
+    /**
+     * Tries to expand one udf argument. Under foreach the argument is a
+     * project-star over an LOGenerate input; it is expandable only when that
+     * input is an LOInnerLoad whose projection is a project-star/range.
+     * On success the LOInnerLoad is appended to expandedInLoads.
+     * @return the replacement projects, or null when arg stays unchanged
+     */
+    private List<Operator> tryExpand(Operator arg) throws FrontendException {
+        if(!(arg instanceof ProjectExpression)){
+            return null;
+        }
+        ProjectExpression outerProj = (ProjectExpression) arg;
+        if(!outerProj.isRangeOrStarProject()){
+            return null;
+        }
+        LogicalRelationalOperator source = oldPos2Rel.get(outerProj.getInputNum());
+        if(!(source instanceof LOInnerLoad)){
+            return null;
+        }
+        LOInnerLoad inLoad = (LOInnerLoad) source;
+        ProjectExpression innerProj = inLoad.getProjection();
+        if(!innerProj.isRangeOrStarProject()){
+            return null;
+        }
+        List<Operator> expansion = expandProjectStar(innerProj);
+        if(expansion != null){
+            expandedInLoads.add(inLoad);
+        }
+        return expansion;
+    }
+
+    /**
+     * Records which LOGenerate input each project reads, so that its input
+     * number can be corrected once the generate inputs are rearranged.
+     */
+    @Override
+    public void visit(ProjectExpression proj){
+        proj2InpRel.put(proj, oldPos2Rel.get(proj.getInputNum()));
+    }
+
+    /**
+     * Creates one LOInnerLoad (connected to the LOGenerate) and one project
+     * over it for every column covered by proj.
+     * @return the new projects, or null if the column range is not known yet
+     */
+    private List<Operator> expandProjectStar(ProjectExpression proj)
+            throws FrontendException {
+        Pair<Integer, Integer> range =
+            ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan)plan, proj);
+        if(range == null){
+            // columns can't be determined now, leave it unexpanded
+            return null;
+        }
+        int first = range.first;
+        int last = range.second;
+
+        List<Operator> result = new ArrayList<Operator>();
+        for(int col = first; col <= last; col++){
+            LOInnerLoad colLoad = new LOInnerLoad(innerRelPlan, foreach, col);
+            innerRelPlan.add(colLoad);
+            innerRelPlan.connect(colLoad, loGen);
+            // input number -2 is a placeholder; ProjStarInUdfExpander resets
+            // it via proj2InpRel once the final generate inputs are known
+            ProjectExpression colProj = new ProjectExpression(plan, -2, -1, loGen);
+            proj2InpRel.put(colProj, colLoad);
+            result.add(colProj);
+        }
+        return result;
+    }
+}
+
+/**
+ * Expression visitor used for the expression plans of every relational
+ * operator except foreach. A project-star/range passed as a udf argument is
+ * replaced by one project per input column, provided the column range can
+ * be determined (see ProjectStarExpanderUtil.getProjectStartEndCols).
+ */
+class ProjExpanderForNonForeach extends LogicalExpressionVisitor{
+
+    protected ProjExpanderForNonForeach(OperatorPlan p)
+            throws FrontendException {
+        super(p, new ReverseDependencyOrderWalker(p));
+    }
+
+    /**
+     * Rewrites the argument list of the udf, substituting each expandable
+     * project-star/range by its per-column projects in place.
+     */
+    @Override
+    public void visit(UserFuncExpression func) throws FrontendException {
+        List<Operator> successors = plan.getSuccessors(func);
+        if(successors == null){
+            // udf without arguments
+            return;
+        }
+        List<Operator> oldArgs = new ArrayList<Operator>(successors);
+        List<Operator> newArgs = new ArrayList<Operator>();
+        List<Operator> replaced = new ArrayList<Operator>();
+        List<Operator> added = new ArrayList<Operator>();
+
+        for(Operator arg : oldArgs){
+            List<Operator> expansion = null;
+            if(arg instanceof ProjectExpression
+                    && ((ProjectExpression)arg).isRangeOrStarProject()){
+                expansion = expandProjectStar((ProjectExpression)arg);
+            }
+            if(expansion == null){
+                newArgs.add(arg);
+            }else{
+                replaced.add(arg);
+                newArgs.addAll(expansion);
+                added.addAll(expansion);
+            }
+        }
+
+        if(replaced.isEmpty()){
+            // nothing was expanded, leave the plan untouched
+            return;
+        }
+
+        for(Operator arg : oldArgs){
+            plan.disconnect(func, arg);
+        }
+        for(Operator stale : replaced){
+            plan.remove(stale);
+        }
+        for(Operator fresh : added){
+            plan.add(fresh);
+        }
+        for(Operator arg : newArgs){
+            plan.connect(func, arg);
+        }
+    }
+
+    /**
+     * Builds one project per column covered by proj, using the same input
+     * number and attached relational operator as proj.
+     * @return the new projects, or null if the range can't be resolved now
+     */
+    private List<Operator> expandProjectStar(
+            ProjectExpression proj) throws FrontendException {
+        Pair<Integer, Integer> range =
+            ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan)plan, proj);
+        if(range == null){
+            return null;
+        }
+        int first = range.first;
+        int last = range.second;
+        LogicalRelationalOperator attachedOp = proj.getAttachedRelationalOp();
+        int inputNum = proj.getInputNum();
+
+        List<Operator> columns = new ArrayList<Operator>();
+        for(int col = first; col <= last; col++){
+            columns.add(new ProjectExpression(plan, inputNum, col, attachedOp));
+        }
+        return columns;
+    }
+}
\ No newline at end of file
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java Thu May 12 21:14:00 2011
@@ -27,6 +27,7 @@ import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.DepthFirstWalker;
import org.apache.pig.newplan.Operator;
@@ -488,46 +489,22 @@ public class ProjectStarExpander extends
private List<LogicalExpressionPlan> expandPlan(
LogicalExpressionPlan expPlan, ProjectExpression proj, int inputNum)
throws FrontendException {
- LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
-
- // list of inputs of attached relation
- List<Operator> inputRels = relOp.getPlan().getPredecessors(relOp);
-
- //the relation that is input to this project
- LogicalRelationalOperator inputRel =
- (LogicalRelationalOperator) inputRels.get(proj.getInputNum());
-
+
+ Pair<Integer, Integer> startAndEndProjs =
+ ProjectStarExpanderUtil.getProjectStartEndCols(expPlan, proj);
List<LogicalExpressionPlan> newPlans = new ArrayList<LogicalExpressionPlan>();
- LogicalSchema inputSchema = inputRel.getSchema();
- if(inputSchema == null &&
- (proj.isProjectStar() || (proj.isRangeProject() && proj.getEndCol() == -1))
- ){
- // can't expand if input schema is null and it is a project-star
- // or project-range-until-end
+ if(startAndEndProjs == null){
+ // can't expand this project
newPlans.add(expPlan);
return newPlans;
}
- //expand from firstProjCol to lastProjCol after setting their values
- int firstProjCol;
- int lastProjCol;
-
- //the range values are set in the project in LOInnerLoad
- if(proj.isRangeProject()){
- proj.setColumnNumberFromAlias();
- firstProjCol = proj.getStartCol();
-
- if(proj.getEndCol() >= 0)
- lastProjCol = proj.getEndCol();
- else
- lastProjCol = inputSchema.size() - 1;
- }else{
- //project-star
- firstProjCol = 0;
- lastProjCol = inputSchema.size() - 1;
- }
-
+ //expand from firstProjCol to lastProjCol
+ int firstProjCol = startAndEndProjs.first;
+ int lastProjCol = startAndEndProjs.second;
+
+ LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
for(int i = firstProjCol; i <= lastProjCol; i++){
newPlans.add(createExpPlanWithProj(relOp, inputNum, i));
}
Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java Thu May 12 21:14:00 2011
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+/**
+ * Helper(s) shared by the project-star / project-range expansion visitors.
+ */
+public class ProjectStarExpanderUtil{
+
+    /**
+     * Computes the inclusive column span a project-star or project-range
+     * covers in the relation that feeds it.
+     *
+     * @param expPlan expression plan containing proj (currently unused)
+     * @param proj the project-star or project-range to examine
+     * @return (first, last) column positions, both inclusive, or null when
+     *  the input schema is null and proj is a project-star or a
+     *  project-range without an end column
+     * @throws FrontendException propagated from resolving range aliases
+     */
+    static Pair<Integer, Integer> getProjectStartEndCols(
+            LogicalExpressionPlan expPlan, ProjectExpression proj)
+            throws FrontendException {
+
+        // the feeding relation is the predecessor, at the project's input
+        // position, of the relational operator the project is attached to
+        LogicalRelationalOperator attached = proj.getAttachedRelationalOp();
+        List<Operator> preds = attached.getPlan().getPredecessors(attached);
+        LogicalRelationalOperator input =
+            (LogicalRelationalOperator) preds.get(proj.getInputNum());
+        LogicalSchema inputSchema = input.getSchema();
+
+        if(inputSchema == null){
+            boolean openEnded = proj.isProjectStar()
+                || (proj.isRangeProject() && proj.getEndCol() == -1);
+            if(openEnded){
+                // last column unknowable without a schema
+                return null;
+            }
+        }
+
+        if(!proj.isRangeProject()){
+            // project-star spans the whole input
+            return new Pair<Integer, Integer>(0, inputSchema.size() - 1);
+        }
+
+        // the range values are set in the project in LOInnerLoad
+        proj.setColumnNumberFromAlias();
+        int start = proj.getStartCol();
+        int end;
+        if(proj.getEndCol() >= 0){
+            end = proj.getEndCol();
+        }else{
+            // open-ended range runs to the last input column
+            end = inputSchema.size() - 1;
+        }
+        return new Pair<Integer, Integer>(start, end);
+    }
+}
Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu May 12 21:14:00 2011
@@ -218,7 +218,7 @@ rel
;
flatten_generated_item
- : ( flatten_clause | expr | STAR { sb.append(" ").append($STAR.text); } ) ( { sb.append(" AS "); } field_def_list)?
+ : ( flatten_clause | col_range | expr | STAR { sb.append(" ").append($STAR.text); } ) ( { sb.append(" AS "); } field_def_list)?
;
flatten_clause
@@ -248,7 +248,7 @@ func_eval
;
real_arg
- : expr | STAR { sb.append($STAR.text); }
+ : expr | STAR { sb.append($STAR.text); } | col_range
;
expr
@@ -302,6 +302,9 @@ col_index
: DOLLARVAR { sb.append($DOLLARVAR.text); }
;
+col_range : ^(COL_RANGE col_ref? { sb.append(".."); } DOUBLE_PERIOD col_ref?)
+;
+
pound_proj
: ^( POUND { sb.append($POUND.text); }
( QUOTEDSTRING { sb.append($QUOTEDSTRING.text); } | NULL { sb.append($NULL.text); } ) )
@@ -332,7 +335,7 @@ order_by_clause
;
order_col
- : col_ref ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
+ : (col_range | col_ref) ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?
;
distinct_clause
@@ -377,7 +380,7 @@ join_group_by_clause
;
join_group_by_expr
- : expr | STAR { sb.append($STAR.text); }
+ : col_range | expr | STAR { sb.append($STAR.text); }
;
union_clause
Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu May 12 21:14:00 2011
@@ -278,7 +278,7 @@ cond : ^( OR cond cond )
func_eval: ^( FUNC_EVAL func_name real_arg* )
;
-real_arg : expr | STAR
+real_arg : expr | STAR | col_range
;
expr : ^( PLUS expr expr )
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu May 12 21:14:00 2011
@@ -653,6 +653,7 @@ real_arg [LogicalExpressionPlan plan] re
$expr = builder.buildProjectExpr( new SourceLocation( (PigParserNode)$STAR ), $plan, $GScope::currentOp,
$statement::inputIndex, null, -1 );
}
+ | cr = col_range[$plan] { $expr = $cr.expr;}
;
expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu May 12 21:14:00 2011
@@ -386,7 +386,7 @@ real_arg_list : real_arg ( COMMA real_ar
-> real_arg+
;
-real_arg : expr | STAR
+real_arg : expr | STAR | col_range
;
null_check_cond : expr IS! NOT? NULL^
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Thu May 12 21:14:00 2011
@@ -226,7 +226,7 @@ public class TestNewPlanFilterAboveForea
public void test2() throws Exception {
String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
"B = FOREACH A GENERATE $0, b;" +
- "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+ "C = FILTER B BY " + SIZE.class.getName() +"(TOTUPLE(*)) > 5;" +
"STORE C INTO 'empty';";
LogicalPlan newLogicalPlan = buildPlan( query );
@@ -271,7 +271,7 @@ public class TestNewPlanFilterAboveForea
public void test4() throws Exception {
String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
"B = FOREACH A GENERATE $0, b, flatten(1);" +
- "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+ "C = FILTER B BY " + SIZE.class.getName() +"(TOTUPLE(*)) > 5;" +
"STORE C INTO 'empty';";
LogicalPlan newLogicalPlan = buildPlan( query );
Modified: pig/trunk/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectRange.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestProjectRange.java Thu May 12 21:14:00 2011
@@ -22,6 +22,7 @@ import static org.apache.pig.ExecType.LO
import static org.apache.pig.ExecType.MAPREDUCE;
import static org.junit.Assert.assertEquals;
+import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
@@ -80,10 +81,8 @@ public class TestProjectRange {
execType = PigServer.parseExecType(execTypeString);
}
- PrintWriter w = new PrintWriter(new FileWriter(INP_FILE_5FIELDS));
- w.println("10\t20\t30\t40\t50");
- w.println("11\t21\t31\t41\t51");
- w.close();
+ String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
+ Util.createLocalInputFile(INP_FILE_5FIELDS, input);
if(execType == MAPREDUCE) {
cluster = MiniCluster.buildCluster();
@@ -107,7 +106,7 @@ public class TestProjectRange {
+ /** Deletes the local 5-field input file and shuts down the MiniCluster if one was built. */
@AfterClass
public static void oneTimeTearDown() throws Exception {
-
+ new File(INP_FILE_5FIELDS).delete();
if(cluster != null)
cluster.shutDown();
}
@@ -415,11 +414,9 @@ public class TestProjectRange {
// without aliases
query =
" l1 = load '" + INP_FILE_5FIELDS + "';"
- + "f = foreach l1 generate ..$3 ;"
+ + "f = foreach l1 generate ..$3 as (a,b,c,d);"
;
- // the schema should be null, but that is not the case
- // see - PIG-1910
- //compileAndCompareSchema(null, query, "f");
+ compileAndCompareSchema("a : bytearray,b : bytearray,c : bytearray,d : bytearray", query, "f");
Util.registerMultiLineQuery(pigServer, query);
@@ -430,8 +427,8 @@ public class TestProjectRange {
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
- "(10,20,30,40)",
- "(11,21,31,41)",
+ "('10','20','30','40')",
+ "('11','21','31','41')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
@@ -453,9 +450,7 @@ public class TestProjectRange {
" l1 = load '" + INP_FILE_5FIELDS + "';"
+ "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 .., .. $1 ;"
;
- // the schema should be null, but that is not the case
- // see - PIG-1910
- //compileAndCompareSchema(null, query, "f");
+ compileAndCompareSchema((Schema)null, query, "f");
Util.registerMultiLineQuery(pigServer, query);
@@ -466,8 +461,8 @@ public class TestProjectRange {
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
- "(10,50,40,50,10,20)",
- "(11,51,41,51,11,21)",
+ "('10','50','40','50','10','20')",
+ "('11','51','41','51','11','21')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
@@ -478,11 +473,8 @@ public class TestProjectRange {
* @throws IOException
* @throws ParserException
*/
- //@Test
+ @Test
public void testRangeForeachWFilterNOSchema() throws IOException, ParserException {
- //TODO: fix depends on PIG-1910
- //fails with - In alias fil, incompatible types in GreaterThan
- // Operator left hand side:Unknown right hand side:Unknown
String query;
query =
@@ -494,12 +486,12 @@ public class TestProjectRange {
Util.registerMultiLineQuery(pigServer, query);
pigServer.explain("fil", System.err);
- Iterator<Tuple> it = pigServer.openIterator("f");
+ Iterator<Tuple> it = pigServer.openIterator("fil");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
- "(11,51,41,51,11,21)",
+ "('11','51','41','51','11','21')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
Added: pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java Thu May 12 21:14:00 2011
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests expansion of project-star (*) and project-range ($x .. $y)
+ * expressions when they are used as udf arguments in foreach, filter
+ * and join statements.
+ */
+public class TestProjectStarRangeInUdf {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ protected static PigServer pigServer;
+ private static final String INP_FILE_5FIELDS = "TestProjectRange_5fields";
+
+ /**
+ * Writes the shared input file: two rows of five tab-separated columns.
+ */
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
+ String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
+ Util.createLocalInputFile(INP_FILE_5FIELDS, input);
+ }
+
+ /**
+ * Creates a fresh local-mode PigServer for every test.
+ */
+ @Before
+ public void setup() throws ExecException{
+ pigServer = new PigServer(LOCAL);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ /**
+ * Deletes the input file written by {@link #oneTimeSetUp()}.
+ */
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ new File(INP_FILE_5FIELDS).delete();
+ }
+
+ @Test
+ public void testProjStarExpandInForeach1() throws IOException{
+ //star expansion lets CONCAT be used if input has two cols
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a, b);"
+ + "f = foreach l1 generate CONCAT(*) as ct;"
+ ;
+ compileAndCompareSchema("ct : bytearray", query, "f");
+ }
+
+ @Test
+ public void testProjStarExpandInForeach1Negative() throws IOException{
+ //star expansion gives 3 columns, so CONCAT(*) gives error
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a, b, c);"
+ + "f = foreach l1 generate CONCAT(*) as ct;"
+ ;
+ Util.checkExceptionMessage(query, "f",
+ "Could not infer the matching function for " +
+ "org.apache.pig.builtin.CONCAT");
+ }
+
+ @Test
+ public void testProjStarExpandInForeach2() throws IOException {
+ //TOTUPLE(*) receives all three columns, so the tuple keeps their names and types
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ + "f = foreach l1 generate TOTUPLE(*) as tb;"
+ ;
+ compileAndCompareSchema("tb : (a : int, b : int, c : int)", query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "((10,20,30))",
+ "((11,21,31))",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjStarExpandInFilter1() throws IOException{
+ //TOBAG has * and a bincond expression as argument
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int);"
+ + "f = filter l1 by SUM(TOBAG((a == 10 ? 100 : 0), *)) == 130;"
+ ;
+ compileAndCompareSchema("a : int, b : int", query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(10,20)",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+ }
+
+ @Test
+ public void testProjRangeExpandInFilterNoSchema1() throws IOException{
+ //without a schema, $0 .. $1 expands to the first two columns;
+ //only the first row has $0 + $1 == 30
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ + "f = filter l1 by SUM(TOBAG($0 .. $1)) == 30;"
+ ;
+ compileAndCompareSchema((Schema)null, query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStringAsByteArray(
+ new String[] {
+ "('10','20','30','40','50')",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+ }
+
+ /**
+ * Test project-range with limits on both sides used as udf (CONCAT)
+ * argument in foreach
+ * @throws IOException
+ */
+ @Test
+ public void testProjRangeExpandInForeach() throws IOException {
+
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a, b : chararray, c : chararray, d);"
+ + "f = foreach l1 generate CONCAT($1 .. $2) as ct;"
+ ;
+ compileAndCompareSchema("ct : chararray", query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('2030')",
+ "('2131')",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjRangeExpandInJoin() throws IOException {
+ //join keys use project-range inside CONCAT on both sides of the join
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+ + "f1 = foreach l1 generate a, b, c, '1' as num;"
+ + "l2 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+ + "f2 = foreach l2 generate c, a, b, '2' as num;"
+ + "j = join f1 by CONCAT(c, $0 .. $1), f2 by CONCAT($0, a .. b);"
+ ;
+ String schStr =
+ "f1::a : chararray, f1::b : chararray, f1::c : chararray, f1::num : chararray," +
+ "f2::c : chararray, f2::a : chararray, f2::b : chararray, f2::num : chararray";
+
+ compileAndCompareSchema(schStr, query, "j");
+ Iterator<Tuple> it = pigServer.openIterator("j");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('10', '20', '30', '1', '30', '10', '20', '2')",
+ "('11', '21', '31', '1', '31', '11', '21', '2')",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+
+ @Test
+ public void testProjMixExpand1() throws IOException {
+ //star and range in the same udf call are expanded independently
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ + "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
+ ;
+
+ compileAndCompareSchema("tt : {(NullAlias : int)}", query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "({(10),(20),(30),(10),(20),(30)})",
+ "({(11),(21),(31),(11),(21),(31)})",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjMixExpand1NoSchema() throws IOException {
+
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "';"
+ + "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
+ ;
+ //the inner field of the expected bag has no alias and type NULL
+ Schema sch = Utils.getSchemaFromString("tt : {(NullAlias)}");
+ sch.getField(0).schema.getField(0).schema.getField(0).alias = null;
+ sch.getField(0).schema.getField(0).schema.getField(0).type = DataType.NULL;
+
+ compileAndCompareSchema(sch, query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStringAsByteArray(
+ new String[] {
+ "({('10'),('20'),('30'),('40'),('50'),('10'),('20'),('30')})",
+ "({('11'),('21'),('31'),('41'),('51'),('11'),('21'),('31')})",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjMixExpand2() throws IOException {
+ //range expansions mixed with constants and expressions in one udf call
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int);"
+ + "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , d - 1) as tt;"
+ ;
+
+ String schStr = "tt : (NullAliasA : int, a : int, b : int," +
+ " NullAliasB : int, c : int, d : int, NullAliasC : int)";
+ compileAndCompareSchema(schStr, query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "((1,10,20,5,30,40,39))",
+ "((1,11,21,5,31,41,40))",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjMixExpand2NoSchema() throws IOException {
+
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ + "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , $4 -1) as tt;"
+ ;
+
+ compileAndCompareSchema("tt :()", query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStringAsByteArray(
+ new String[] {
+ "((1,'10','20',5,'30','40','50',49))",
+ "((1,'11','21',5,'31','41','51',50))",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+ @Test
+ public void testProjMixExpand3() throws IOException {
+ //range expansion both directly in TOTUPLE and nested inside CONCAT
+ String query;
+
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : chararray, d : chararray);"
+ + "f = foreach l1 generate TOTUPLE($0 .. $1, CONCAT($2 .. )) as tt;"
+ ;
+
+ String schStr = "tt : (a : int, b : int, NullAlias : chararray)";
+ compileAndCompareSchema(schStr, query, "f");
+ Iterator<Tuple> it = pigServer.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "((10,20,'3040'))",
+ "((11,21,'3141'))",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+
+ /**
+ * Registers query and checks the schema of alias against expectedSchStr.
+ * Aliases in expectedSchStr starting with "NullAlias" (case insensitive)
+ * stand for fields without an alias.
+ * @param expectedSchStr expected schema as a string, or null if alias
+ * should have no schema
+ * @param query pig query to register
+ * @param alias alias whose schema is checked
+ * @throws IOException
+ */
+ private void compileAndCompareSchema(String expectedSchStr, String query, String alias)
+ throws IOException {
+
+ Schema expectedSch = null;
+
+ if(expectedSchStr != null){
+ expectedSch = Utils.getSchemaFromString(expectedSchStr);
+ Util.schemaReplaceNullAlias(expectedSch);
+ }
+ compileAndCompareSchema(expectedSch, query, alias);
+
+ }
+
+ /**
+ * Registers query and asserts that the schema of alias equals expectedSch.
+ * @param expectedSch expected schema, may be null
+ * @param query pig query to register
+ * @param alias alias whose schema is checked
+ * @throws IOException
+ */
+ private void compileAndCompareSchema(Schema expectedSch, String query,
+ String alias) throws IOException {
+ Util.registerMultiLineQuery(pigServer, query);
+
+ Schema sch = pigServer.dumpSchema(alias);
+ assertEquals("Checking expected schema", expectedSch, sch);
+ }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu May 12 21:14:00 2011
@@ -76,6 +76,8 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
@@ -621,6 +623,12 @@ public class Util {
return queryParser.parseConstant(pigConstantAsString);
}
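+ /**
+ * Parse each string in tupleConstants as a Pig tuple constant and return
+ * the resulting list of tuples
+ * @param tupleConstants tuple constants as strings, e.g. "(1,'a')"
+ * @return list of parsed tuples
+ * @throws ParserException if a string cannot be parsed as a tuple constant
+ */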
public static List<Tuple> getTuplesFromConstantTupleStrings(String[] tupleConstants) throws ParserException {
List<Tuple> result = new ArrayList<Tuple>(tupleConstants.length);
for(int i = 0; i < tupleConstants.length; i++) {
@@ -629,19 +637,52 @@ public class Util {
return result;
}
+ /**
+ * Parse list of strings in to list of tuples, then replace every String
+ * field (quoted string in the constant) with a DataByteArray holding the
+ * same characters. The replacement recurses into nested tuples and bags;
+ * fields of other types (e.g. integer constants) keep their parsed type.
+ * Tests use this to build expected results for data loaded without a schema.
+ * @param tupleConstants tuple constants as strings
+ * @return list of parsed tuples with String fields converted to DataByteArray
+ * @throws ParserException if a string cannot be parsed as a tuple constant
+ * @throws ExecException if a tuple field cannot be read or set
+ */
public static List<Tuple> getTuplesFromConstantTupleStringAsByteArray(String[] tupleConstants)
throws ParserException, ExecException {
List<Tuple> tuples = getTuplesFromConstantTupleStrings(tupleConstants);
for(Tuple t : tuples){
- for(int i=0; i<t.size(); i++){
- DataByteArray dba = (t.get(i) == null) ?
- null : new DataByteArray(t.get(i).toString().getBytes());
- t.set(i, dba);
- }
+ convertStringToDataByteArray(t);
}
return tuples;
}
+ /**
+ * Convert String objects in argument t to DataByteArray objects, in place.
+ * Nested tuples, and the tuples of nested bags, are converted recursively.
+ * Null fields and fields of any other type are left unchanged.
+ * @param t tuple to convert; null is ignored
+ * @throws ExecException if a field of t cannot be read or set
+ */
+ private static void convertStringToDataByteArray(Tuple t) throws ExecException {
+ if(t == null)
+ return;
+ for(int i = 0; i < t.size(); i++){
+ Object col = t.get(i);
+ // null fields match none of the instanceof checks below and are skipped
+ if(col instanceof String){
+ t.set(i, new DataByteArray((String)col));
+ }else if(col instanceof Tuple){
+ convertStringToDataByteArray((Tuple)col);
+ }else if(col instanceof DataBag){
+ Iterator<Tuple> it = ((DataBag)col).iterator();
+ while(it.hasNext()){
+ convertStringToDataByteArray(it.next());
+ }
+ }
+ }
+ }
+
public static File createFile(String[] data) throws Exception{
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
@@ -951,4 +992,23 @@ public class Util {
return lp;
}
+
+ /**
+ * Replaces with null every alias in the given schema, including aliases in
+ * nested schemas, whose name starts with "NullAlias". The prefix check is
+ * case insensitive and independent of the default locale, so expected
+ * schemas in tests can write e.g. "tt : (NullAlias : int)" for a field
+ * that has no alias.
+ * @param sch schema to modify in place; null is ignored
+ */
+ public static void schemaReplaceNullAlias(Schema sch){
+ if(sch == null)
+ return ;
+ final String nullAliasPrefix = "nullalias";
+ for(FieldSchema fs : sch.getFields()){
+ // regionMatches(ignoreCase) compares per character without using the
+ // default locale, unlike toLowerCase() (e.g. Turkish dotless i)
+ if(fs.alias != null
+ && fs.alias.regionMatches(true, 0, nullAliasPrefix, 0, nullAliasPrefix.length())){
+ fs.alias = null;
+ }
+ schemaReplaceNullAlias(fs.schema);
+ }
+ }
+
+
}