You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 06:01:48 UTC

svn commit: r1546285 [2/3] - in /pig/branches/tez: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/ contrib/piggybank/java/src/test/java/...

Modified: pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Nov 28 05:01:47 2013
@@ -1,600 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.newplan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.Expression;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
-
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.newplan.logical.expression.AddExpression;
-import org.apache.pig.newplan.logical.expression.AndExpression;
-import org.apache.pig.newplan.logical.expression.BinCondExpression;
-import org.apache.pig.newplan.logical.expression.BinaryExpression;
-import org.apache.pig.newplan.logical.expression.CastExpression;
-import org.apache.pig.newplan.logical.expression.ConstantExpression;
-import org.apache.pig.newplan.logical.expression.DereferenceExpression;
-import org.apache.pig.newplan.logical.expression.DivideExpression;
-import org.apache.pig.newplan.logical.expression.EqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
-import org.apache.pig.newplan.logical.expression.IsNullExpression;
-import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.LessThanExpression;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
-import org.apache.pig.newplan.logical.expression.MapLookupExpression;
-import org.apache.pig.newplan.logical.expression.ModExpression;
-import org.apache.pig.newplan.logical.expression.MultiplyExpression;
-import org.apache.pig.newplan.logical.expression.NotEqualExpression;
-import org.apache.pig.newplan.logical.expression.NotExpression;
-import org.apache.pig.newplan.logical.expression.OrExpression;
-import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.newplan.logical.expression.RegexExpression;
-import org.apache.pig.newplan.logical.expression.SubtractExpression;
-import org.apache.pig.newplan.logical.expression.UserFuncExpression;
-import org.apache.pig.newplan.DepthFirstWalker;
-
-/**
- * This Visitor works on the filter condition of a LOFilter which immediately
- * follows a LOLoad that interacts with a metadata system (currently OWL) to
- * read table data. The visitor looks for conditions on partition columns in the
- * filter condition and extracts those conditions out of the filter condition.
- * The condition on partition cols will be used to prune partitions of the table.
- *
- */
-@Deprecated
-public class PColFilterExtractor extends PlanVisitor {
-
-    private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
-
-    /**
-     * partition columns associated with the table
-     * present in the load on which the filter whose
-     * inner plan is being visited is applied
-     */
-    private List<String> partitionCols;
-
-    /**
-     * will contain the partition column filter conditions
-     * accumulated during the visit - the final condition will an expression
-     * built from these sub expressions connected with AND
-     */
-    private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
-
-    /**
-     * flag used during visit to indicate if a partition key
-     * was seen
-     */
-    private boolean sawKey;
-
-    private boolean sawNonKeyCol;
-
-    private enum Side { LEFT, RIGHT, NONE };
-    private Side replaceSide = Side.NONE;
-
-    private boolean filterRemovable = false;
-
-    private boolean canPushDown = true;
-
-    @Override
-    public void visit() throws FrontendException {
-        // we will visit the leaf and it will recursively walk the plan
-        LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
-        // if the leaf is a unary operator it should be a FilterFunc in
-        // which case we don't try to extract partition filter conditions
-        if(leaf instanceof BinaryExpression) {
-            BinaryExpression binExpr = (BinaryExpression)leaf;
-            visit( binExpr );
-            replaceChild( binExpr );
-            // if the entire expression is to be removed, then the above
-            // replaceChild will not set sawKey to false (sawKey is set to
-            // false only in replaceChild()
-            if(sawKey == true) {
-                //there are only conditions on partition columns in the filter
-                //extract it
-                pColConditions.add( getExpression( leaf ) );
-                filterRemovable = true;
-            }
-        }
-    }
-
-    /**
-     *
-     * @param plan logical plan corresponding the filter's comparison condition
-     * @param partitionCols list of partition columns of the table which is
-     * being loaded in the LOAD statement which is input to the filter
-     */
-    public PColFilterExtractor(OperatorPlan plan,
-            List<String> partitionCols) {
-        // though we configure a DepthFirstWalker to be the walker, we will not
-        // use it - we will visit the leaf and it will recursively walk the
-        // plan
-        super( plan, new DepthFirstWalker( plan ) );
-        this.partitionCols = new ArrayList<String>(partitionCols);
-    }
-
-    protected void visit(ProjectExpression project) throws FrontendException {
-        String fieldName = project.getFieldSchema().alias;
-        if(partitionCols.contains(fieldName)) {
-            sawKey = true;
-            // The condition on partition column will be used to prune the
-            // scan and removed from the filter condition. Hence the condition
-            // on the partition column will not be re applied when data is read,
-            // so the following cases should throw error until that changes.
-            List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
-                        opsToCheckFor.add(UserFuncExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a function in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, CastExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a cast in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, IsNullExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a null check operator in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-            opsToCheckFor.set(0, BinCondExpression.class);
-            if(checkSuccessors(project, opsToCheckFor)) {
-                LOG.warn("No partition filter push down: " +
-                    "You have an partition column ("
-                    + fieldName + ") inside a bincond operator in the " +
-                    "filter condition.");
-                canPushDown = false;
-                return;
-            }
-        } else {
-            sawNonKeyCol = true;
-        }
-    }
-
-    /**
-     * Detect whether a non-partition column is present in the expression.
-     * @param binOp
-     * @return true or false
-     * @throws FrontendException
-     */
-    private boolean detectNonPartitionColumn(BinaryExpression binOp) throws FrontendException {
-        LogicalExpression lhs = binOp.getLhs();
-        LogicalExpression rhs = binOp.getRhs();
-        if (lhs instanceof ProjectExpression) {
-            String fieldName = ((ProjectExpression)lhs).getFieldSchema().alias;
-            if(!partitionCols.contains(fieldName)) {
-                return true;
-            }
-        }
-        if (rhs instanceof ProjectExpression) {
-            String fieldName = ((ProjectExpression)rhs).getFieldSchema().alias;
-            if(!partitionCols.contains(fieldName)) {
-                return true;
-            }
-        }
-
-        boolean lhsSawNonKeyCol = false;
-        boolean rhsSawNonKeyCol = false;
-        if (lhs instanceof BinaryExpression) {
-            lhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)lhs);
-        }
-        if (rhs instanceof BinaryExpression) {
-            rhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)rhs);
-        }
-
-        return lhsSawNonKeyCol || rhsSawNonKeyCol;
-    }
-
-    /**
-     * Detect and/or expressions that contain both partition and non-partition
-     * conditions such as '(pcond and non-pcond) or (pcond and non-pcond)'.
-     * @param binOp
-     * @return true or false
-     * @throws FrontendException
-     */
-    private boolean detectAndOrConditionWithMixedColumns(BinaryExpression binOp) throws FrontendException {
-        LogicalExpression lhs = binOp.getLhs();
-        LogicalExpression rhs = binOp.getRhs();
-
-        if ( (binOp instanceof OrExpression) &&
-             ( (lhs instanceof AndExpression && rhs instanceof AndExpression) ||
-               (lhs instanceof OrExpression || rhs instanceof OrExpression) ) ) {
-            return detectNonPartitionColumn(binOp);
-        }
-
-        return false;
-    }
-
-    private void visit(BinaryExpression binOp) throws FrontendException {
-        boolean lhsSawKey = false;
-        boolean rhsSawKey = false;
-        boolean lhsSawNonKeyCol = false;
-        boolean rhsSawNonKeyCol = false;
-        sawKey = false;
-        sawNonKeyCol = false;
-
-        if (detectAndOrConditionWithMixedColumns(binOp)) {
-            sawNonKeyCol = true;
-            // Don't set canPushDown to false. If there are other AND
-            // conditions on a partition column we want to push that down
-            LOG.warn("No partition filter push down: You have partition and non-partition "
-                    + "columns  in a construction like: "
-                    + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
-                    + "where pcond is a condition on a partition column and "
-                    + "non-pcond is a condition on a non-partition column.");
-            return;
-        }
-
-        visit( binOp.getLhs() );
-        replaceChild(binOp.getLhs());
-        lhsSawKey = sawKey;
-        lhsSawNonKeyCol = sawNonKeyCol;
-
-        sawKey = false;
-        sawNonKeyCol = false;
-        visit( binOp.getRhs() );
-        replaceChild(binOp.getRhs());
-        rhsSawKey = sawKey;
-        rhsSawNonKeyCol = sawNonKeyCol;
-
-        // only in the case of an AND, we potentially split the AND to
-        // remove conditions on partition columns out of the AND. For this
-        // we set replaceSide accordingly so that when we reach a predecessor
-        // we can trim the appropriate side. If both sides of the AND have
-        // conditions on partition columns, we will remove the AND completely -
-        // in this case, we will not set replaceSide, but sawKey will be
-        // true so that as we go to higher predecessor ANDs we can trim later.
-        if(binOp instanceof AndExpression) {
-            if(lhsSawKey && rhsSawNonKeyCol){
-                replaceSide = Side.LEFT;
-            }else if(rhsSawKey && lhsSawNonKeyCol){
-                replaceSide = Side.RIGHT;
-            }
-        } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
-            LOG.warn("No partition filter push down: " +
-                "Use of partition column/condition with" +
-                " non partition column/condition in filter expression is not " +
-                "supported.");
-            canPushDown = false;
-        }
-
-        sawKey = lhsSawKey || rhsSawKey;
-        sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
-    }
-
-    /**
-     * @return the condition on partition columns extracted from filter
-     */
-    public  Expression getPColCondition(){
-        if(!canPushDown || pColConditions.size() == 0)
-            return null;
-        Expression cond =  pColConditions.get(0);
-        for(int i=1; i<pColConditions.size(); i++){
-            //if there is more than one condition expression
-            // connect them using "AND"s
-            cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
-                    OpType.OP_AND);
-        }
-        return cond;
-    }
-
-    /**
-     * @return the filterRemovable
-     */
-    public boolean isFilterRemovable() {
-        return canPushDown && filterRemovable;
-    }
-
-    //////// helper methods /////////////////////////
-    /**
-     * check for the presence of a certain operator type in the Successors
-     * @param opToStartFrom
-     * @param opsToCheckFor operators to be checked for at each level of
-     * Successors - the ordering in the list is the order in which the ops
-     * will be checked.
-     * @return true if opsToCheckFor are found
-     * @throws IOException
-     */
-    private boolean checkSuccessors(Operator opToStartFrom,
-            List<Class<?>> opsToCheckFor) throws FrontendException {
-        boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-        if(!done && !opsToCheckFor.isEmpty()) {
-            // continue checking if there is more to check
-            while(!done) {
-                opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
-                done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-            }
-        }
-        return opsToCheckFor.isEmpty();
-    }
-
-    private boolean checkSuccessorsHelper(Operator opToStartFrom,
-            List<Class<?>> opsToCheckFor) throws FrontendException {
-        List<Operator> successors = plan.getPredecessors(
-                opToStartFrom);
-        if(successors == null || successors.size() == 0) {
-            return true; // further checking cannot be done
-        }
-        if(successors.size() == 1) {
-            Operator suc  = successors.get(0);
-            if(suc.getClass().getCanonicalName().equals(
-                    opsToCheckFor.get(0).getCanonicalName())) {
-                // trim the list of operators to check
-                opsToCheckFor.remove(0);
-                if(opsToCheckFor.isEmpty()) {
-                    return true; //no further checks required
-                }
-            }
-        } else {
-            logInternalErrorAndSetFlag();
-        }
-        return false; // more checking can be done
-    }
-
-    private void replaceChild(LogicalExpression childExpr) throws FrontendException {
-
-        if(replaceSide == Side.NONE) {
-            // the child is trimmed when the appropriate
-            // flag is set to indicate that it needs to be trimmed.
-            return;
-        }
-
-        // eg if replaceSide == Side.LEFT
-        //    binexpop
-        //   /   \ \
-        // child (this is the childExpr argument send in)
-        //  /  \
-        // Lt   Rt
-        //
-        // gets converted to
-        //  binexpop
-        //  /
-        // Rt
-
-        if( !( childExpr instanceof BinaryExpression ) ) {
-             logInternalErrorAndSetFlag();
-             return;
-        }
-        // child's lhs operand
-        LogicalExpression leftChild =
-            ((BinaryExpression)childExpr).getLhs();
-        // child's rhs operand
-        LogicalExpression rightChild =
-            ((BinaryExpression)childExpr).getRhs();
-
-        plan.disconnect( childExpr, leftChild );
-        plan.disconnect( childExpr, rightChild );
-
-        if(replaceSide == Side.LEFT) {
-            // remove left child and replace childExpr with its right child
-            remove( leftChild );
-            replace(childExpr, rightChild);
-        } else if(replaceSide == Side.RIGHT){
-            // remove right child and replace childExpr with its left child
-            remove(rightChild);
-            replace(childExpr, leftChild);
-        } else {
-            logInternalErrorAndSetFlag();
-            return;
-        }
-        //reset
-        replaceSide = Side.NONE;
-        sawKey = false;
-
-    }
-
-    private void replace(Operator oldOp, Operator newOp) throws FrontendException {
-        List<Operator> grandParents = plan.getPredecessors( oldOp );
-        if( grandParents == null || grandParents.size() == 0 ) {
-            plan.remove( oldOp );
-            return;
-        }
-        Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
-        Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
-        plan.add( newOp );
-        plan.connect( grandParent, pair.first, newOp, pair.second );
-        plan.remove( oldOp );
-    }
-
-    /**
-     * @param op
-     * @throws IOException
-     * @throws IOException
-     * @throws IOException
-     */
-    private void remove(LogicalExpression op) throws FrontendException {
-        pColConditions.add( getExpression( op ) );
-        removeTree( op );
-    }
-
-    /**
-     * Assume that the given operator is already disconnected from its predecessors.
-     * @param op
-     * @throws FrontendException
-     */
-    private void removeTree(Operator op) throws FrontendException {
-        List<Operator> succs = plan.getSuccessors( op );
-        if( succs == null ) {
-            plan.remove( op );
-            return;
-        }
-
-        Operator[] children = new Operator[succs.size()];
-        for( int i = 0; i < succs.size(); i++ ) {
-            children[i] = succs.get(i);
-        }
-
-        for( Operator succ : children ) {
-            plan.disconnect( op, succ );
-            removeTree( succ );
-        }
-
-        plan.remove( op );
-    }
-
-    public Expression getExpression(LogicalExpression op) throws FrontendException
-     {
-        if(op instanceof ConstantExpression) {
-            ConstantExpression constExpr =(ConstantExpression)op ;
-            return new Expression.Const( constExpr.getValue() );
-        } else if (op instanceof ProjectExpression) {
-            ProjectExpression projExpr = (ProjectExpression)op;
-            String fieldName = projExpr.getFieldSchema().alias;
-            return new Expression.Column(fieldName);
-        } else {
-            if( !( op instanceof BinaryExpression ) ) {
-                logInternalErrorAndSetFlag();
-                return null;
-            }
-            BinaryExpression binOp = (BinaryExpression)op;
-            if(binOp instanceof AddExpression) {
-                return getExpression( binOp, OpType.OP_PLUS );
-            } else if(binOp instanceof SubtractExpression) {
-                return getExpression(binOp, OpType.OP_MINUS);
-            } else if(binOp instanceof MultiplyExpression) {
-                return getExpression(binOp, OpType.OP_TIMES);
-            } else if(binOp instanceof DivideExpression) {
-                return getExpression(binOp, OpType.OP_DIV);
-            } else if(binOp instanceof ModExpression) {
-                return getExpression(binOp, OpType.OP_MOD);
-            } else if(binOp instanceof AndExpression) {
-                return getExpression(binOp, OpType.OP_AND);
-            } else if(binOp instanceof OrExpression) {
-                return getExpression(binOp, OpType.OP_OR);
-            } else if(binOp instanceof EqualExpression) {
-                return getExpression(binOp, OpType.OP_EQ);
-            } else if(binOp instanceof NotEqualExpression) {
-                return getExpression(binOp, OpType.OP_NE);
-            } else if(binOp instanceof GreaterThanExpression) {
-                return getExpression(binOp, OpType.OP_GT);
-            } else if(binOp instanceof GreaterThanEqualExpression) {
-                return getExpression(binOp, OpType.OP_GE);
-            } else if(binOp instanceof LessThanExpression) {
-                return getExpression(binOp, OpType.OP_LT);
-            } else if(binOp instanceof LessThanEqualExpression) {
-                return getExpression(binOp, OpType.OP_LE);
-            } else if(binOp instanceof RegexExpression) {
-                return getExpression(binOp, OpType.OP_MATCH);
-            } else {
-                logInternalErrorAndSetFlag();
-            }
-        }
-        return null;
-    }
-
-    private Expression getExpression(BinaryExpression binOp, OpType
-            opType) throws FrontendException {
-        return new Expression.BinaryExpression(getExpression(binOp.getLhs())
-                , getExpression(binOp.getRhs()), opType);
-    }
-
-    private void logInternalErrorAndSetFlag() throws FrontendException {
-        LOG.warn("No partition filter push down: "
-                + "Internal error while processing any partition filter "
-                + "conditions in the filter after the load");
-        canPushDown = false;
-    }
-
-    // this might get called from some visit() - in that case, delegate to
-    // the other visit()s which we have defined here
-    private void visit(LogicalExpression op) throws FrontendException {
-        if(op instanceof ProjectExpression) {
-            visit((ProjectExpression)op);
-        } else if (op instanceof BinaryExpression) {
-            visit((BinaryExpression)op);
-        } else if (op instanceof CastExpression) {
-            visit((CastExpression)op);
-        } else if (op instanceof BinCondExpression) {
-            visit((BinCondExpression)op);
-        } else if (op instanceof UserFuncExpression) {
-            visit((UserFuncExpression)op);
-        } else if (op instanceof IsNullExpression) {
-            visit((IsNullExpression)op);
-        } else if( op instanceof NotExpression ) {
-            visit( (NotExpression)op );
-        } else if( op instanceof RegexExpression ) {
-            visit( (RegexExpression)op );
-        } else if (op instanceof MapLookupExpression) {
-            visit((MapLookupExpression) op);
-        } else if (op instanceof DereferenceExpression) {
-            visit((DereferenceExpression) op);
-        }
-    }
-
-    // some specific operators which are of interest to catch some
-    // unsupported scenarios
-    private void visit(CastExpression cast) throws FrontendException {
-        visit(cast.getExpression());
-    }
-
-    private void visit(NotExpression not) throws FrontendException {
-        visit(not.getExpression());
-    }
-
-    private void visit(RegexExpression regexp) throws FrontendException {
-        visit((BinaryExpression)regexp);
-    }
-
-    private void visit(BinCondExpression binCond) throws FrontendException {
-        visit(binCond.getCondition());
-        visit(binCond.getLhs());
-        visit(binCond.getRhs());
-    }
-
-    private void visit(UserFuncExpression udf) throws FrontendException {
-        for (LogicalExpression op : udf.getArguments()) {
-            visit(op);
-        }
-    }
-
-    private void visit(IsNullExpression isNull) throws FrontendException {
-        visit(isNull.getExpression());
-    }
-
-    private void visit(MapLookupExpression mapLookup) throws FrontendException {
-        visit(mapLookup.getMap());
-    }
-
-    private void visit(DereferenceExpression deref) throws FrontendException {
-        visit(deref.getReferredExpression());
-    }
-
-    public boolean canPushDown() {
-        return canPushDown;
-    }
-
-}

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Thu Nov 28 05:01:47 2013
@@ -127,15 +127,6 @@ public class LogicalPlanOptimizer extend
         // This set of rules push partition filter to LoadFunc
         s = new HashSet<Rule>();
         // Optimize partition filter
-        r = new PartitionFilterOptimizer("NewPartitionFilterOptimizer");
-        checkAndAddRule(s, r);
-        if (!s.isEmpty())
-            ls.add(s);
-
-        // Partition filter set
-        // This set of rules push partition filter to LoadFunc
-        s = new HashSet<Rule>();
-        // Optimize partition filter
         r = new PartitionFilterOptimizer("PartitionFilterOptimizer");
         checkAndAddRule(s, r);
         if (!s.isEmpty())

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Thu Nov 28 05:01:47 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.newplan.logical.optimizer;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -47,138 +46,121 @@ import org.apache.pig.newplan.logical.re
 public class LogicalPlanPrinter extends PlanVisitor {
 
     private PrintStream mStream = null;
-    private String TAB1 = "    ";
-    private String TABMore = "|   ";
-    private String LSep = "|\n|---";
-    private String USep = "|   |\n|   ";
+    private byte[] TAB1 = "    ".getBytes();
+    private byte[] TABMore = "|   ".getBytes();
+    private byte[] Bar = "|\n".getBytes();
+    private byte[] LSep = "|---".getBytes();
+    private byte[] USep = "|   |\n".getBytes();
     static public String SEPERATE = "\t";
 
+    protected ArrayList<byte[]> tabs;
+    protected boolean reverse = false;
+
     /**
      * @param ps PrintStream to output plan information to
      * @param plan Logical plan to print
      */
     public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {
+        this(plan, ps, new ArrayList<byte[]>());
+    }
+
+    private LogicalPlanPrinter(OperatorPlan plan, PrintStream ps, ArrayList<byte[]> tabs) throws FrontendException {
         super(plan, null);
         mStream = ps;
+        this.tabs = tabs;
+        if (plan instanceof LogicalPlan) {
+            reverse = false;
+        }
+        else {
+            reverse = true;
+        }
     }
 
     @Override
     public void visit() throws FrontendException {
         try {
-            if (plan instanceof LogicalPlan) {
-                mStream.write(depthFirstLP().getBytes());
-            }
-            else {
-                mStream.write(reverseDepthFirstLP().getBytes());
-            }
+            depthFirstLP();
         } catch (IOException e) {
             throw new FrontendException(e);
         }
     }
 
-    protected String depthFirstLP() throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder();
-        List<Operator> leaves = plan.getSinks();
+    protected void depthFirstLP() throws FrontendException, IOException {
+        List<Operator> leaves;
+        if(reverse) {
+            leaves = plan.getSources();
+        } else {
+            leaves = plan.getSinks();
+        }
         for (Operator leaf : leaves) {
-            sb.append(depthFirst(leaf));
-            sb.append("\n");
+            writeWithTabs((leaf.toString()+"\n").getBytes());
+            depthFirst(leaf);
         }
-        return sb.toString();
     }
-    
-    private String depthFirst(Operator node) throws FrontendException, IOException {
-        String nodeString = printNode(node);
-        
-        List<Operator> originalPredecessors =  plan.getPredecessors(node);
-        if (originalPredecessors == null)
-            return nodeString;
-        
-        StringBuffer sb = new StringBuffer(nodeString);
-        List<Operator> predecessors =  new ArrayList<Operator>(originalPredecessors);
-        
-        int i = 0;
-        for (Operator pred : predecessors) {
-            i++;
-            String DFStr = depthFirst(pred);
-            if (DFStr != null) {
-                sb.append(LSep);
-                if (i < predecessors.size())
-                    sb.append(shiftStringByTabs(DFStr, 2));
-                else
-                    sb.append(shiftStringByTabs(DFStr, 1));
-            }
+
+    private void writeWithTabs(byte[] data) throws IOException {
+        for(byte[] tab : tabs) {
+            mStream.write(tab);
         }
-        return sb.toString();
+        mStream.write(data);
     }
-    
-    protected String reverseDepthFirstLP() throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder();
-        List<Operator> roots = plan.getSources();
-        for (Operator root : roots) {
-            sb.append(reverseDepthFirst(root));
-            sb.append("\n");
-        }
-        return sb.toString();
-    }
-    
-    private String reverseDepthFirst(Operator node) throws FrontendException, IOException {
-        String nodeString = printNode(node);
-        
-        List<Operator> originalSuccessors =  plan.getSuccessors(node);
-        if (originalSuccessors == null)
-            return nodeString;
-        
-        StringBuffer sb = new StringBuffer(nodeString);
-        List<Operator> successors =  new ArrayList<Operator>(originalSuccessors);
-        
+
+    private void depthFirst(Operator node) throws FrontendException, IOException {
+        printNodePlan(node);
+        List<Operator> operators;
+
+        if(reverse) {
+            operators = plan.getSuccessors(node);
+        } else {
+            operators =  plan.getPredecessors(node);
+        }
+        if (operators == null)
+            return;
+
+        List<Operator> predecessors =  new ArrayList<Operator>(operators);
+
         int i = 0;
-        for (Operator succ : successors) {
+        for (Operator pred : predecessors) {
             i++;
-            String DFStr = reverseDepthFirst(succ);
-            if (DFStr != null) {
-                sb.append(LSep);
-                if (i < successors.size())
-                    sb.append(shiftStringByTabs(DFStr, 2));
-                else
-                    sb.append(shiftStringByTabs(DFStr, 1));
-            }
+            writeWithTabs(Bar);
+            writeWithTabs(LSep);
+            mStream.write((pred.toString()+"\n").getBytes());
+            if (i < predecessors.size()) {
+                tabs.add(TABMore);
+            } else {
+                tabs.add(TAB1);
+            }
+            depthFirst(pred);
+            tabs.remove(tabs.size() - 1);
         }
-        return sb.toString();
     }
-    
-    private String planString(OperatorPlan lp) throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(baos);
+
+    private void printPlan(OperatorPlan lp) throws VisitorException, IOException {
+        writeWithTabs(USep);
+        tabs.add(TABMore);
         if(lp!=null) {
-            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, ps);
+            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, mStream, tabs);
             printer.visit();
         }
-        else
-            return "";
-        sb.append(USep);
-        sb.append(shiftStringByTabs(baos.toString(), 2));
-        return sb.toString();
-    }
-    
-    private String printNode(Operator node) throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder(node.toString()+"\n");
-        
+        tabs.remove(tabs.size() - 1);
+    }
+
+    private void printNodePlan(Operator node) throws FrontendException, IOException {
         if(node instanceof LOFilter){
-            sb.append(planString(((LOFilter)node).getFilterPlan()));
+            printPlan(((LOFilter)node).getFilterPlan());
         }
         else if(node instanceof LOLimit){
-            sb.append(planString(((LOLimit)node).getLimitPlan()));
+            printPlan(((LOLimit)node).getLimitPlan());
         }
         else if(node instanceof LOForEach){
-            sb.append(planString(((LOForEach)node).getInnerPlan()));        
+            printPlan(((LOForEach)node).getInnerPlan());        
         }
         else if(node instanceof LOCogroup){
             MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
             for (int i : plans.keySet()) {
                 // Visit the associated plans
                 for (OperatorPlan plan : plans.get(i)) {
-                    sb.append(planString(plan));
+                    printPlan(plan);
                 }
             }
         }
@@ -187,44 +169,28 @@ public class LogicalPlanPrinter extends 
             for (int i: plans.keySet()) {
                 // Visit the associated plans
                 for (OperatorPlan plan : plans.get(i)) {
-                    sb.append(planString(plan));
+                    printPlan(plan);
                 }
             }
         }
         else if(node instanceof LORank){
             // Visit fields for rank
             for (OperatorPlan plan : ((LORank)node).getRankColPlans())
-                sb.append(planString(plan));
+                printPlan(plan);
         }
         else if(node instanceof LOSort){
             for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
-                sb.append(planString(plan));
+                printPlan(plan);
         }
         else if(node instanceof LOSplitOutput){
-            sb.append(planString(((LOSplitOutput)node).getFilterPlan()));
+            printPlan(((LOSplitOutput)node).getFilterPlan());
         }
         else if(node instanceof LOGenerate){
             for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
-                sb.append(planString(plan));
+                printPlan(plan);
             }
         }
-        return sb.toString();
-    }
-
-    private String shiftStringByTabs(String DFStr, int TabType) {
-        StringBuilder sb = new StringBuilder();
-        String[] spl = DFStr.split("\n");
-
-        String tab = (TabType == 1) ? TAB1 : TABMore;
-
-        sb.append(spl[0] + "\n");
-        for (int i = 1; i < spl.length; i++) {
-            sb.append(tab);
-            sb.append(spl[i]);
-            sb.append("\n");
-        }
-        return sb.toString();
     }
 }
 
-        
+

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Thu Nov 28 05:01:47 2013
@@ -35,7 +35,6 @@ import org.apache.pig.newplan.FilterExtr
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.PColFilterExtractor;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -96,43 +95,7 @@ public class PartitionFilterOptimizer ex
 
     @Override
     public Transformer getNewTransformer() {
-        if(name.equals("PartitionFilterOptimizer")) {
-            return new PartitionFilterPushDownTransformer();
-        } else {
-            return new NewPartitionFilterPushDownTransformer();
-        }
-    }
-
-    public class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
-        @Override
-        public void transform(OperatorPlan matched) throws FrontendException {
-            subPlan = new OperatorSubPlan( currentPlan );
-
-            setupColNameMaps();
-
-            FilterExtractor filterFinder = new FilterExtractor(
-                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
-            filterFinder.visit();
-            Expression partitionFilter = filterFinder.getPColCondition();
-
-            if(partitionFilter != null) {
-                // the column names in the filter may be the ones provided by
-                // the user in the schema in the load statement - we may need
-                // to replace them with partition column names as given by
-                // LoadFunc.getSchema()
-                updateMappedColNames(partitionFilter);
-                try {
-                    loadMetadata.setPartitionFilter(partitionFilter);
-                } catch (IOException e) {
-                    throw new FrontendException( e );
-                }
-                if(filterFinder.isFilterRemovable()) {
-                    currentPlan.removeAndReconnect( loFilter );
-                } else {
-                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
-                }
-            }
-        }
+        return new PartitionFilterPushDownTransformer();
     }
 
     public class PartitionFilterPushDownTransformer extends Transformer {
@@ -182,33 +145,26 @@ public class PartitionFilterOptimizer ex
 
         	setupColNameMaps();
         	
-        	// PIG-1871: Don't throw exception if partition filters cannot be pushed up. 
-        	// Perform transformation on a copy of the filter plan, and replace the 
-        	// original filter plan only if the transformation is successful 
-        	// (i.e. partition filter can be pushed down) 
-        	LogicalExpressionPlan filterExpr = loFilter.getFilterPlan();
-        	LogicalExpressionPlan filterExprCopy = filterExpr.deepCopy();
-        	
-        	PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
-        	        filterExprCopy, getMappedKeys( partitionKeys ) );
-        	pColFilterFinder.visit();
-        	Expression partitionFilter = pColFilterFinder.getPColCondition();
-        	
-        	if(partitionFilter != null) {
-        		// the column names in the filter may be the ones provided by
-        		// the user in the schema in the load statement - we may need
-        		// to replace them with partition column names as given by
-        		// LoadFunc.getSchema()
-        		updateMappedColNames(partitionFilter);
-        		try {
-					loadMetadata.setPartitionFilter(partitionFilter);
-				} catch (IOException e) {
-					throw new FrontendException( e );
-				}
-        		if(pColFilterFinder.isFilterRemovable()) {  
-        			currentPlan.removeAndReconnect( loFilter );
-        		} else {
-                    loFilter.setFilterPlan(filterExprCopy);
+        	FilterExtractor filterFinder = new FilterExtractor(
+                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+            filterFinder.visit();
+            Expression partitionFilter = filterFinder.getPColCondition();
+
+            if(partitionFilter != null) {
+                // the column names in the filter may be the ones provided by
+                // the user in the schema in the load statement - we may need
+                // to replace them with partition column names as given by
+                // LoadFunc.getSchema()
+                updateMappedColNames(partitionFilter);
+                try {
+                    loadMetadata.setPartitionFilter(partitionFilter);
+                } catch (IOException e) {
+                    throw new FrontendException( e );
+                }
+                if(filterFinder.isFilterRemovable()) {
+                    currentPlan.removeAndReconnect( loFilter );
+                } else {
+                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
                 }
             }
         }

Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Nov 28 05:01:47 2013
@@ -1759,15 +1759,17 @@ alias_col_ref[LogicalExpressionPlan plan
            throw new PlanGenerationFailureException( input, loc, e );
        }
 
-       Operator op = builder.lookupOperator( alias );
-       if( op != null && ( schema == null || schema.getFieldPosition( alias ) == -1 ) ) {
-           $expr = new ScalarExpression( plan, op,
-               inForeachPlan ? $foreach_clause::foreachOp : $GScope::currentOp );
-           $expr.setLocation( loc );
+       // PIG-3581
+       // check within foreach scope before looking at outer scope for scalar
+       if( inForeachPlan && ($foreach_plan::operators).containsKey(alias)) {
+           $expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
+               $foreach_plan::operators, $foreach_plan::exprPlans, alias, 0 );
        } else {
-           if( inForeachPlan ) {
-               $expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
-                    $foreach_plan::operators, $foreach_plan::exprPlans, alias, 0 );
+           Operator op = builder.lookupOperator( alias );
+           if( op != null && ( schema == null || schema.getFieldPosition( alias ) == -1 ) ) {
+               $expr = new ScalarExpression( plan, op,
+                   inForeachPlan ? $foreach_clause::foreachOp : $GScope::currentOp );
+               $expr.setLocation( loc );
            } else {
                $expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
                    $statement::inputIndex, alias, 0 );

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Nov 28 05:01:47 2013
@@ -281,9 +281,7 @@ public abstract class ScriptState {
         try {
             String line = reader.readLine();
             while (line != null) {
-                if (line.length() > 0) {
-                    sb.append(line).append("\n");
-                }
+                sb.append(line).append("\n");
                 line = reader.readLine();
             }
         } catch (IOException e) {

Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1543105-1546284

Modified: pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java Thu Nov 28 05:01:47 2013
@@ -1861,7 +1861,7 @@ public class TestBuiltin {
         r = func.exec(t4);
         assertEquals("m", r);
 
-        func = new REGEX_EXTRACT(true);
+        func = new REGEX_EXTRACT("true");
         r = func.exec(t4);
         assertEquals("match", r);
 
@@ -1903,7 +1903,7 @@ public class TestBuiltin {
         assertEquals("t", re.get(0));
         assertEquals("his is a match", re.get(1));
 
-        funce = new REGEX_EXTRACT_ALL(false);
+        funce = new REGEX_EXTRACT_ALL("false");
         re = funce.exec(te1);
         assertEquals(re.size(), 2);
         assertEquals("t", re.get(0));

Modified: pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Thu Nov 28 05:01:47 2013
@@ -798,7 +798,7 @@ public class TestNewPartitionFilterPushD
 
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
-            Rule r = new PartitionFilterOptimizer("NewPartitionFilterPushDown");
+            Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
             s = new HashSet<Rule>();
             s.add(r);
             ls.add(s);