You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive; the link was lost in the plain-text conversion.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/03/04 19:21:22 UTC

svn commit: r919109 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java src/org/apache/pig/impl/plan/OperatorPlan.java test/org/apache/pig/test/TestPartitionFilterOptimization.java

Author: rding
Date: Thu Mar  4 18:21:21 2010
New Revision: 919109

URL: http://svn.apache.org/viewvc?rev=919109&view=rev
Log:
PIG-1267: Problems with partition filter optimizer

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Mar  4 18:21:21 2010
@@ -139,6 +139,8 @@
 
 BUG FIXES
 
+PIG-1267: Problems with partition filter optimizer (rding)
+
 PIG-1079: Modify merge join to use distributed cache to maintain the index
 (rding)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Thu Mar  4 18:21:21 2010
@@ -19,10 +19,11 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.Expression;
@@ -31,7 +32,6 @@
 import org.apache.pig.PigException;
 import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.Expression.Column;
-import org.apache.pig.Expression.OpType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOFilter;
 import org.apache.pig.impl.logicalLayer.LOLoad;
@@ -69,9 +69,9 @@
     private LOFilter loFilter;
     
     /**
-     * flag to ensure we only do the optimization once for performance reasons
+     * to ensure we only do the optimization once for performance reasons
      */
-    private boolean alreadyCalled = false;
+    private Set<LogicalOperator> alreadyChecked = new HashSet<LogicalOperator>();
     
     /**
      * a map between column names as reported in 
@@ -98,13 +98,6 @@
     @Override
     public boolean check(List<LogicalOperator> nodes) throws OptimizerException 
     {
-        if(!alreadyCalled) {
-            // first call
-            alreadyCalled = true;
-        } else {
-            // already called, just return
-            return false; 
-        }
         if((nodes == null) || (nodes.size() <= 0)) {
             int errCode = 2052;
             String msg = "Internal error. Cannot retrieve operator from null " +
@@ -114,6 +107,9 @@
         if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad)) {
             return false;
         }
+        if (!alreadyChecked.add(nodes.get(0))) {
+            return false;
+        }
         loLoad = (LOLoad)nodes.get(0);
         List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
         if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
@@ -164,7 +160,7 @@
                 updateMappedColNames(partitionFilter);
                 loadMetadata.setPartitionFilter(partitionFilter);
                 if(pColFilterFinder.isFilterRemovable()) {
-                    // remove this filter from the plan
+                    // remove this filter from the plan                  
                     mPlan.removeAndReconnect(loFilter);
                 }
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Mar  4 18:21:21 2010
@@ -669,7 +669,10 @@
             pred = preds.get(0);
             disconnect(pred, node);
         }
-
+        
+        int oldPos = -1;
+        int newPos = -1;
+        
         List<E> succs = getSuccessors(node);
         E succ = null;
         if (succs != null) {
@@ -681,13 +684,51 @@
                 throw pe;
             }
             succ = succs.get(0);
+            List<E> plst = getPredecessors(succ);
+            for (int i=0; i<plst.size(); i++) {
+                if (plst.get(i).equals(node)) {
+                    oldPos = i;
+                }
+            }
             disconnect(node, succ);
         }
 
         remove(node);
         if (pred != null && succ != null) {
             connect(pred, succ);
-            succ.rewire(node, 0, pred, true);
+            List<E> plst = getPredecessors(succ);
+            for (int i=0; i<plst.size(); i++) {
+                if (plst.get(i).equals(pred)) {
+                    newPos = i;
+                }
+            }
+            
+            if (oldPos < 0 || newPos < 0) {
+                throw new PlanException("Invalid position index: " + oldPos                        
+                        + " : " + newPos);
+            }
+            
+            if (oldPos != newPos) {            
+                List<E> nlst = new ArrayList<E>();
+                for (int i=0; i<plst.size(); i++) {
+                    E nod = plst.get(i);
+                    if (i == oldPos) {
+                        nlst.add(pred);
+                    }
+                    if (i == newPos) continue;
+                    nlst.add(nod);
+                }
+                
+                if (nlst.size() != plst.size()) {
+                    throw new PlanException("Invalid list size: " + nlst.size()
+                            + " : " + plst.size());
+                }
+                         
+                mToEdges.removeKey(succ);
+                mToEdges.put(succ, nlst);
+            }
+            
+            succ.rewire(node, oldPos, pred, true);
         }
     }
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Thu Mar  4 18:21:21 2010
@@ -45,6 +45,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
@@ -420,6 +421,46 @@
                 "((age >= 20) and (f3 == 15))", actual);
     }
     
+    /**
+     * Test PIG-1267
+     * @throws Exception
+     */
+    @Test
+    public void testColNameMapping5() throws Exception {
+        TestLoader.partFilter = null;
+        lpTester.buildPlan("a = load 'foo' using "
+            + TestLoader.class.getName() + 
+            "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
+            "'srcid');");
+        lpTester.buildPlan("b = load 'bar' using "
+                + TestLoader.class.getName() + 
+                "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+                "'srcid');");
+        lpTester.buildPlan("a1 = filter a by srcid == 10;");
+        lpTester.buildPlan("b1 = filter b by srcid == 20;");
+        lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
+        LogicalPlan lp = lpTester
+                .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
+                		"$5 as dstid:int, $0 as mrkt:chararray;");
+        
+        new PlanSetter(lp).visit();
+        
+        lpTester.typeCheckPlan(lp);
+        lpTester.optimizePlan(lp);
+ 
+        assertEquals("checking partition filter:",             
+                    "(srcid == 20)",
+                    TestLoader.partFilter.toString());
+        
+        int counter = 0;
+        Iterator<LogicalOperator> iter = lp.getKeys().values().iterator();
+        while (iter.hasNext()) {
+            assertTrue(!(iter.next() instanceof LOFilter));
+            counter++;
+        }      
+        assertEquals(counter, 6);
+    }
+    
     //// helper methods ///////
     
     private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols,