You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/03/06 22:26:30 UTC
svn commit: r1664724 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
Author: rohini
Date: Fri Mar 6 21:26:29 2015
New Revision: 1664724
URL: http://svn.apache.org/r1664724
Log:
PIG-4451: Log partition and predicate filter pushdown information and fix optimizer looping (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 6 21:26:29 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4451: Log partition and predicate filter pushdown information and fix optimizer looping (rohini)
+
PIG-4430: Pig should support reading log4j.properties file from classpath as well (rdsr via daijy)
PIG-4407: Allow specifying a replication factor for jarcache (jira.shegalov via rohini)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Fri Mar 6 21:26:29 2015
@@ -20,16 +20,19 @@ package org.apache.pig.newplan.logical.r
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.FilterExtractor;
import org.apache.pig.newplan.Operator;
@@ -45,6 +48,8 @@ import org.apache.pig.newplan.optimizer.
import org.apache.pig.newplan.optimizer.Transformer;
public class PartitionFilterOptimizer extends Rule {
+
+ private static final Log LOG = LogFactory.getLog(PartitionFilterOptimizer.class);
private String[] partitionKeys;
/**
@@ -100,6 +105,7 @@ public class PartitionFilterOptimizer ex
public class PartitionFilterPushDownTransformer extends Transformer {
protected OperatorSubPlan subPlan;
+ private boolean planChanged;
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
@@ -136,7 +142,10 @@ public class PartitionFilterOptimizer ex
@Override
public OperatorPlan reportChanges() {
- return subPlan;
+ // Return null in case there is no partition filter extracted
+ // which means the plan hasn't changed.
+ // If not return the modified plan which has filters removed.
+ return planChanged ? subPlan : null;
}
@Override
@@ -148,6 +157,7 @@ public class PartitionFilterOptimizer ex
FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(),
getMappedKeys(partitionKeys));
filterFinder.visit();
+ LOG.info("Partition keys are " + Arrays.asList(partitionKeys));
Expression partitionFilter = filterFinder.getPushDownExpression();
if(partitionFilter != null) {
@@ -157,7 +167,9 @@ public class PartitionFilterOptimizer ex
// LoadFunc.getSchema()
updateMappedColNames(partitionFilter);
try {
+ LOG.info("Setting partition filter [" + partitionFilter + "] on loader " + loadMetadata);
loadMetadata.setPartitionFilter(partitionFilter);
+ planChanged = true;
} catch (IOException e) {
throw new FrontendException( e );
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Fri Mar 6 21:26:29 2015
@@ -24,13 +24,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPredicatePushdown;
-import org.apache.pig.Expression.BinaryExpression;
-import org.apache.pig.Expression.Column;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
@@ -46,6 +48,8 @@ import org.apache.pig.newplan.optimizer.
public class PredicatePushdownOptimizer extends Rule {
+ private static final Log LOG = LogFactory.getLog(PredicatePushdownOptimizer.class);
+
public PredicatePushdownOptimizer(String name) {
super(name, false);
}
@@ -153,6 +157,7 @@ public class PredicatePushdownOptimizer
// LoadFunc.getSchema()
updateMappedColNames(pushDownPredicate);
try {
+ LOG.info("Setting predicate pushdown filter [" + pushDownPredicate + "] on loader " + loadPredPushdown);
loadPredPushdown.setPushdownPredicate(pushDownPredicate);
} catch (IOException e) {
throw new FrontendException( e );