You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/18 20:16:33 UTC

svn commit: r1023931 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: daijy
Date: Mon Oct 18 18:16:32 2010
New Revision: 1023931

URL: http://svn.apache.org/viewvc?rev=1023931&view=rev
Log:
PIG-1669: PushUpFilter fails when filter condition contains scalar

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 18 18:16:32 2010
@@ -209,6 +209,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1669: PushUpFilter fail when filter condition contains scalar (daijy)
+
 PIG-1672: order of relations in replicated join gets switched in a query where
  first relation has two mergeable foreach statements (thejas)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Oct 18 18:16:32 2010
@@ -771,6 +771,16 @@ public class MRCompiler extends PhyPlanV
                     mergedMap.scalars.add(physOp);
                 }
             }
+            Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+            for (Map.Entry<PhysicalOperator, MapReduceOper> entry : phyToMROpMap.entrySet()) {
+                if (entry.getValue()==rmro) {
+                    opsToChange.add(entry.getKey());
+                }
+            }
+            for (PhysicalOperator op : opsToChange) {
+                phyToMROpMap.put(op, mergedMap);
+            }
+            
             MRPlan.remove(rmro);
         }
         return ret;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Mon Oct 18 18:16:32 2010
@@ -260,8 +260,13 @@ public class FilterAboveForeach extends 
              *  -- And ForEach is FilterPred 
              */
             
-            currentPlan.removeAndReconnect(filter);
-            currentPlan.insertBetween(forEachPred, filter, foreach);
+            Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+            Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+            Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+            
+            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+            currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
+            currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
             
             subPlan.add(forEachPred);
             subPlan.add(foreach);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java Mon Oct 18 18:16:32 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.newplan.logical.rules;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -82,7 +83,12 @@ public class MergeFilter extends Rule {
                 }
 
                 // Since we remove next, we need to merge soft link into filter
-                List<Operator> nextSoftPreds = currentPlan.getSoftLinkPredecessors(next);
+                List<Operator> nextSoftPreds = null;
+                if (currentPlan.getSoftLinkPredecessors(next)!=null) {
+                    nextSoftPreds = new ArrayList<Operator>();
+                    nextSoftPreds.addAll(currentPlan.getSoftLinkPredecessors(next));
+                }
+                
                 if (nextSoftPreds!=null) {
                     for (Operator softPred : nextSoftPreds) {
                         currentPlan.removeSoftLink(softPred, next);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Mon Oct 18 18:16:32 2010
@@ -110,6 +110,10 @@ public class PartitionFilterOptimizer ex
                 return false;
             loFilter =  (LOFilter)succeds.get(0);
             
+            // Filter has dependency other than load, skip optimization
+            if (currentPlan.getSoftLinkPredecessors(loFilter)!=null)
+                return false;
+            
             // we have to check more only if LoadFunc implements LoadMetada
             loadFunc = loLoad.getLoadFunc();
             if(!( loadFunc instanceof LoadMetadata ) ) {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Mon Oct 18 18:16:32 2010
@@ -211,14 +211,18 @@ public class PushDownForEachFlatten exte
             LOForEach foreach = (LOForEach)matched.getSources().get(0);
             Operator next = currentPlan.getSuccessors( foreach ).get(0);
             if( next instanceof LOSort ) {
-                currentPlan.removeAndReconnect(foreach);
-                
-                List<Operator> succs = currentPlan.getSuccessors( next );
+                Operator pred = currentPlan.getPredecessors( foreach ).get( 0 );
+                List<Operator> succs = new ArrayList<Operator>();
+                succs.addAll(currentPlan.getSuccessors( next ));
+                Pair<Integer, Integer> pos1 = currentPlan.disconnect( pred, foreach );
+                Pair<Integer, Integer> pos2 = currentPlan.disconnect( foreach, next );
+                currentPlan.connect( pred, pos1.first, next, pos2.second );
+
                 if( succs != null ) {
-                    List<Operator> succsCopy = new ArrayList<Operator>();
-                    succsCopy.addAll(succs);
-                    for( Operator succ : succsCopy ) {
-                        currentPlan.insertBetween(next, foreach, succ);
+                    for( Operator succ : succs ) {
+                        Pair<Integer, Integer> pos = currentPlan.disconnect( next, succ );
+                        currentPlan.connect( next, pos.first, foreach, 0 );
+                        currentPlan.connect( foreach, 0, succ, pos.second );
                     }
                 } else {
                     currentPlan.connect( next, foreach );
@@ -268,10 +272,10 @@ public class PushDownForEachFlatten exte
                 
                 newForeach.setAlias(((LogicalRelationalOperator)next).getAlias());
                 
-                currentPlan.add( newForeach );
                 Operator opAfterX = null;
                 List<Operator> succs = currentPlan.getSuccessors( next );
                 if( succs == null || succs.size() == 0 ) {
+                    currentPlan.add( newForeach );
                     currentPlan.connect( next, newForeach );
                 } else {
                     opAfterX = succs.get( 0 );

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Mon Oct 18 18:16:32 2010
@@ -207,6 +207,8 @@ public class PushUpFilter extends Rule {
             Operator predecessor = this.findNonFilterPredecessor( filter );
             subPlan.add( predecessor) ;
             
+            // Disconnect the filter in the plan without removing it from the plan.
+            Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
             Operator succed;
             
             if (currentPlan.getSuccessors(filter)!=null)
@@ -214,12 +216,13 @@ public class PushUpFilter extends Rule {
             else
                 succed = null;
             
+            Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
             if (succed!=null) {
                 subPlan.add(succed);
+                Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+                currentPlan.connect(predec, p1.first, succed, p2.second);
             }
             
-            currentPlan.removeAndReconnect(filter);
-            
             if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
                 ( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
                 // For sort, put the filter in front of it.
@@ -318,7 +321,9 @@ public class PushUpFilter extends Rule {
         // Insert the filter in between the given two operators.
         private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
         throws FrontendException {
-            currentPlan.insertBetween(prev, filter, predecessor);
+            Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor );
+            currentPlan.connect( prev, p3.first, filter, 0 );
+            currentPlan.connect( filter, 0, predecessor, p3.second );
         }
         
         // Identify those among preds that will need to have a filter between it and the predecessor.

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Oct 18 18:16:32 2010
@@ -745,4 +745,35 @@ public class TestEvalPipeline2 extends T
         
         assertFalse(iter.hasNext());
     }
+    
+    // See PIG-1669
+    @Test
+    public void testPushUpFilterScalar() throws Exception{
+        String[] input1 = {
+                "jason\t14\t4.7",
+                "jack\t18\t4.6"
+        };
+        
+        String[] input2 = {
+                "jason\t14",
+                "jack\t18"
+        };
+        
+        Util.createInputFile(cluster, "table_PushUpFilterScalar1", input1);
+        Util.createInputFile(cluster, "table_PushUpFilterScalar2", input2);
+        pigServer.registerQuery("A = load 'table_PushUpFilterScalar1' as (name, age, gpa);");
+        pigServer.registerQuery("B = load 'table_PushUpFilterScalar2' as (name, age);");
+        pigServer.registerQuery("C = filter A by age < 20;");
+        pigServer.registerQuery("D = filter B by age < 20;");
+        pigServer.registerQuery("simple_scalar = limit D 1;");
+        pigServer.registerQuery("E = join C by name, D by name;");
+        pigServer.registerQuery("F = filter E by C::age==(int)simple_scalar.age;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("F");
+        
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(jason,14,4.7,jason,14)"));
+        
+        assertFalse(iter.hasNext());
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Mon Oct 18 18:16:32 2010
@@ -379,7 +379,7 @@ public class TestNewPlanFilterAboveForea
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
-
+    
     @Test
     public void testFilterForeachFlatten() throws Exception {
         planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
@@ -398,6 +398,23 @@ public class TestNewPlanFilterAboveForea
         Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
         Assert.assertTrue( store instanceof LOStore );
     }
+    
+    // See PIG-1669
+    @Test
+    public void testPushUpFilterWithScalar() throws Exception {
+        planTester.buildPlan("a = load 'studenttab10k' as (name, age, gpa);");
+        planTester.buildPlan("b = group a all;");
+        planTester.buildPlan("c = foreach b generate AVG(a.age) as age;");
+        planTester.buildPlan("d = foreach a generate name, age;");
+        planTester.buildPlan("e = filter d by age > c.age;");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("f = store e into 'empty';");
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator foreach = newLogicalPlan.getPredecessors(store).get(0);
+        Assert.assertTrue( foreach instanceof LOForEach );
+    }
 
     private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws FrontendException {
         LogicalPlan newLogicalPlan = migratePlan( plan );