You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/04 23:30:58 UTC
svn commit: r895805 - in /hadoop/pig/branches/load-store-redesign:
src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/
Author: pradeepkth
Date: Mon Jan 4 22:30:57 2010
New Revision: 895805
URL: http://svn.apache.org/viewvc?rev=895805&view=rev
Log:
PIG-1090: Update sources to reflect recent changes in load-store interfaces - changes to implement interactions with the LoadMetadata interface from Pig runtime code - committing files missed last time (pradeepkth)
Added:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/Expression.java Mon Jan 4 22:30:57 2010
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+
/**
 * A class to communicate filter expressions to LoadFuncs.
 * <p>
 * An expression is a tree: {@link BinaryExpression} nodes combine two
 * sub-expressions with an {@link OpType}, while {@link Column} and
 * {@link Const} are the leaves of the tree.
 */
public abstract class Expression {

    /** Operator type of an expression node. */
    public static enum OpType {

        // binary arith ops
        OP_PLUS (" + "),
        OP_MINUS(" - "),
        OP_TIMES(" * "),
        OP_DIV(" / "),
        OP_MOD(" % "),

        // binary comparison ops
        OP_EQ(" == "),
        OP_NE(" != "),
        OP_GT(" > "),
        OP_GE(" >= "),
        OP_LT(" < "),
        OP_LE(" <= "),

        // binary logical ops
        OP_AND(" and "),
        OP_OR(" or "),

        // terms (leaves of the expression tree)
        TERM_COL(" Column "),
        TERM_CONST(" Constant ");

        /** textual representation returned by {@link #toString()} */
        private final String str;

        private OpType(String rep) {
            this.str = rep;
        }

        @Override
        public String toString() {
            return this.str;
        }
    }

    /** the operator type of this node; set by each subclass constructor */
    protected OpType opType;

    /**
     * @return the opType
     */
    public OpType getOpType() {
        return opType;
    }

    /** An expression node combining two operands with a binary operator. */
    public static class BinaryExpression extends Expression {

        /**
         * left hand operand
         */
        Expression lhs;

        /**
         * right hand operand
         */
        Expression rhs;

        /**
         * @param lhs the left hand operand
         * @param rhs the right hand operand
         * @param opType the binary operator joining the two operands
         */
        public BinaryExpression(Expression lhs, Expression rhs, OpType opType) {
            this.opType = opType;
            this.lhs = lhs;
            this.rhs = rhs;
        }

        /**
         * @return the left hand operand
         */
        public Expression getLhs() {
            return lhs;
        }

        /**
         * @return the right hand operand
         */
        public Expression getRhs() {
            return rhs;
        }

        /**
         * @return the infix form {@code (lhs op rhs)}
         */
        @Override
        public String toString() {
            // string concatenation (instead of lhs.toString()) renders a
            // missing operand as "null" rather than throwing an NPE
            return "(" + lhs + opType + rhs + ")";
        }
    }

    /** A leaf naming a column. */
    public static class Column extends Expression {

        /**
         * name of column
         */
        private String name;

        /**
         * @param name name of the column
         */
        public Column(String name) {
            this.opType = OpType.TERM_COL;
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }

        /**
         * @return the name
         */
        public String getName() {
            return name;
        }

        /**
         * @param name the name to set
         */
        public void setName(String name) {
            this.name = name;
        }
    }

    /** A leaf holding a constant value. */
    public static class Const extends Expression {

        /**
         * value of the constant
         */
        Object value;

        /**
         * @return the value
         */
        public Object getValue() {
            return value;
        }

        /**
         * @param value value of the constant (may be null)
         */
        public Const(Object value) {
            this.opType = OpType.TERM_CONST;
            this.value = value;
        }

        /**
         * @return the value, single-quoted when it is a String, or "null"
         *         for a null value
         */
        @Override
        public String toString() {
            return (value instanceof String) ? "'" + value + "'"
                    : String.valueOf(value);
        }
    }

}
+
+
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/PColFilterExtractor.java Mon Jan 4 22:30:57 2010
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.Expression;
+import org.apache.pig.PigException;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This Visitor works on the filter condition of a LOFilter which immediately
+ * follows a LOLoad that interacts with a metadata system (currently OWL) to
+ * read table data. The visitor looks for conditions on partition columns in the
+ * filter condition and extracts those conditions out of the filter condition.
+ * The condition on partition cols will be used to prune partitions of the table.
+ *
+ */
+public class PColFilterExtractor extends LOVisitor {
+
+ /**
+ * partition columns associated with the table
+ * present in the load on which the filter whose
+ * inner plan is being visited is applied
+ */
+ private List<String> partitionCols;
+
+ /**
+ * will contain the partition column filter conditions
+ * accumulated during the visit - the final condition will an expression
+ * built from these sub expressions connected with AND
+ */
+ private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
+
+ /**
+ * flag used during visit to indicate if a partition key
+ * was seen
+ */
+ private boolean sawKey;
+
+ private boolean sawNonKeyCol;
+
+ private enum Side { LEFT, RIGHT, NONE };
+ private Side replaceSide = Side.NONE;
+
+ private boolean filterRemovable = false;
+
+ @Override
+ public void visit() throws VisitorException {
+ // we will visit the leaf and it will recursively walk the plan
+ try {
+ ExpressionOperator leaf = (ExpressionOperator)mPlan.getLeaves().get(0);
+ // if the leaf is a unary operator it should be a FilterFunc in
+ // which case we don't try to extract partition filter conditions
+ if(leaf instanceof BinaryExpressionOperator) {
+ visit((BinaryExpressionOperator)leaf);
+ replaceChild(leaf);
+ // if the entire expression is to be removed, then the above
+ // replaceChild will not set sawKey to false (sawKey is set to
+ // false only in replaceChild()
+ if(sawKey == true) {
+ //there are only conditions on partition columns in the filter
+ //extract it
+ pColConditions.add(getExpression(leaf));
+ filterRemovable = true;
+ }
+ }
+ } catch (FrontendException e) {
+ throw new VisitorException(e);
+ }
+
+ }
+
+ /**
+ *
+ * @param plan logical plan corresponding the filter's comparison condition
+ * @param partitionCols list of partition columns of the table which is
+ * being loaded in the LOAD statement which is input to the filter
+ */
+ public PColFilterExtractor(LogicalPlan plan,
+ List<String> partitionCols) {
+ // though we configure a DepthFirstWalker to be the walker, we will not
+ // use it - we will visit the leaf and it will recursively walk the
+ // plan
+ super(plan, new DepthFirstWalker<LogicalOperator,
+ LogicalPlan>(plan));
+ this.partitionCols = new ArrayList<String>(partitionCols);
+ }
+
+ @Override
+ protected void visit(LOProject project) throws VisitorException {
+ try {
+ String fieldName = project.getFieldSchema().alias;
+ if(partitionCols.contains(fieldName)) {
+ sawKey = true;
+ // The condition on partition column will be used to prune the
+ // scan and removed from the filter condition. Hence the condition
+ // on the partition column will not be re applied when data is read,
+ // so the following cases should throw error until that changes.
+ List<Class<?>> opsToCheckFor = new
+ ArrayList<Class<?>>();
+ opsToCheckFor.add(LORegexp.class);
+ int errCode = 1110;
+ if(checkSuccessors(project, opsToCheckFor)) {
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column ("
+ + fieldName + ") inside a regexp operator in the " +
+ "filter condition.", errCode, PigException.INPUT);
+ }
+ opsToCheckFor.set(0, LOUserFunc.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column ("
+ + fieldName + ") inside a function in the " +
+ "filter condition.", errCode, PigException.INPUT);
+ }
+ opsToCheckFor.set(0, LOCast.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column ("
+ + fieldName + ") inside a cast in the " +
+ "filter condition.", errCode, PigException.INPUT); }
+
+ opsToCheckFor.set(0, LOIsNull.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column ("
+ + fieldName + ") inside a null check operator in the " +
+ "filter condition.", errCode, PigException.INPUT); }
+ opsToCheckFor.set(0, LOBinCond.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column ("
+ + fieldName + ") inside a bincond operator in the " +
+ "filter condition.", errCode, PigException.INPUT);
+ }
+ opsToCheckFor.set(0, LOAnd.class);
+ opsToCheckFor.add(LOOr.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ errCode = 1112;
+ throw new FrontendException("Unsupported query: " +
+ "You have an partition column (" + fieldName +
+ " ) in a construction like: " +
+ "(pcond and ...) or (pcond and ...) " +
+ "where pcond is a condition on a partition column.",
+ errCode, PigException.INPUT);
+ }
+ } else {
+ sawNonKeyCol = true;
+ }
+ } catch (FrontendException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ @Override
+ protected void visit(BinaryExpressionOperator binOp)
+ throws VisitorException {
+
+ try {
+ boolean lhsSawKey = false;
+ boolean rhsSawKey = false;
+ boolean lhsSawNonKeyCol = false;
+ boolean rhsSawNonKeyCol = false;
+
+ sawKey = false;
+ sawNonKeyCol = false;
+ binOp.getLhsOperand().visit(this);
+ replaceChild(binOp.getLhsOperand());
+ lhsSawKey = sawKey;
+ lhsSawNonKeyCol = sawNonKeyCol;
+
+
+ sawKey = false;
+ sawNonKeyCol = false;
+ binOp.getRhsOperand().visit(this);
+ replaceChild(binOp.getRhsOperand());
+ rhsSawKey = sawKey;
+ rhsSawNonKeyCol = sawNonKeyCol;
+
+ // only in the case of an AND, we potentially split the AND to
+ // remove conditions on partition columns out of the AND. For this
+ // we set replaceSide accordingly so that when we reach a predecessor
+ // we can trim the appropriate side. If both sides of the AND have
+ // conditions on partition columns, we will remove the AND completely -
+ // in this case, we will not set replaceSide, but sawKey will be
+ // true so that as we go to higher predecessor ANDs we can trim later.
+ if(binOp instanceof LOAnd) {
+ if(lhsSawKey && rhsSawNonKeyCol){
+ replaceSide = Side.LEFT;
+ }else if(rhsSawKey && lhsSawNonKeyCol){
+ replaceSide = Side.RIGHT;
+ }
+ } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
+ int errCode = 1111;
+ String errMsg = "Use of partition column/condition with" +
+ " non partition column/condition in filter expression is not " +
+ "supported." ;
+ throw new FrontendException(errMsg, errCode, PigException.INPUT);
+ }
+
+ sawKey = lhsSawKey || rhsSawKey;
+ sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
+ } catch (FrontendException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+
+
+ /**
+ * @return the condition on partition columns extracted from filter
+ */
+ public Expression getPColCondition(){
+ if(pColConditions.size() == 0)
+ return null;
+ Expression cond = pColConditions.get(0);
+ for(int i=1; i<pColConditions.size(); i++){
+ //if there is more than one condition expression
+ // connect them using "AND"s
+ cond = new BinaryExpression(cond, pColConditions.get(i),
+ OpType.OP_AND);
+ }
+ return cond;
+ }
+
+ /**
+ * @return the filterRemovable
+ */
+ public boolean isFilterRemovable() {
+ return filterRemovable;
+ }
+
+ //////// helper methods /////////////////////////
+ /**
+ * check for the presence of a certain operator type in the Successors
+ * @param opToStartFrom
+ * @param opsToCheckFor operators to be checked for at each level of
+ * Successors - the ordering in the list is the order in which the ops
+ * will be checked.
+ * @return true if opsToCheckFor are found
+ * @throws FrontendException
+ */
+ private boolean checkSuccessors(LogicalOperator opToStartFrom,
+ List<Class<?>> opsToCheckFor) throws FrontendException {
+ boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+ if(!done && !opsToCheckFor.isEmpty()) {
+ // continue checking if there is more to check
+ while(!done) {
+ opToStartFrom = mPlan.getSuccessors(opToStartFrom).get(0);
+ done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+ }
+ }
+ return opsToCheckFor.isEmpty();
+ }
+
+ private boolean checkSuccessorsHelper(LogicalOperator opToStartFrom,
+ List<Class<?>> opsToCheckFor) throws FrontendException {
+ List<LogicalOperator> successors = mPlan.getSuccessors(
+ opToStartFrom);
+ if(successors == null || successors.size() == 0) {
+ return true; // further checking cannot be done
+ }
+ if(successors.size() == 1) {
+ LogicalOperator suc = successors.get(0);
+ if(suc.getClass().getCanonicalName().equals(
+ opsToCheckFor.get(0).getCanonicalName())) {
+ // trim the list of operators to check
+ opsToCheckFor.remove(0);
+ if(opsToCheckFor.isEmpty()) {
+ return true; //no further checks required
+ }
+ }
+ } else {
+ throwException();
+ }
+ return false; // more checking can be done
+ }
+
+ private void replaceChild(ExpressionOperator childExpr) throws
+ FrontendException {
+
+ if(replaceSide == Side.NONE) {
+ // the child is trimmed when the appropriate
+ // flag is set to indicate that it needs to be trimmed.
+ return;
+ }
+
+ // eg if replaceSide == Side.LEFT
+ // binexpop
+ // / \ \
+ // child (this is the childExpr argument send in)
+ // / \
+ // Lt Rt
+ //
+ // gets converted to
+ // binexpop
+ // /
+ // Rt
+
+ if(! (childExpr instanceof BinaryExpressionOperator)){
+ throwException();
+ }
+ // child's lhs operand
+ ExpressionOperator childLhs =
+ ((BinaryExpressionOperator)childExpr).getLhsOperand();
+ // child's rhs operand
+ ExpressionOperator childRhs =
+ ((BinaryExpressionOperator)childExpr).getRhsOperand();
+
+ mPlan.disconnect(childLhs, childExpr);
+ mPlan.disconnect(childRhs, childExpr);
+
+ if(replaceSide == Side.LEFT) {
+ // remove left child and replace childExpr with its right child
+ remove(childLhs);
+ mPlan.replace(childExpr, childRhs);
+ } else if(replaceSide == Side.RIGHT){
+ // remove right child and replace childExpr with its left child
+ remove(childRhs);
+ mPlan.replace(childExpr, childLhs);
+ }else {
+ throwException();
+ }
+ //reset
+ replaceSide = Side.NONE;
+ sawKey = false;
+
+ }
+
+ /**
+ * @param op
+ * @throws FrontendException
+ */
+ private void remove(ExpressionOperator op) throws FrontendException {
+ pColConditions.add(getExpression(op));
+ mPlan.trimAbove(op);
+ mPlan.remove(op);
+ }
+
+ public static Expression getExpression(ExpressionOperator op) throws
+ FrontendException {
+ if(op instanceof LOConst) {
+ return new Expression.Const(((LOConst)op).getValue());
+ } else if (op instanceof LOProject) {
+ String fieldName = ((LOProject)op).getFieldSchema().alias;
+ return new Expression.Column(fieldName);
+ } else {
+ if(!(op instanceof BinaryExpressionOperator)) {
+ throwException();
+ }
+ BinaryExpressionOperator binOp = (BinaryExpressionOperator)op;
+ if(binOp instanceof LOAdd) {
+ return getExpression(binOp, OpType.OP_PLUS);
+ } else if(binOp instanceof LOSubtract) {
+ return getExpression(binOp, OpType.OP_MINUS);
+ } else if(binOp instanceof LOMultiply) {
+ return getExpression(binOp, OpType.OP_TIMES);
+ } else if(binOp instanceof LODivide) {
+ return getExpression(binOp, OpType.OP_DIV);
+ } else if(binOp instanceof LOMod) {
+ return getExpression(binOp, OpType.OP_MOD);
+ } else if(binOp instanceof LOAnd) {
+ return getExpression(binOp, OpType.OP_AND);
+ } else if(binOp instanceof LOOr) {
+ return getExpression(binOp, OpType.OP_OR);
+ } else if(binOp instanceof LOEqual) {
+ return getExpression(binOp, OpType.OP_EQ);
+ } else if(binOp instanceof LONotEqual) {
+ return getExpression(binOp, OpType.OP_NE);
+ } else if(binOp instanceof LOGreaterThan) {
+ return getExpression(binOp, OpType.OP_GT);
+ } else if(binOp instanceof LOGreaterThanEqual) {
+ return getExpression(binOp, OpType.OP_GE);
+ } else if(binOp instanceof LOLesserThan) {
+ return getExpression(binOp, OpType.OP_LT);
+ } else if(binOp instanceof LOLesserThanEqual) {
+ return getExpression(binOp, OpType.OP_LE);
+ } else {
+ throwException();
+ }
+ }
+ return null;
+ }
+
+ private static Expression getExpression(BinaryExpressionOperator binOp, OpType
+ opType) throws FrontendException {
+ return new BinaryExpression(getExpression(binOp.getLhsOperand())
+ ,getExpression(binOp.getRhsOperand()), opType);
+ }
+
+ public static void throwException() throws FrontendException {
+ int errCode = 2209;
+ throw new FrontendException(
+ "Internal error while processing any partition filter " +
+ "conditions in the filter after the load" ,
+ errCode,
+ PigException.BUG
+ );
+ }
+
+ // unfortunately LOVisitor today has each visit() method separately defined
+ // so just implementing visit(BinaryExpressionOperator) will not result in
+ // that method being call when LOAdd (say) is encountered (sigh! - we should
+ // fix that at some point) - for now, let's define visit() on each specific
+ // BinaryExpressionOperator that we want to visit to inturn call the
+ // visit(BinaryExpressionOperator) method
+ @Override
+ public void visit(LOAdd op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOAnd op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LODivide op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOEqual op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOGreaterThan op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOGreaterThanEqual op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+
+ @Override
+ public void visit(LOLesserThan op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOLesserThanEqual op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOMod op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOMultiply op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LONotEqual op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOOr op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+ @Override
+ public void visit(LOSubtract op) throws VisitorException {
+ visit((BinaryExpressionOperator)op);
+ }
+
+ // this might get called from some visit() - in that case, delegate to
+ // the other visit()s which we have defined here
+ @Override
+ protected void visit(ExpressionOperator op) throws VisitorException {
+ if(op instanceof LOProject) {
+ visit((LOProject)op);
+ } else if (op instanceof BinaryExpressionOperator) {
+ visit((BinaryExpressionOperator)op);
+ } else if (op instanceof LOCast) {
+ visit((LOCast)op);
+ } else if (op instanceof LOBinCond) {
+ visit((LOBinCond)op);
+ } else if (op instanceof LOUserFunc) {
+ visit((LOUserFunc)op);
+ } else if (op instanceof LOIsNull) {
+ visit((LOIsNull)op);
+ }
+
+ }
+
+ // some specific operators which are of interest to catch some
+ // unsupported scenarios
+ @Override
+ protected void visit(LOCast cast) throws VisitorException {
+ visit(cast.getExpression());
+ }
+
+ @Override
+ public void visit(LONot not) throws VisitorException {
+ visit(not.getOperand());
+ }
+
+ @Override
+ protected void visit(LORegexp regexp) throws VisitorException {
+ visit((BinaryExpressionOperator)regexp);
+ }
+
+ @Override
+ protected void visit(LOBinCond binCond) throws VisitorException {
+ visit(binCond.getCond());
+ visit(binCond.getLhsOp());
+ visit(binCond.getRhsOp());
+ }
+
+ @Override
+ protected void visit(LOUserFunc udf) throws VisitorException {
+ for (ExpressionOperator op : udf.getArguments()) {
+ visit(op);
+ }
+ }
+
+ @Override
+ public void visit(LOIsNull isNull) throws VisitorException {
+ visit(isNull.getOperand());
+ }
+}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Mon Jan 4 22:30:57 2010
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.Expression.OpType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+/**
+ * When the load statement in a pig script is loading a table from a meta data
+ * system (like owl), the load can be followed by a filter which can contain
+ * conditions on partition columns. This filter can also contain conditions on
+ * non partition columns. This optimizer looks at the logical plan and checks if
+ * there is a load followed by such a filter which has conditions on partition
+ * columns. If so, it extracts the conditions on partition columns out of the
+ * filter.
+ */
+public class PartitionFilterOptimizer extends
+ LogicalTransformer {
+
+ private String[] partitionKeys;
+
+ /**
+ * a reference to the LoadMetada implementation
+ */
+ private LoadMetadata loadMetadata;
+
+ /**
+ * a reference to the LoadFunc implementation
+ */
+ private LoadFunc loadFunc;
+
+ private LOLoad loLoad;
+ private LOFilter loFilter;
+
+ /**
+ * flag to ensure we only do the optimization once for performance reasons
+ */
+ private boolean alreadyCalled = false;
+
+ /**
+ * a map between column names as reported in
+ * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
+ * and as present in {@link LOLoad#getSchema()}. The two will be different
+ * when the user has provided a schema in the load statement
+ */
+ private Map<String, String> colNameMap = new HashMap<String, String>();
+
+ /**
+ * a map between column nameas as present in {@link LOLoad#getSchema()} and
+ * as reported in
+ * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
+ * The two will be different when the user has provided a schema in the
+ * load statement.
+ */
+ private Map<String, String> reverseColNameMap = new HashMap<String, String>();
+
+
+ protected PartitionFilterOptimizer(LogicalPlan plan) {
+ super(plan);
+ }
+
+ @Override
+ public boolean check(List<LogicalOperator> nodes) throws OptimizerException
+ {
+ if(!alreadyCalled) {
+ // first call
+ alreadyCalled = true;
+ } else {
+ // already called, just return
+ return false;
+ }
+ if((nodes == null) || (nodes.size() <= 0)) {
+ int errCode = 2052;
+ String msg = "Internal error. Cannot retrieve operator from null " +
+ "or empty list.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+ if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad)) {
+ return false;
+ }
+ loLoad = (LOLoad)nodes.get(0);
+ List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
+ if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
+ return false;
+ }
+ loFilter = (LOFilter)sucs.get(0);
+
+ // we have to check more only if LoadFunc implements LoadMetada
+ loadFunc = loLoad.getLoadFunc();
+ if(!(loadFunc instanceof LoadMetadata)) {
+ return false;
+ }
+ loadMetadata = (LoadMetadata)loadFunc;
+ try {
+ partitionKeys = loadMetadata.getPartitionKeys(
+ loLoad.getInputFile().getFileName(), loLoad.getConfiguration());
+ if(partitionKeys == null || partitionKeys.length == 0) {
+ return false;
+ }
+ } catch (IOException e) {
+ int errCode = 2209;
+ throw new OptimizerException(
+ "Internal error while processing any partition filter " +
+ "conditions in the filter after the load" ,
+ errCode,
+ PigException.BUG
+ );
+ }
+
+ // we found a load-filter pattern where the load returns partition keys
+ return true;
+ }
+
+ @Override
+ public void transform(List<LogicalOperator> nodes)
+ throws OptimizerException {
+ try {
+ setupColNameMaps();
+ PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
+ loFilter.getComparisonPlan(), getMappedKeys(partitionKeys));
+ pColFilterFinder.visit();
+ Expression partitionFilter = pColFilterFinder.getPColCondition();
+ if(partitionFilter != null) {
+ // the column names in the filter may be the ones provided by
+ // the user in the schema in the load statement - we may need
+ // to replace them with partition column names as given by
+ // LoadFunc.getSchema()
+ updateMappedColNames(partitionFilter);
+ loadMetadata.setPartitionFilter(partitionFilter);
+ if(pColFilterFinder.isFilterRemovable()) {
+ // remove this filter from the plan
+ mPlan.removeAndReconnect(loFilter);
+ }
+ }
+ } catch (Exception e) {
+ int errCode = 2209;
+ throw new OptimizerException(
+ "Internal error while processing any partition filter " +
+ "conditions in the filter after the load:" ,
+ errCode,
+ PigException.BUG,
+ e
+ );
+ }
+ }
+
+
+
+ /**
+ * @param expr
+ */
+ private void updateMappedColNames(Expression expr) {
+ if(expr instanceof BinaryExpression) {
+ updateMappedColNames(((BinaryExpression) expr).getLhs());
+ updateMappedColNames(((BinaryExpression) expr).getRhs());
+ } else if (expr instanceof Column) {
+ Column col = (Column) expr;
+ col.setName(reverseColNameMap.get(col.getName()));
+ }
+ }
+
+ /**
+ * The partition keys in the argument are as reported by
+ * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}.
+ * The user may have renamed these by providing a schema with different names
+ * in the load statement - this method will replace the former names with
+ * the latter names.
+ * @param partitionKeys
+ * @return
+ */
+ private List<String> getMappedKeys(String[] partitionKeys) {
+ List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
+ for (int i = 0; i < partitionKeys.length; i++) {
+ mappedKeys.add(colNameMap.get(partitionKeys[i]));
+ }
+ return mappedKeys;
+ }
+
+
+
+ /**
+ * @throws FrontendException
+ *
+ */
+ private void setupColNameMaps() throws FrontendException {
+ Schema loadFuncSchema = loLoad.getDeterminedSchema();
+ Schema loLoadSchema = loLoad.getSchema();
+ for(int i = 0; i < loadFuncSchema.size(); i++) {
+ colNameMap.put(loadFuncSchema.getField(i).alias,
+ (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+ loadFuncSchema.getField(i).alias));
+
+ reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+ loadFuncSchema.getField(i).alias),
+ loadFuncSchema.getField(i).alias);
+ }
+ }
+
+}
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=895805&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPartitionFilterOptimization.java Mon Jan 4 22:30:57 2010
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Test;
+
+/**
+ * Unit tests for extracting partition filter conditions out of the filter
+ * condition of a filter that follows a load which talks to a metadata system
+ * (i.e. whose load function implements {@link LoadMetadata}).
+ */
+public class TestPartitionFilterOptimization extends TestCase {
+
+    LogicalPlanTester lpTester;
+
+    /**
+     * Creates a fresh plan tester and defines relation {@code a} with columns
+     * srcid, mrkt, dstid, name and age, which most tests filter on.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        lpTester = new LogicalPlanTester();
+        lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
+    }
+
+    /**
+     * test case where there is a single expression on partition columns in
+     * the filter expression along with an expression on non partition column
+     * @throws FrontendException
+     */
+    @Test
+    public void testSimpleMixed() throws FrontendException {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
+    }
+
+    /**
+     * test case where filter does not contain any condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testNoPartFilter() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
+        test(lp, Arrays.asList("srcid"), null,
+                "((age == 20) and (name == 'foo'))");
+    }
+
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter1() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"),
+                "((srcid > 20) and (mrkt == 'us'))", null);
+
+    }
+
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter2() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"),
+                "(mrkt == 'us')", null);
+
+    }
+
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
+    public void testOnlyPartFilter3() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
+        test(lp, Arrays.asList("srcid", "mrkt"),
+                "((srcid == 20) or (mrkt == 'us'))", null);
+
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed1() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "(age < 20 and mrkt == 'us') and (srcid == 10 and " +
+                    "name == 'foo');");
+        test(lp, Arrays.asList("srcid", "mrkt"),
+                "((mrkt == 'us') and (srcid == 10))",
+                "((age < 20) and (name == 'foo'))");
+    }
+
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed2() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
+                    "dstid == 15);");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
+                "(age >= 20)");
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns
+     */
+    @Test
+    public void testMixed3() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and mrkt == 'us' and srcid == 10;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns - this testcase also has a condition
+     * based on comparison of two partition columns
+     */
+    @Test
+    public void testMixed4() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "age >= 20 and mrkt == 'us' and name == 'foo' and " +
+                    "srcid == dstid;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and (srcid == dstid))",
+                "((age >= 20) and (name == 'foo'))");
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns -
+     * This testcase has two partition col conditions with OR + non partition
+     * col conditions
+     */
+    @Test
+    public void testMixed5() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+                    "dstid == 30;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
+                "(name == 'foo')");
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns -
+     * This testcase has two partition col conditions with OR + non partition
+     * col conditions, with the conditions in a different order than
+     * testMixed5
+     */
+    @Test
+    public void testMixed6() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
+                "(name == 'foo')");
+    }
+
+    /**
+     * test case where filter has both conditions on partition cols and non
+     * partition cols and the filter condition will be split to extract the
+     * conditions on partition columns. This testcase also tests arithmetic
+     * in partition column conditions
+     */
+    @Test
+    public void testMixedArith() throws Exception {
+        LogicalPlan lp =
+            lpTester.buildPlan("b = filter a by " +
+                    "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
+        test(lp, Arrays.asList("srcid", "dstid", "mrkt"),
+                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
+                "(age != 15)");
+    }
+
+    /**
+     * Conditions that mix a partition column and a non partition column in a
+     * way that cannot be split apart (in one comparison or arithmetic
+     * expression, or OR-ed together) must be rejected with error code 1111.
+     */
+    @Test
+    public void testNegPColConditionWithNonPCol() throws Exception {
+        // use of partition column condition and non partition column in
+        // same condition should fail
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "srcid > age;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+        lp = lpTester.buildPlan("b = filter a by " +
+                "srcid + age == 20;");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+
+        // OR of partition column condition and non partition col condition
+        // should fail
+        lp = lpTester.buildPlan("b = filter a by " +
+                "srcid > 10 or name == 'foo';");
+        negativeTest(lp, Arrays.asList("srcid"), 1111);
+    }
+
+    /**
+     * Partition columns used where the extractor does not support them must
+     * be rejected. Two cases are checked: a partition condition OR-ed with an
+     * AND that mixes partition and non partition columns (error 1112), and a
+     * partition column inside a UDF, regex match, cast, bincond or null check
+     * (error 1110).
+     */
+    @Test
+    public void testNegPColInWrongPlaces() throws Exception {
+
+        int expectedErrCode = 1112;
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(srcid > 10 and name == 'foo') or dstid == 10;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode);
+
+        expectedErrCode = 1110;
+        lp = lpTester.buildPlan("b = filter a by " +
+                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        lp = lpTester.buildPlan("b = filter a by " +
+                "mrkt matches '.*us.*' and age < 15;");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(int)mrkt == 10 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(mrkt is null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        lp = lpTester.buildPlan("b = filter a by " +
+                "(mrkt is not null) and name matches '.*foo.*';");
+        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+    }
+
+
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping1() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        // fail with a clear message rather than an NPE if no filter was pushed
+        assertNotNull("checking partition filter was set:",
+                TestLoader.partFilter);
+        assertEquals("checking partition filter:",
+                "((mrkt == 'us') and (srcid == 10))",
+                TestLoader.partFilter.toString());
+        assertEquals("checking trimmed filter expression:",
+                "((f5 >= 20) and (f3 == 15))", getFilterExpressionString(lp));
+    }
+
+
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case there is no condition on partition columns
+     * - so setPartitionFilter() should not be called and the filter condition
+     * should remain as is.
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping2() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "f5 >= 20 and f2 == 'us' and f3 == 15;");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertNull("checking partition filter:", TestLoader.partFilter);
+        assertEquals("checking trimmed filter expression:",
+                "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))",
+                getFilterExpressionString(lp));
+    }
+
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the filter only has conditions on partition
+     * columns
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping3() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertNotNull("checking partition filter was set:",
+                TestLoader.partFilter);
+        assertEquals("checking partition filter:",
+                "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+                "(dstid == 15)))",
+                TestLoader.partFilter.toString());
+        Iterator<LogicalOperator> it = lp.iterator();
+        assertTrue("Checking that filter has been removed since it contained" +
+                " only conditions on partition cols:",
+                (it.next() instanceof LOLoad));
+        assertFalse("Checking that filter has been removed since it contained" +
+                " only conditions on partition cols:",
+                it.hasNext());
+
+    }
+
+    /**
+     * Test that pig sends correct partition column names in setPartitionFilter
+     * when the user has a schema in the load statement which renames partition
+     * columns - in this test case the schema in load statement is a prefix
+     * (with columns renamed) of the schema returned by
+     * {@link LoadMetadata#getSchema(String, Configuration)}
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping4() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+                + TestLoader.class.getName() +
+                "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
+                "'srcid,mrkt') as (f1, f2, f3);");
+        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
+                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+        assertNotNull("checking partition filter was set:",
+                TestLoader.partFilter);
+        assertEquals("checking partition filter:",
+                "((mrkt == 'us') and (srcid == 10))",
+                TestLoader.partFilter.toString());
+        assertEquals("checking trimmed filter expression:",
+                "((age >= 20) and (f3 == 15))", getFilterExpressionString(lp));
+    }
+
+    //// helper methods ///////
+
+    /**
+     * Returns the lower-cased string form of the filter condition held by the
+     * LOFilter that is the first leaf of {@code lp}. After partition-filter
+     * extraction this is the trimmed condition that remains.
+     */
+    private static String getFilterExpressionString(LogicalPlan lp)
+            throws FrontendException {
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        return PColFilterExtractor.getExpression(
+                (ExpressionOperator) filter.getComparisonPlan().
+                getLeaves().get(0)).
+                toString().toLowerCase();
+    }
+
+    /**
+     * Runs a PColFilterExtractor over the filter at the leaf of {@code lp}
+     * and checks the extracted partition condition and the remaining filter.
+     * @param lp plan whose first leaf is the LOFilter under test
+     * @param partitionCols names of the partition columns
+     * @param expPartFilterString expected partition condition, or null if none
+     *        should be extracted (compared case-insensitively)
+     * @param expFilterString expected remaining filter condition, or null if
+     *        the whole filter should be removable (compared case-insensitively)
+     * @return the extractor after visiting the plan
+     */
+    private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols,
+            String expPartFilterString, String expFilterString)
+            throws FrontendException {
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getComparisonPlan(), partitionCols);
+        pColExtractor.visit();
+
+        if (expPartFilterString == null) {
+            assertNull("Checking partition column filter:",
+                    pColExtractor.getPColCondition());
+        } else {
+            // fail with a clear message rather than an NPE on toString()
+            assertNotNull("Checking partition column filter:",
+                    pColExtractor.getPColCondition());
+            assertEquals("Checking partition column filter:",
+                    expPartFilterString.toLowerCase(),
+                    pColExtractor.getPColCondition().toString().toLowerCase());
+        }
+
+        if (expFilterString == null) {
+            assertTrue("Check that filter can be removed:",
+                    pColExtractor.isFilterRemovable());
+        } else {
+            // the actual string is lower-cased, so normalize the expected
+            // one the same way (as is done for the partition filter above)
+            assertEquals("checking trimmed filter expression:",
+                    expFilterString.toLowerCase(),
+                    getFilterExpressionString(lp));
+        }
+        return pColExtractor;
+    }
+
+    /**
+     * Runs a PColFilterExtractor over the filter at the leaf of {@code lp}
+     * and asserts that it fails with a PigException carrying
+     * {@code expectedErrorCode}.
+     */
+    private void negativeTest(LogicalPlan lp, List<String> partitionCols,
+            int expectedErrorCode) {
+        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getComparisonPlan(), partitionCols);
+        try {
+            pColExtractor.visit();
+        } catch (Exception e) {
+            // guard against getPigException finding nothing, so an unexpected
+            // exception type fails the test with a message instead of an NPE
+            assertNotNull("Expected a PigException in the cause chain of: " + e,
+                    LogUtils.getPigException(e));
+            assertEquals("Checking if exception has right error code",
+                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
+            return;
+        }
+        fail("Exception expected!");
+    }
+
+    /**
+     * This loader is only used to test that partition column filters are
+     * given in the manner expected in terms of column names - hence it does
+     * not implement many of the methods and only implements required ones.
+     */
+    public static class TestLoader extends LoadFunc implements LoadMetadata {
+
+        Schema schema;
+        String[] partCols;
+        // last filter handed to setPartitionFilter; tests reset it to null
+        static Expression partFilter = null;
+
+        /**
+         * @param schemaString schema to report from getSchema()
+         * @param commaSepPartitionCols comma separated partition key names to
+         *        report from getPartitionKeys()
+         */
+        public TestLoader(String schemaString, String commaSepPartitionCols)
+                throws ParseException {
+            schema = Util.getSchemaFromString(schemaString);
+            partCols = commaSepPartitionCols.split(",");
+        }
+
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            return null;
+        }
+
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split)
+                throws IOException {
+        }
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+        }
+
+        @Override
+        public String[] getPartitionKeys(String location, Configuration conf)
+                throws IOException {
+            return partCols;
+        }
+
+        @Override
+        public ResourceSchema getSchema(String location, Configuration conf)
+                throws IOException {
+            return new ResourceSchema(schema);
+        }
+
+        @Override
+        public ResourceStatistics getStatistics(String location,
+                Configuration conf) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void setPartitionFilter(Expression partitionFilter)
+                throws IOException {
+            partFilter = partitionFilter;
+        }
+
+    }
+}