You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 06:01:48 UTC
svn commit: r1546285 [2/3] - in /pig/branches/tez: ./ conf/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/data/jh/
contrib/piggybank/java/src/test/java/...
Modified: pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Nov 28 05:01:47 2013
@@ -1,600 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.newplan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.Expression;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
-
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.newplan.logical.expression.AddExpression;
-import org.apache.pig.newplan.logical.expression.AndExpression;
-import org.apache.pig.newplan.logical.expression.BinCondExpression;
-import org.apache.pig.newplan.logical.expression.BinaryExpression;
-import org.apache.pig.newplan.logical.expression.CastExpression;
-import org.apache.pig.newplan.logical.expression.ConstantExpression;
-import org.apache.pig.newplan.logical.expression.DereferenceExpression;
-import org.apache.pig.newplan.logical.expression.DivideExpression;
-import org.apache.pig.newplan.logical.expression.EqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
-import org.apache.pig.newplan.logical.expression.IsNullExpression;
-import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
-import org.apache.pig.newplan.logical.expression.LessThanExpression;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
-import org.apache.pig.newplan.logical.expression.MapLookupExpression;
-import org.apache.pig.newplan.logical.expression.ModExpression;
-import org.apache.pig.newplan.logical.expression.MultiplyExpression;
-import org.apache.pig.newplan.logical.expression.NotEqualExpression;
-import org.apache.pig.newplan.logical.expression.NotExpression;
-import org.apache.pig.newplan.logical.expression.OrExpression;
-import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.newplan.logical.expression.RegexExpression;
-import org.apache.pig.newplan.logical.expression.SubtractExpression;
-import org.apache.pig.newplan.logical.expression.UserFuncExpression;
-import org.apache.pig.newplan.DepthFirstWalker;
-
-/**
- * This Visitor works on the filter condition of a LOFilter which immediately
- * follows a LOLoad that interacts with a metadata system (currently OWL) to
- * read table data. The visitor looks for conditions on partition columns in the
- * filter condition and extracts those conditions out of the filter condition.
- * The condition on partition cols will be used to prune partitions of the table.
- *
- */
-@Deprecated
-public class PColFilterExtractor extends PlanVisitor {
-
- private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
-
- /**
- * partition columns associated with the table
- * present in the load on which the filter whose
- * inner plan is being visited is applied
- */
- private List<String> partitionCols;
-
- /**
- * will contain the partition column filter conditions
- * accumulated during the visit - the final condition will an expression
- * built from these sub expressions connected with AND
- */
- private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
-
- /**
- * flag used during visit to indicate if a partition key
- * was seen
- */
- private boolean sawKey;
-
- private boolean sawNonKeyCol;
-
- private enum Side { LEFT, RIGHT, NONE };
- private Side replaceSide = Side.NONE;
-
- private boolean filterRemovable = false;
-
- private boolean canPushDown = true;
-
- @Override
- public void visit() throws FrontendException {
- // we will visit the leaf and it will recursively walk the plan
- LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
- // if the leaf is a unary operator it should be a FilterFunc in
- // which case we don't try to extract partition filter conditions
- if(leaf instanceof BinaryExpression) {
- BinaryExpression binExpr = (BinaryExpression)leaf;
- visit( binExpr );
- replaceChild( binExpr );
- // if the entire expression is to be removed, then the above
- // replaceChild will not set sawKey to false (sawKey is set to
- // false only in replaceChild()
- if(sawKey == true) {
- //there are only conditions on partition columns in the filter
- //extract it
- pColConditions.add( getExpression( leaf ) );
- filterRemovable = true;
- }
- }
- }
-
- /**
- *
- * @param plan logical plan corresponding the filter's comparison condition
- * @param partitionCols list of partition columns of the table which is
- * being loaded in the LOAD statement which is input to the filter
- */
- public PColFilterExtractor(OperatorPlan plan,
- List<String> partitionCols) {
- // though we configure a DepthFirstWalker to be the walker, we will not
- // use it - we will visit the leaf and it will recursively walk the
- // plan
- super( plan, new DepthFirstWalker( plan ) );
- this.partitionCols = new ArrayList<String>(partitionCols);
- }
-
- protected void visit(ProjectExpression project) throws FrontendException {
- String fieldName = project.getFieldSchema().alias;
- if(partitionCols.contains(fieldName)) {
- sawKey = true;
- // The condition on partition column will be used to prune the
- // scan and removed from the filter condition. Hence the condition
- // on the partition column will not be re applied when data is read,
- // so the following cases should throw error until that changes.
- List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
- opsToCheckFor.add(UserFuncExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a function in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- opsToCheckFor.set(0, CastExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a cast in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- opsToCheckFor.set(0, IsNullExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a null check operator in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- opsToCheckFor.set(0, BinCondExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a bincond operator in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- } else {
- sawNonKeyCol = true;
- }
- }
-
- /**
- * Detect whether a non-partition column is present in the expression.
- * @param binOp
- * @return true or false
- * @throws FrontendException
- */
- private boolean detectNonPartitionColumn(BinaryExpression binOp) throws FrontendException {
- LogicalExpression lhs = binOp.getLhs();
- LogicalExpression rhs = binOp.getRhs();
- if (lhs instanceof ProjectExpression) {
- String fieldName = ((ProjectExpression)lhs).getFieldSchema().alias;
- if(!partitionCols.contains(fieldName)) {
- return true;
- }
- }
- if (rhs instanceof ProjectExpression) {
- String fieldName = ((ProjectExpression)rhs).getFieldSchema().alias;
- if(!partitionCols.contains(fieldName)) {
- return true;
- }
- }
-
- boolean lhsSawNonKeyCol = false;
- boolean rhsSawNonKeyCol = false;
- if (lhs instanceof BinaryExpression) {
- lhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)lhs);
- }
- if (rhs instanceof BinaryExpression) {
- rhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)rhs);
- }
-
- return lhsSawNonKeyCol || rhsSawNonKeyCol;
- }
-
- /**
- * Detect and/or expressions that contain both partition and non-partition
- * conditions such as '(pcond and non-pcond) or (pcond and non-pcond)'.
- * @param binOp
- * @return true or false
- * @throws FrontendException
- */
- private boolean detectAndOrConditionWithMixedColumns(BinaryExpression binOp) throws FrontendException {
- LogicalExpression lhs = binOp.getLhs();
- LogicalExpression rhs = binOp.getRhs();
-
- if ( (binOp instanceof OrExpression) &&
- ( (lhs instanceof AndExpression && rhs instanceof AndExpression) ||
- (lhs instanceof OrExpression || rhs instanceof OrExpression) ) ) {
- return detectNonPartitionColumn(binOp);
- }
-
- return false;
- }
-
- private void visit(BinaryExpression binOp) throws FrontendException {
- boolean lhsSawKey = false;
- boolean rhsSawKey = false;
- boolean lhsSawNonKeyCol = false;
- boolean rhsSawNonKeyCol = false;
- sawKey = false;
- sawNonKeyCol = false;
-
- if (detectAndOrConditionWithMixedColumns(binOp)) {
- sawNonKeyCol = true;
- // Don't set canPushDown to false. If there are other AND
- // conditions on a partition column we want to push that down
- LOG.warn("No partition filter push down: You have partition and non-partition "
- + "columns in a construction like: "
- + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
- + "where pcond is a condition on a partition column and "
- + "non-pcond is a condition on a non-partition column.");
- return;
- }
-
- visit( binOp.getLhs() );
- replaceChild(binOp.getLhs());
- lhsSawKey = sawKey;
- lhsSawNonKeyCol = sawNonKeyCol;
-
- sawKey = false;
- sawNonKeyCol = false;
- visit( binOp.getRhs() );
- replaceChild(binOp.getRhs());
- rhsSawKey = sawKey;
- rhsSawNonKeyCol = sawNonKeyCol;
-
- // only in the case of an AND, we potentially split the AND to
- // remove conditions on partition columns out of the AND. For this
- // we set replaceSide accordingly so that when we reach a predecessor
- // we can trim the appropriate side. If both sides of the AND have
- // conditions on partition columns, we will remove the AND completely -
- // in this case, we will not set replaceSide, but sawKey will be
- // true so that as we go to higher predecessor ANDs we can trim later.
- if(binOp instanceof AndExpression) {
- if(lhsSawKey && rhsSawNonKeyCol){
- replaceSide = Side.LEFT;
- }else if(rhsSawKey && lhsSawNonKeyCol){
- replaceSide = Side.RIGHT;
- }
- } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
- LOG.warn("No partition filter push down: " +
- "Use of partition column/condition with" +
- " non partition column/condition in filter expression is not " +
- "supported.");
- canPushDown = false;
- }
-
- sawKey = lhsSawKey || rhsSawKey;
- sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
- }
-
- /**
- * @return the condition on partition columns extracted from filter
- */
- public Expression getPColCondition(){
- if(!canPushDown || pColConditions.size() == 0)
- return null;
- Expression cond = pColConditions.get(0);
- for(int i=1; i<pColConditions.size(); i++){
- //if there is more than one condition expression
- // connect them using "AND"s
- cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
- OpType.OP_AND);
- }
- return cond;
- }
-
- /**
- * @return the filterRemovable
- */
- public boolean isFilterRemovable() {
- return canPushDown && filterRemovable;
- }
-
- //////// helper methods /////////////////////////
- /**
- * check for the presence of a certain operator type in the Successors
- * @param opToStartFrom
- * @param opsToCheckFor operators to be checked for at each level of
- * Successors - the ordering in the list is the order in which the ops
- * will be checked.
- * @return true if opsToCheckFor are found
- * @throws IOException
- */
- private boolean checkSuccessors(Operator opToStartFrom,
- List<Class<?>> opsToCheckFor) throws FrontendException {
- boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
- if(!done && !opsToCheckFor.isEmpty()) {
- // continue checking if there is more to check
- while(!done) {
- opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
- done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
- }
- }
- return opsToCheckFor.isEmpty();
- }
-
- private boolean checkSuccessorsHelper(Operator opToStartFrom,
- List<Class<?>> opsToCheckFor) throws FrontendException {
- List<Operator> successors = plan.getPredecessors(
- opToStartFrom);
- if(successors == null || successors.size() == 0) {
- return true; // further checking cannot be done
- }
- if(successors.size() == 1) {
- Operator suc = successors.get(0);
- if(suc.getClass().getCanonicalName().equals(
- opsToCheckFor.get(0).getCanonicalName())) {
- // trim the list of operators to check
- opsToCheckFor.remove(0);
- if(opsToCheckFor.isEmpty()) {
- return true; //no further checks required
- }
- }
- } else {
- logInternalErrorAndSetFlag();
- }
- return false; // more checking can be done
- }
-
- private void replaceChild(LogicalExpression childExpr) throws FrontendException {
-
- if(replaceSide == Side.NONE) {
- // the child is trimmed when the appropriate
- // flag is set to indicate that it needs to be trimmed.
- return;
- }
-
- // eg if replaceSide == Side.LEFT
- // binexpop
- // / \ \
- // child (this is the childExpr argument send in)
- // / \
- // Lt Rt
- //
- // gets converted to
- // binexpop
- // /
- // Rt
-
- if( !( childExpr instanceof BinaryExpression ) ) {
- logInternalErrorAndSetFlag();
- return;
- }
- // child's lhs operand
- LogicalExpression leftChild =
- ((BinaryExpression)childExpr).getLhs();
- // child's rhs operand
- LogicalExpression rightChild =
- ((BinaryExpression)childExpr).getRhs();
-
- plan.disconnect( childExpr, leftChild );
- plan.disconnect( childExpr, rightChild );
-
- if(replaceSide == Side.LEFT) {
- // remove left child and replace childExpr with its right child
- remove( leftChild );
- replace(childExpr, rightChild);
- } else if(replaceSide == Side.RIGHT){
- // remove right child and replace childExpr with its left child
- remove(rightChild);
- replace(childExpr, leftChild);
- } else {
- logInternalErrorAndSetFlag();
- return;
- }
- //reset
- replaceSide = Side.NONE;
- sawKey = false;
-
- }
-
- private void replace(Operator oldOp, Operator newOp) throws FrontendException {
- List<Operator> grandParents = plan.getPredecessors( oldOp );
- if( grandParents == null || grandParents.size() == 0 ) {
- plan.remove( oldOp );
- return;
- }
- Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
- Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
- plan.add( newOp );
- plan.connect( grandParent, pair.first, newOp, pair.second );
- plan.remove( oldOp );
- }
-
- /**
- * @param op
- * @throws IOException
- * @throws IOException
- * @throws IOException
- */
- private void remove(LogicalExpression op) throws FrontendException {
- pColConditions.add( getExpression( op ) );
- removeTree( op );
- }
-
- /**
- * Assume that the given operator is already disconnected from its predecessors.
- * @param op
- * @throws FrontendException
- */
- private void removeTree(Operator op) throws FrontendException {
- List<Operator> succs = plan.getSuccessors( op );
- if( succs == null ) {
- plan.remove( op );
- return;
- }
-
- Operator[] children = new Operator[succs.size()];
- for( int i = 0; i < succs.size(); i++ ) {
- children[i] = succs.get(i);
- }
-
- for( Operator succ : children ) {
- plan.disconnect( op, succ );
- removeTree( succ );
- }
-
- plan.remove( op );
- }
-
- public Expression getExpression(LogicalExpression op) throws FrontendException
- {
- if(op instanceof ConstantExpression) {
- ConstantExpression constExpr =(ConstantExpression)op ;
- return new Expression.Const( constExpr.getValue() );
- } else if (op instanceof ProjectExpression) {
- ProjectExpression projExpr = (ProjectExpression)op;
- String fieldName = projExpr.getFieldSchema().alias;
- return new Expression.Column(fieldName);
- } else {
- if( !( op instanceof BinaryExpression ) ) {
- logInternalErrorAndSetFlag();
- return null;
- }
- BinaryExpression binOp = (BinaryExpression)op;
- if(binOp instanceof AddExpression) {
- return getExpression( binOp, OpType.OP_PLUS );
- } else if(binOp instanceof SubtractExpression) {
- return getExpression(binOp, OpType.OP_MINUS);
- } else if(binOp instanceof MultiplyExpression) {
- return getExpression(binOp, OpType.OP_TIMES);
- } else if(binOp instanceof DivideExpression) {
- return getExpression(binOp, OpType.OP_DIV);
- } else if(binOp instanceof ModExpression) {
- return getExpression(binOp, OpType.OP_MOD);
- } else if(binOp instanceof AndExpression) {
- return getExpression(binOp, OpType.OP_AND);
- } else if(binOp instanceof OrExpression) {
- return getExpression(binOp, OpType.OP_OR);
- } else if(binOp instanceof EqualExpression) {
- return getExpression(binOp, OpType.OP_EQ);
- } else if(binOp instanceof NotEqualExpression) {
- return getExpression(binOp, OpType.OP_NE);
- } else if(binOp instanceof GreaterThanExpression) {
- return getExpression(binOp, OpType.OP_GT);
- } else if(binOp instanceof GreaterThanEqualExpression) {
- return getExpression(binOp, OpType.OP_GE);
- } else if(binOp instanceof LessThanExpression) {
- return getExpression(binOp, OpType.OP_LT);
- } else if(binOp instanceof LessThanEqualExpression) {
- return getExpression(binOp, OpType.OP_LE);
- } else if(binOp instanceof RegexExpression) {
- return getExpression(binOp, OpType.OP_MATCH);
- } else {
- logInternalErrorAndSetFlag();
- }
- }
- return null;
- }
-
- private Expression getExpression(BinaryExpression binOp, OpType
- opType) throws FrontendException {
- return new Expression.BinaryExpression(getExpression(binOp.getLhs())
- , getExpression(binOp.getRhs()), opType);
- }
-
- private void logInternalErrorAndSetFlag() throws FrontendException {
- LOG.warn("No partition filter push down: "
- + "Internal error while processing any partition filter "
- + "conditions in the filter after the load");
- canPushDown = false;
- }
-
- // this might get called from some visit() - in that case, delegate to
- // the other visit()s which we have defined here
- private void visit(LogicalExpression op) throws FrontendException {
- if(op instanceof ProjectExpression) {
- visit((ProjectExpression)op);
- } else if (op instanceof BinaryExpression) {
- visit((BinaryExpression)op);
- } else if (op instanceof CastExpression) {
- visit((CastExpression)op);
- } else if (op instanceof BinCondExpression) {
- visit((BinCondExpression)op);
- } else if (op instanceof UserFuncExpression) {
- visit((UserFuncExpression)op);
- } else if (op instanceof IsNullExpression) {
- visit((IsNullExpression)op);
- } else if( op instanceof NotExpression ) {
- visit( (NotExpression)op );
- } else if( op instanceof RegexExpression ) {
- visit( (RegexExpression)op );
- } else if (op instanceof MapLookupExpression) {
- visit((MapLookupExpression) op);
- } else if (op instanceof DereferenceExpression) {
- visit((DereferenceExpression) op);
- }
- }
-
- // some specific operators which are of interest to catch some
- // unsupported scenarios
- private void visit(CastExpression cast) throws FrontendException {
- visit(cast.getExpression());
- }
-
- private void visit(NotExpression not) throws FrontendException {
- visit(not.getExpression());
- }
-
- private void visit(RegexExpression regexp) throws FrontendException {
- visit((BinaryExpression)regexp);
- }
-
- private void visit(BinCondExpression binCond) throws FrontendException {
- visit(binCond.getCondition());
- visit(binCond.getLhs());
- visit(binCond.getRhs());
- }
-
- private void visit(UserFuncExpression udf) throws FrontendException {
- for (LogicalExpression op : udf.getArguments()) {
- visit(op);
- }
- }
-
- private void visit(IsNullExpression isNull) throws FrontendException {
- visit(isNull.getExpression());
- }
-
- private void visit(MapLookupExpression mapLookup) throws FrontendException {
- visit(mapLookup.getMap());
- }
-
- private void visit(DereferenceExpression deref) throws FrontendException {
- visit(deref.getReferredExpression());
- }
-
- public boolean canPushDown() {
- return canPushDown;
- }
-
-}
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Thu Nov 28 05:01:47 2013
@@ -127,15 +127,6 @@ public class LogicalPlanOptimizer extend
// This set of rules push partition filter to LoadFunc
s = new HashSet<Rule>();
// Optimize partition filter
- r = new PartitionFilterOptimizer("NewPartitionFilterOptimizer");
- checkAndAddRule(s, r);
- if (!s.isEmpty())
- ls.add(s);
-
- // Partition filter set
- // This set of rules push partition filter to LoadFunc
- s = new HashSet<Rule>();
- // Optimize partition filter
r = new PartitionFilterOptimizer("PartitionFilterOptimizer");
checkAndAddRule(s, r);
if (!s.isEmpty())
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Thu Nov 28 05:01:47 2013
@@ -17,7 +17,6 @@
*/
package org.apache.pig.newplan.logical.optimizer;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -47,138 +46,121 @@ import org.apache.pig.newplan.logical.re
public class LogicalPlanPrinter extends PlanVisitor {
private PrintStream mStream = null;
- private String TAB1 = " ";
- private String TABMore = "| ";
- private String LSep = "|\n|---";
- private String USep = "| |\n| ";
+ private byte[] TAB1 = " ".getBytes();
+ private byte[] TABMore = "| ".getBytes();
+ private byte[] Bar = "|\n".getBytes();
+ private byte[] LSep = "|---".getBytes();
+ private byte[] USep = "| |\n".getBytes();
static public String SEPERATE = "\t";
+ protected ArrayList<byte[]> tabs;
+ protected boolean reverse = false;
+
/**
* @param ps PrintStream to output plan information to
* @param plan Logical plan to print
*/
public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {
+ this(plan, ps, new ArrayList<byte[]>());
+ }
+
+ private LogicalPlanPrinter(OperatorPlan plan, PrintStream ps, ArrayList<byte[]> tabs) throws FrontendException {
super(plan, null);
mStream = ps;
+ this.tabs = tabs;
+ if (plan instanceof LogicalPlan) {
+ reverse = false;
+ }
+ else {
+ reverse = true;
+ }
}
@Override
public void visit() throws FrontendException {
try {
- if (plan instanceof LogicalPlan) {
- mStream.write(depthFirstLP().getBytes());
- }
- else {
- mStream.write(reverseDepthFirstLP().getBytes());
- }
+ depthFirstLP();
} catch (IOException e) {
throw new FrontendException(e);
}
}
- protected String depthFirstLP() throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder();
- List<Operator> leaves = plan.getSinks();
+ protected void depthFirstLP() throws FrontendException, IOException {
+ List<Operator> leaves;
+ if(reverse) {
+ leaves = plan.getSources();
+ } else {
+ leaves = plan.getSinks();
+ }
for (Operator leaf : leaves) {
- sb.append(depthFirst(leaf));
- sb.append("\n");
+ writeWithTabs((leaf.toString()+"\n").getBytes());
+ depthFirst(leaf);
}
- return sb.toString();
}
-
- private String depthFirst(Operator node) throws FrontendException, IOException {
- String nodeString = printNode(node);
-
- List<Operator> originalPredecessors = plan.getPredecessors(node);
- if (originalPredecessors == null)
- return nodeString;
-
- StringBuffer sb = new StringBuffer(nodeString);
- List<Operator> predecessors = new ArrayList<Operator>(originalPredecessors);
-
- int i = 0;
- for (Operator pred : predecessors) {
- i++;
- String DFStr = depthFirst(pred);
- if (DFStr != null) {
- sb.append(LSep);
- if (i < predecessors.size())
- sb.append(shiftStringByTabs(DFStr, 2));
- else
- sb.append(shiftStringByTabs(DFStr, 1));
- }
+
+ private void writeWithTabs(byte[] data) throws IOException {
+ for(byte[] tab : tabs) {
+ mStream.write(tab);
}
- return sb.toString();
+ mStream.write(data);
}
-
- protected String reverseDepthFirstLP() throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder();
- List<Operator> roots = plan.getSources();
- for (Operator root : roots) {
- sb.append(reverseDepthFirst(root));
- sb.append("\n");
- }
- return sb.toString();
- }
-
- private String reverseDepthFirst(Operator node) throws FrontendException, IOException {
- String nodeString = printNode(node);
-
- List<Operator> originalSuccessors = plan.getSuccessors(node);
- if (originalSuccessors == null)
- return nodeString;
-
- StringBuffer sb = new StringBuffer(nodeString);
- List<Operator> successors = new ArrayList<Operator>(originalSuccessors);
-
+
+ private void depthFirst(Operator node) throws FrontendException, IOException {
+ printNodePlan(node);
+ List<Operator> operators;
+
+ if(reverse) {
+ operators = plan.getSuccessors(node);
+ } else {
+ operators = plan.getPredecessors(node);
+ }
+ if (operators == null)
+ return;
+
+ List<Operator> predecessors = new ArrayList<Operator>(operators);
+
int i = 0;
- for (Operator succ : successors) {
+ for (Operator pred : predecessors) {
i++;
- String DFStr = reverseDepthFirst(succ);
- if (DFStr != null) {
- sb.append(LSep);
- if (i < successors.size())
- sb.append(shiftStringByTabs(DFStr, 2));
- else
- sb.append(shiftStringByTabs(DFStr, 1));
- }
+ writeWithTabs(Bar);
+ writeWithTabs(LSep);
+ mStream.write((pred.toString()+"\n").getBytes());
+ if (i < predecessors.size()) {
+ tabs.add(TABMore);
+ } else {
+ tabs.add(TAB1);
+ }
+ depthFirst(pred);
+ tabs.remove(tabs.size() - 1);
}
- return sb.toString();
}
-
- private String planString(OperatorPlan lp) throws VisitorException, IOException {
- StringBuilder sb = new StringBuilder();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
+
+ private void printPlan(OperatorPlan lp) throws VisitorException, IOException {
+ writeWithTabs(USep);
+ tabs.add(TABMore);
if(lp!=null) {
- LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, ps);
+ LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, mStream, tabs);
printer.visit();
}
- else
- return "";
- sb.append(USep);
- sb.append(shiftStringByTabs(baos.toString(), 2));
- return sb.toString();
- }
-
- private String printNode(Operator node) throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder(node.toString()+"\n");
-
+ tabs.remove(tabs.size() - 1);
+ }
+
+ private void printNodePlan(Operator node) throws FrontendException, IOException {
if(node instanceof LOFilter){
- sb.append(planString(((LOFilter)node).getFilterPlan()));
+ printPlan(((LOFilter)node).getFilterPlan());
}
else if(node instanceof LOLimit){
- sb.append(planString(((LOLimit)node).getLimitPlan()));
+ printPlan(((LOLimit)node).getLimitPlan());
}
else if(node instanceof LOForEach){
- sb.append(planString(((LOForEach)node).getInnerPlan()));
+ printPlan(((LOForEach)node).getInnerPlan());
}
else if(node instanceof LOCogroup){
MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
for (int i : plans.keySet()) {
// Visit the associated plans
for (OperatorPlan plan : plans.get(i)) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
}
@@ -187,44 +169,28 @@ public class LogicalPlanPrinter extends
for (int i: plans.keySet()) {
// Visit the associated plans
for (OperatorPlan plan : plans.get(i)) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
}
else if(node instanceof LORank){
// Visit fields for rank
for (OperatorPlan plan : ((LORank)node).getRankColPlans())
- sb.append(planString(plan));
+ printPlan(plan);
}
else if(node instanceof LOSort){
for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
- sb.append(planString(plan));
+ printPlan(plan);
}
else if(node instanceof LOSplitOutput){
- sb.append(planString(((LOSplitOutput)node).getFilterPlan()));
+ printPlan(((LOSplitOutput)node).getFilterPlan());
}
else if(node instanceof LOGenerate){
for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
- return sb.toString();
- }
-
- private String shiftStringByTabs(String DFStr, int TabType) {
- StringBuilder sb = new StringBuilder();
- String[] spl = DFStr.split("\n");
-
- String tab = (TabType == 1) ? TAB1 : TABMore;
-
- sb.append(spl[0] + "\n");
- for (int i = 1; i < spl.length; i++) {
- sb.append(tab);
- sb.append(spl[i]);
- sb.append("\n");
- }
- return sb.toString();
}
}
-
+
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Thu Nov 28 05:01:47 2013
@@ -35,7 +35,6 @@ import org.apache.pig.newplan.FilterExtr
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.PColFilterExtractor;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -96,43 +95,7 @@ public class PartitionFilterOptimizer ex
@Override
public Transformer getNewTransformer() {
- if(name.equals("PartitionFilterOptimizer")) {
- return new PartitionFilterPushDownTransformer();
- } else {
- return new NewPartitionFilterPushDownTransformer();
- }
- }
-
- public class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
- @Override
- public void transform(OperatorPlan matched) throws FrontendException {
- subPlan = new OperatorSubPlan( currentPlan );
-
- setupColNameMaps();
-
- FilterExtractor filterFinder = new FilterExtractor(
- loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
- filterFinder.visit();
- Expression partitionFilter = filterFinder.getPColCondition();
-
- if(partitionFilter != null) {
- // the column names in the filter may be the ones provided by
- // the user in the schema in the load statement - we may need
- // to replace them with partition column names as given by
- // LoadFunc.getSchema()
- updateMappedColNames(partitionFilter);
- try {
- loadMetadata.setPartitionFilter(partitionFilter);
- } catch (IOException e) {
- throw new FrontendException( e );
- }
- if(filterFinder.isFilterRemovable()) {
- currentPlan.removeAndReconnect( loFilter );
- } else {
- loFilter.setFilterPlan(filterFinder.getFilteredPlan());
- }
- }
- }
+ return new PartitionFilterPushDownTransformer();
}
public class PartitionFilterPushDownTransformer extends Transformer {
@@ -182,33 +145,26 @@ public class PartitionFilterOptimizer ex
setupColNameMaps();
- // PIG-1871: Don't throw exception if partition filters cannot be pushed up.
- // Perform transformation on a copy of the filter plan, and replace the
- // original filter plan only if the transformation is successful
- // (i.e. partition filter can be pushed down)
- LogicalExpressionPlan filterExpr = loFilter.getFilterPlan();
- LogicalExpressionPlan filterExprCopy = filterExpr.deepCopy();
-
- PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
- filterExprCopy, getMappedKeys( partitionKeys ) );
- pColFilterFinder.visit();
- Expression partitionFilter = pColFilterFinder.getPColCondition();
-
- if(partitionFilter != null) {
- // the column names in the filter may be the ones provided by
- // the user in the schema in the load statement - we may need
- // to replace them with partition column names as given by
- // LoadFunc.getSchema()
- updateMappedColNames(partitionFilter);
- try {
- loadMetadata.setPartitionFilter(partitionFilter);
- } catch (IOException e) {
- throw new FrontendException( e );
- }
- if(pColFilterFinder.isFilterRemovable()) {
- currentPlan.removeAndReconnect( loFilter );
- } else {
- loFilter.setFilterPlan(filterExprCopy);
+ FilterExtractor filterFinder = new FilterExtractor(
+ loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+ filterFinder.visit();
+ Expression partitionFilter = filterFinder.getPColCondition();
+
+ if(partitionFilter != null) {
+ // the column names in the filter may be the ones provided by
+ // the user in the schema in the load statement - we may need
+ // to replace them with partition column names as given by
+ // LoadFunc.getSchema()
+ updateMappedColNames(partitionFilter);
+ try {
+ loadMetadata.setPartitionFilter(partitionFilter);
+ } catch (IOException e) {
+ throw new FrontendException( e );
+ }
+ if(filterFinder.isFilterRemovable()) {
+ currentPlan.removeAndReconnect( loFilter );
+ } else {
+ loFilter.setFilterPlan(filterFinder.getFilteredPlan());
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Nov 28 05:01:47 2013
@@ -1759,15 +1759,17 @@ alias_col_ref[LogicalExpressionPlan plan
throw new PlanGenerationFailureException( input, loc, e );
}
- Operator op = builder.lookupOperator( alias );
- if( op != null && ( schema == null || schema.getFieldPosition( alias ) == -1 ) ) {
- $expr = new ScalarExpression( plan, op,
- inForeachPlan ? $foreach_clause::foreachOp : $GScope::currentOp );
- $expr.setLocation( loc );
+ // PIG-3581
+ // check within foreach scope before looking at outer scope for scalar
+ if( inForeachPlan && ($foreach_plan::operators).containsKey(alias)) {
+ $expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
+ $foreach_plan::operators, $foreach_plan::exprPlans, alias, 0 );
} else {
- if( inForeachPlan ) {
- $expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
- $foreach_plan::operators, $foreach_plan::exprPlans, alias, 0 );
+ Operator op = builder.lookupOperator( alias );
+ if( op != null && ( schema == null || schema.getFieldPosition( alias ) == -1 ) ) {
+ $expr = new ScalarExpression( plan, op,
+ inForeachPlan ? $foreach_clause::foreachOp : $GScope::currentOp );
+ $expr.setLocation( loc );
} else {
$expr = builder.buildProjectExpr( loc, $plan, $GScope::currentOp,
$statement::inputIndex, alias, 0 );
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Nov 28 05:01:47 2013
@@ -281,9 +281,7 @@ public abstract class ScriptState {
try {
String line = reader.readLine();
while (line != null) {
- if (line.length() > 0) {
- sb.append(line).append("\n");
- }
+ sb.append(line).append("\n");
line = reader.readLine();
}
} catch (IOException e) {
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1543105-1546284
Modified: pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java Thu Nov 28 05:01:47 2013
@@ -1861,7 +1861,7 @@ public class TestBuiltin {
r = func.exec(t4);
assertEquals("m", r);
- func = new REGEX_EXTRACT(true);
+ func = new REGEX_EXTRACT("true");
r = func.exec(t4);
assertEquals("match", r);
@@ -1903,7 +1903,7 @@ public class TestBuiltin {
assertEquals("t", re.get(0));
assertEquals("his is a match", re.get(1));
- funce = new REGEX_EXTRACT_ALL(false);
+ funce = new REGEX_EXTRACT_ALL("false");
re = funce.exec(te1);
assertEquals(re.size(), 2);
assertEquals("t", re.get(0));
Modified: pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1546285&r1=1546284&r2=1546285&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Thu Nov 28 05:01:47 2013
@@ -798,7 +798,7 @@ public class TestNewPartitionFilterPushD
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
- Rule r = new PartitionFilterOptimizer("NewPartitionFilterPushDown");
+ Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
s = new HashSet<Rule>();
s.add(r);
ls.add(s);