You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/03/06 22:26:30 UTC

svn commit: r1664724 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java

Author: rohini
Date: Fri Mar  6 21:26:29 2015
New Revision: 1664724

URL: http://svn.apache.org/r1664724
Log:
PIG-4451: Log partition and predicate filter pushdown information and fix optimizer looping (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar  6 21:26:29 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4451: Log partition and predicate filter pushdown information and fix optimizer looping (rohini)
+
 PIG-4430: Pig should support reading log4j.properties file from classpath as well (rdsr via daijy)
 
 PIG-4407: Allow specifying a replication factor for jarcache (jira.shegalov via rohini)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Fri Mar  6 21:26:29 2015
@@ -20,16 +20,19 @@ package org.apache.pig.newplan.logical.r
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
 import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.Expression.Column;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.FilterExtractor;
 import org.apache.pig.newplan.Operator;
@@ -45,6 +48,8 @@ import org.apache.pig.newplan.optimizer.
 import org.apache.pig.newplan.optimizer.Transformer;
 
 public class PartitionFilterOptimizer extends Rule {
+
+    private static final Log LOG = LogFactory.getLog(PartitionFilterOptimizer.class);
     private String[] partitionKeys;
 
     /**
@@ -100,6 +105,7 @@ public class PartitionFilterOptimizer ex
 
     public class PartitionFilterPushDownTransformer extends Transformer {
         protected OperatorSubPlan subPlan;
+        private boolean planChanged;
 
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
@@ -136,7 +142,10 @@ public class PartitionFilterOptimizer ex
 
         @Override
         public OperatorPlan reportChanges() {
-            return subPlan;
+            // Return null in case there is no partition filter extracted
+            // which means the plan hasn't changed.
+            // If not return the modified plan which has filters removed.
+            return planChanged ? subPlan : null;
         }
 
         @Override
@@ -148,6 +157,7 @@ public class PartitionFilterOptimizer ex
             FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(),
                     getMappedKeys(partitionKeys));
             filterFinder.visit();
+            LOG.info("Partition keys are " + Arrays.asList(partitionKeys));
             Expression partitionFilter = filterFinder.getPushDownExpression();
 
             if(partitionFilter != null) {
@@ -157,7 +167,9 @@ public class PartitionFilterOptimizer ex
                 // LoadFunc.getSchema()
                 updateMappedColNames(partitionFilter);
                 try {
+                    LOG.info("Setting partition filter [" + partitionFilter + "] on loader " + loadMetadata);
                     loadMetadata.setPartitionFilter(partitionFilter);
+                    planChanged = true;
                 } catch (IOException e) {
                     throw new FrontendException( e );
                 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1664724&r1=1664723&r2=1664724&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Fri Mar  6 21:26:29 2015
@@ -24,13 +24,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPredicatePushdown;
-import org.apache.pig.Expression.BinaryExpression;
-import org.apache.pig.Expression.Column;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
@@ -46,6 +48,8 @@ import org.apache.pig.newplan.optimizer.
 
 public class PredicatePushdownOptimizer extends Rule {
 
+    private static final Log LOG = LogFactory.getLog(PredicatePushdownOptimizer.class);
+
     public PredicatePushdownOptimizer(String name) {
         super(name, false);
     }
@@ -153,6 +157,7 @@ public class PredicatePushdownOptimizer
                 // LoadFunc.getSchema()
                 updateMappedColNames(pushDownPredicate);
                 try {
+                    LOG.info("Setting predicate pushdown filter [" + pushDownPredicate + "] on loader " + loadPredPushdown);
                     loadPredPushdown.setPushdownPredicate(pushDownPredicate);
                 } catch (IOException e) {
                     throw new FrontendException( e );