You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/05/12 11:58:24 UTC

svn commit: r1102227 - in /pig/branches/branch-0.9: ./ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: dvryaboy
Date: Thu May 12 09:58:24 2011
New Revision: 1102227

URL: http://svn.apache.org/viewvc?rev=1102227&view=rev
Log:
PIG-2014: SAMPLE should not be pushed up

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java
    pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
    pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
    pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
    pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Thu May 12 09:58:24 2011
@@ -178,6 +178,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-2014: SAMPLE shouldn't be pushed up (dvryaboy)
+
 PIG-2058: Macro missing returns clause doesn't give a good error message (rding)
 
 PIG-2035: Macro expansion doesn't handle multiple expansions of same macro inside another macro (rding)

Modified: pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java Thu May 12 09:58:24 2011
@@ -29,8 +29,10 @@ import org.apache.pig.data.DataType;
  * Return a random double value.  Whatever arguments are passed to this UDF
  * are ignored.
  */
+@Nondeterministic
 public class RANDOM extends EvalFunc<Double>{
 
+	@Override
 	public Double exec(Tuple input) throws IOException {
 		return Math.random();
 	}

Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Thu May 12 09:58:24 2011
@@ -55,7 +55,7 @@ public class FilterAboveForeach extends 
         LogicalPlan plan = new LogicalPlan();
         LogicalRelationalOperator foreach = new LOForEach(plan);
         LogicalRelationalOperator filter = new LOFilter(plan);
-        
+
         plan.add(foreach);
         plan.add(filter);
         plan.connect(foreach, filter);
@@ -67,14 +67,14 @@ public class FilterAboveForeach extends 
     public Transformer getNewTransformer() {
         return new FilterAboveForEachTransformer();
     }
-    
+
     public class FilterAboveForEachTransformer extends Transformer {
 
         LOFilter filter = null;
         LOForEach foreach = null;
         LogicalRelationalOperator forEachPred = null;
         OperatorSubPlan subPlan = null;
-        
+
         @Override
         public boolean check(OperatorPlan matched) throws FrontendException {
             Iterator<Operator> iter = matched.getOperators();
@@ -85,10 +85,10 @@ public class FilterAboveForeach extends 
                     break;
                 }
             }
-            
+
             // This would be a strange case
             if( foreach == null ) return false;
-            
+
             iter = matched.getOperators();
             while( iter.hasNext() ) {
                 Operator op = iter.next();
@@ -97,35 +97,37 @@ public class FilterAboveForeach extends 
                     break;
                 }
             }
-            
+
             // This is for cheating, we look up more than one filter in the plan
             while( filter != null ) {
+
                 // Get uids of Filter
                 Pair<List<Long>, List<Byte>> uidWithTypes = getFilterProjectionUids(filter);
 
                 // See if the previous operators have uids from project
-                List<Operator> preds = currentPlan.getPredecessors(foreach);            
+                List<Operator> preds = currentPlan.getPredecessors(foreach);
                 for(int j=0; j< preds.size(); j++) {
                     LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
                     if (hasAll(logRelOp, uidWithTypes)) {
                         forEachPred = (LogicalRelationalOperator) preds.get(j);
-                        return true;
+                        // If a filter is nondeterministic, we shouldn't push it up.
+                        return !OptimizerUtils.planHasNonDeterministicUdf(filter.getFilterPlan());
                     }
                 }
-                
+
                 // Chances are there are filters below this filter which can be
                 // moved up. So searching for those filters
                 List<Operator> successors = currentPlan.getSuccessors(filter);
-                if( successors != null && successors.size() > 0 && 
+                if( successors != null && successors.size() > 0 &&
                         successors.get(0) instanceof LOFilter ) {
                     filter = (LOFilter)successors.get(0);
                 } else {
                     filter = null;
                 }
             }
-            return false;            
+            return false;
         }
-        
+
         /**
          * Get all uids from Projections of this FilterOperator
          * @param filter
@@ -136,7 +138,7 @@ public class FilterAboveForeach extends 
             List<Byte> types = new ArrayList<Byte>();
             if( filter != null ) {
                 LogicalExpressionPlan filterPlan = filter.getFilterPlan();
-                Iterator<Operator> iter = filterPlan.getOperators();            
+                Iterator<Operator> iter = filterPlan.getOperators();
                 Operator op = null;
                 while( iter.hasNext() ) {
                     op = iter.next();
@@ -159,29 +161,29 @@ public class FilterAboveForeach extends 
                         }
                     }
                 }
-                
+
             }
-            
+
             Pair<List<Long>, List<Byte>> result = new Pair<List<Long>, List<Byte>>(uids, types);
             return result;
         }
-        
+
         /**
          * checks if a relational operator contains all of the specified uids
          * @param op LogicalRelational operator that should contain the uid
          * @param uids Uids to check for
          * @return true if given LogicalRelationalOperator has all the given uids
          */
-        private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>, 
+        private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>,
                 List<Byte>> uidWithTypes) throws FrontendException {
             LogicalSchema schema = op.getSchema();
-            
+
             if (schema==null)
                 return false;
-            
+
             List<Long> uids = uidWithTypes.first;
             List<Byte> types = uidWithTypes.second;
-            
+
             for (int i=0;i<uids.size();i++) {
                 boolean found = false;
                 for (LogicalSchema.LogicalFieldSchema fs : schema.getFields()) {
@@ -193,53 +195,53 @@ public class FilterAboveForeach extends 
             }
             return true;
         }
-        
+
         @Override
-        public OperatorPlan reportChanges() {            
+        public OperatorPlan reportChanges() {
             return subPlan;
         }
 
         @Override
         public void transform(OperatorPlan matched) throws FrontendException {
-            
+
             List<Operator> opSet = currentPlan.getPredecessors(filter);
             if( ! ( opSet != null && opSet.size() > 0 ) ) {
                 return;
             }
             Operator filterPred = opSet.get(0);
-            
+
             opSet = currentPlan.getSuccessors(filter);
             if( ! ( opSet != null && opSet.size() > 0 ) ) {
                 return;
             }
             Operator filterSuc = opSet.get(0);
-            
+
             subPlan = new OperatorSubPlan(currentPlan);
-            
+
             // Steps below do the following
             /*
              *          ForEachPred
              *               |
-             *            ForEach         
+             *            ForEach
              *               |
              *             Filter*
              *      ( These are filters
              *      which cannot be moved )
              *               |
-             *           FilterPred                 
+             *           FilterPred
              *         ( is a Filter )
              *               |
              *             Filter
-             *        ( To be moved ) 
+             *        ( To be moved )
              *               |
              *            FilterSuc
-             *              
+             *
              *               |
              *               |
-             *        Transforms into 
+             *        Transforms into
              *               |
-             *              \/            
-             *                      
+             *              \/
+             *
              *            ForEachPred
              *               |
              *            Filter
@@ -251,25 +253,25 @@ public class FilterAboveForeach extends 
              *       ( These are filters
              *      which cannot be moved )
              *               |
-             *           FilterPred                 
+             *           FilterPred
              *         ( is a Filter )
              *               |
              *            FilterSuc
-             *            
+             *
              *  Above plan is assuming we are modifying the filter in middle.
              *  If we are modifying the first filter after ForEach then
              *  -- * (kleene star) becomes zero
-             *  -- And ForEach is FilterPred 
+             *  -- And ForEach is FilterPred
              */
-            
+
             Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
             Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
             Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
-            
+
             currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
             currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
             currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
-            
+
             subPlan.add(forEachPred);
             subPlan.add(foreach);
             subPlan.add(filterPred);

Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java Thu May 12 09:58:24 2011
@@ -19,7 +19,11 @@ package org.apache.pig.newplan.logical.r
 
 import java.util.Iterator;
 
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -63,4 +67,26 @@ public class OptimizerUtils {
         LOGenerate gen = findGenerate( foreach );
         return hasFlatten( gen );
     }
+
+    /**
+     * Helper method to determine if the logical expression plan for a Filter contains
+     * non-deterministic operations and should therefore be treated extra carefully
+     * during optimization.
+     *
+     * @param filterPlan
+     * @return true if the filter plan contains a non-deterministic UDF
+     */
+    public static boolean planHasNonDeterministicUdf(LogicalExpressionPlan filterPlan) {
+        Iterator<Operator> it = filterPlan.getOperators();
+        while( it.hasNext() ) {
+            Operator op = it.next();
+            if( op instanceof UserFuncExpression ) {
+                Object udf = PigContext.instantiateFuncFromSpec(((UserFuncExpression) op).getFuncSpec());
+                if (udf.getClass().getAnnotation(Nondeterministic.class) != null) {
+                    return true;
+}
+            }
+        }
+        return false;
+    }
 }

Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Thu May 12 09:58:24 2011
@@ -91,9 +91,16 @@ public class PushDownForEachFlatten exte
             
             LOForEach foreach = (LOForEach)matched.getSources().get(0);
             LOGenerate gen = OptimizerUtils.findGenerate( foreach );
+            
             if( !OptimizerUtils.hasFlatten( gen ) )
                 return false;
             
+            // If a foreach contains a nondeterministic udf, we shouldn't push it down.
+            for (LogicalExpressionPlan p : gen.getOutputPlans()) {
+                if (OptimizerUtils.planHasNonDeterministicUdf(p))
+                    return false;
+            }
+            
             List<Operator> succs = currentPlan.getSuccessors( foreach );
             if( succs == null || succs.size() != 1 )
                 return false;

Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Thu May 12 09:58:24 2011
@@ -58,13 +58,13 @@ import org.apache.pig.newplan.optimizer.
 import org.apache.pig.newplan.optimizer.Transformer;
 
 public class PushUpFilter extends Rule {
-    
+
     public PushUpFilter(String n) {
-        super(n, false);       
+        super(n, false);
     }
 
     @Override
-    public Transformer getNewTransformer() {        
+    public Transformer getNewTransformer() {
         return new PushUpFilterTransformer();
     }
 
@@ -72,38 +72,42 @@ public class PushUpFilter extends Rule {
         private OperatorSubPlan subPlan;
 
         @Override
-        public boolean check(OperatorPlan matched) throws FrontendException {   
+        public boolean check(OperatorPlan matched) throws FrontendException {
             // check if it is inner join
             Operator current = matched.getSources().get(0);
-            
+
             Operator pred = findNonFilterPredecessor( current );
             if( pred == null )
                 return false;
-            
+
             // sort, distinct, or sort by is always okay.
             if( pred instanceof LOSort || pred instanceof LODistinct || pred instanceof LOUnion ) {
                 return true;
             }
-            
+
             // if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative
             // if predecessor is LOForEach, it is optimized by rule FilterAboveForeach
             // return false
             if( pred instanceof LOLoad   || pred instanceof LOStore || pred instanceof LOStream      ||
-                pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput || 
+                pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput ||
                 pred instanceof LOLimit  || pred instanceof LONative || pred instanceof LOForEach) {
                 return false;
             }
-            
-            LOFilter filter = (LOFilter)current;            
+
+            LOFilter filter = (LOFilter)current;
             List<Operator> preds = currentPlan.getPredecessors( pred );
             LogicalExpressionPlan filterPlan = filter.getFilterPlan();
-                
+
+            if (OptimizerUtils.planHasNonDeterministicUdf(filterPlan)) {
+                return false;
+            }
+
             // collect all uids used in the filter plan
             Set<Long> uids = collectUidFromExpPlan(filterPlan);
-                                
+
             if( pred instanceof LOCogroup ) {
                 LOCogroup cogrp = (LOCogroup)pred;
-                if( preds.size() == 1 ) { 
+                if( preds.size() == 1 ) {
                     if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), uids )    ) {
                         // Order by is ok if all UIDs can be found from previous operator.
                         return true;
@@ -115,7 +119,7 @@ public class PushUpFilter extends Rule {
                     return true;
                 }
             }
-            
+
             // if the predecessor is a multi-input operator then detailed
             // checks are required
             if( pred instanceof LOCross || pred instanceof LOJoin ) {
@@ -135,7 +139,7 @@ public class PushUpFilter extends Rule {
                     if (isFullOuter)
                         return false;
                 }
-                
+
 
                 for(int j=0; j<preds.size(); j++) {
                     if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
@@ -144,12 +148,12 @@ public class PushUpFilter extends Rule {
                         if (pred instanceof LOCross || pred instanceof LOJoin && (isInner || innerFlags[j]))
                             return true;
                     }
-                }                       
+                }
             }
-            
+
             return false;
         }
-        
+
         private boolean containUDF(LogicalExpressionPlan filterPlan) {
             Iterator<Operator> it = filterPlan.getOperators();
             while( it.hasNext() ) {
@@ -171,9 +175,9 @@ public class PushUpFilter extends Rule {
             }
             return uids;
         }
-        
+
         /**
-         * Starting from current operator (which is a filter), search its successors until 
+         * Starting from current operator (which is a filter), search its successors until
          * locating a non-filter operator. Null is returned if none is found.
          */
         private Operator findNonFilterPredecessor(Operator current) {
@@ -194,7 +198,7 @@ public class PushUpFilter extends Rule {
                     return pred;
                 }
             } while( true );
-                
+
         }
 
         @Override
@@ -202,32 +206,32 @@ public class PushUpFilter extends Rule {
             subPlan = new OperatorSubPlan(currentPlan);
 
             LOFilter filter = (LOFilter)matched.getSources().get(0);
-            
+
             // This is the one that we will insert filter btwn it and it's input.
             Operator predecessor = this.findNonFilterPredecessor( filter );
             subPlan.add( predecessor) ;
-            
+
             // Disconnect the filter in the plan without removing it from the plan.
             Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
             Operator succed;
-            
+
             if (currentPlan.getSuccessors(filter)!=null)
                 succed = currentPlan.getSuccessors(filter).get(0);
             else
                 succed = null;
-            
+
             Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
             if (succed!=null) {
                 subPlan.add(succed);
                 Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
                 currentPlan.connect(predec, p1.first, succed, p2.second);
             }
-            
+
             if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
                 ( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
                 // For sort, put the filter in front of it.
                 Operator prev = currentPlan.getPredecessors( predecessor ).get( 0 );
-                
+
                 insertFilter( prev, predecessor, filter );
                 return;
             }
@@ -236,12 +240,12 @@ public class PushUpFilter extends Rule {
             LogicalExpressionPlan filterPlan = filter.getFilterPlan();
             List<Operator> preds = currentPlan.getPredecessors( predecessor );
             Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, predecessor, preds );
-            
-            LOFilter newFilter = null;                
+
+            LOFilter newFilter = null;
             for( Entry<Integer, Operator> entry : inputs.entrySet() ) {
                 int inputIndex = entry.getKey();
                 Operator pred = entry.getValue();
-                
+
                 // Find projection field offset
                 int columnOffset = 0;
                 if( predecessor instanceof LOJoin || predecessor instanceof LOCross ) {
@@ -249,11 +253,11 @@ public class PushUpFilter extends Rule {
                         columnOffset += ( (LogicalRelationalOperator)preds.get( i ) ).getSchema().size();
                     }
                 }
-                
+
                 // Reuse the filter for the first match. For others, need to make a copy of the filter
                 // and add it between input and predecessor.
                 newFilter = newFilter == null ? filter : new LOFilter( (LogicalPlan)currentPlan );
-                
+
                 currentPlan.add( newFilter );
                 subPlan.add( newFilter );
                 subPlan.add( pred );
@@ -264,7 +268,7 @@ public class PushUpFilter extends Rule {
                     if( sink instanceof ProjectExpression )
                         projExprs.add( (ProjectExpression)sink );
                 }
-                
+
                 if( predecessor instanceof LOCogroup ) {
                     for( ProjectExpression projExpr : projExprs ) {
                         // Need to merge filter condition and cogroup by expression;
@@ -284,7 +288,7 @@ public class PushUpFilter extends Rule {
                         }
                     }
                 }
-                
+
                 // Now, reset the projection expressions in the new filter plan.
                 sinks = fPlan.getSinks();
                 for( Operator sink : sinks ) {
@@ -296,11 +300,11 @@ public class PushUpFilter extends Rule {
                     }
                  }
                 newFilter.setFilterPlan( fPlan );
-                
+
                 insertFilter( pred, predecessor, newFilter );
             }
         }
-        
+
         // check if a relational operator contains all of the specified uids
         private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) throws FrontendException {
             LogicalSchema schema = op.getSchema();
@@ -311,14 +315,14 @@ public class PushUpFilter extends Rule {
                     return false;
                 }
             }
-            
+
             return true;
         }
-           
+
         @Override
-        public OperatorPlan reportChanges() {            
+        public OperatorPlan reportChanges() {
             return currentPlan;
-        }          
+        }
 
         // Insert the filter in between the given two operators.
         private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
@@ -327,19 +331,19 @@ public class PushUpFilter extends Rule {
             currentPlan.connect( prev, p3.first, filter, 0 );
             currentPlan.connect( filter, 0, predecessor, p3.second );
         }
-        
+
         // Identify those among preds that will need to have a filter between it and the predecessor.
-        private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor, 
+        private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor,
                 List<Operator> preds) throws FrontendException {
             Map<Integer, Operator> inputs = new HashMap<Integer, Operator>();
-            
+
             if( predecessor instanceof LOUnion || predecessor instanceof LOCogroup ) {
                 for( int i = 0; i < preds.size(); i++ ) {
                     inputs.put( i, preds.get( i ) );
                 }
                 return inputs;
             }
-            
+
             // collect all uids used in the filter plan
             Set<Long> uids = collectUidFromExpPlan(filterPlan);
             boolean[] innerFlags = null;
@@ -353,13 +357,13 @@ public class PushUpFilter extends Rule {
                     }
                 }
             }
-            
+
             // Find the predecessor of join that contains all required uids.
             for(int j=0; j<preds.size(); j++) {
                 // Filter can push to LOJoin outer branch, but no inner branch
-                if( hasAll((LogicalRelationalOperator)preds.get(j), uids) && 
+                if( hasAll((LogicalRelationalOperator)preds.get(j), uids) &&
                         (predecessor instanceof LOCross || predecessor instanceof LOJoin && (isInner || innerFlags[j]))) {
-                    Operator input = preds.get(j);   
+                    Operator input = preds.get(j);
                     subPlan.add(input);
                     inputs.put( j, input );
                 }
@@ -369,11 +373,11 @@ public class PushUpFilter extends Rule {
     }
 
     @Override
-    protected OperatorPlan buildPattern() {        
+    protected OperatorPlan buildPattern() {
         LogicalPlan plan = new LogicalPlan();
         LogicalRelationalOperator op1 = new LOFilter(plan);
         plan.add( op1 );
-        
+
         return plan;
     }
 

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Thu May 12 09:58:24 2011
@@ -68,6 +68,34 @@ public class TestNewPlanFilterAboveForea
         Assert.assertTrue( fe2 instanceof LOForEach );
     }
     
+    /**
+     * Non-deterministic filters should not be pushed up (see PIG-2014).
+     * In the example below, if Filter gets pushed above flatten, we might remove
+     * whole bags of cuisines if random gets pushed up, while the intent is to sample from each bag.
+     * @throws Exception
+     */
+    @Test
+    public void testNondeterministicFilter() throws Exception {
+        String query = "A =LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) }, num:int );" +
+        "B = FOREACH A GENERATE name, flatten(cuisines), num;" +
+        "C = FILTER B BY RANDOM(num) > 5;" +
+        "D = STORE C INTO 'empty';" ;
+
+        LogicalPlan newLogicalPlan = buildPlan( query );
+
+        newLogicalPlan.explain(System.out, "text", true);
+
+        // Expect Filter to not be pushed, so it should be load->foreach->foreach->filter
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator fe1 = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe1 instanceof LOForEach );
+        Operator fe2 = newLogicalPlan.getSuccessors( fe1 ).get( 0 );
+        Assert.assertTrue( fe2 instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+    }
+
     @Test
     public void testMultipleFilter() throws Exception {
         String query = "A =LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine ) } );" +
@@ -450,10 +478,12 @@ public class TestNewPlanFilterAboveForea
             super(p, iterations, new HashSet<String>());
         }
         
+        @Override
         public void addPlanTransformListener(PlanTransformListener listener) {
             super.addPlanTransformListener(listener);
         }
         
+       @Override
        protected List<Set<Rule>> buildRuleSets() {            
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
             

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java Thu May 12 09:58:24 2011
@@ -487,6 +487,33 @@ public class TestNewPlanFilterRule {
         Assert.assertTrue( store instanceof LOStore );
     }
 
+    /**
+     * Test that SAMPLE (which appears in the plan as an LOFilter) doesn't get pushed up (see PIG-2014)
+     */
+    @Test
+    public void testSample() throws Exception {
+        String query = "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
+        "B = GROUP A by name;" +
+        "C = FOREACH B GENERATE group, A;" +
+        "D = SAMPLE C 0.1 ; " +
+        "E = STORE D INTO 'empty';";
+        // expect load -> foreach -> cogroup -> foreach -> filter
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        newLogicalPlan.explain(System.out, "text", true);
+        // Walk the optimized plan from its source and check each operator type in order.
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator fe1 = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( fe1 instanceof LOForEach );
+        Operator cg = newLogicalPlan.getSuccessors( fe1 ).get( 0 );
+        Assert.assertTrue( cg instanceof LOCogroup );
+        Operator fe2 = newLogicalPlan.getSuccessors( cg ).get( 0 );
+        Assert.assertTrue( fe2 instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+
+    }
+
     private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
     	PigServer pigServer = new PigServer(pc);
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -500,10 +527,12 @@ public class TestNewPlanFilterRule {
             super(p, iterations, new HashSet<String>());
         }
         
+        @Override
         public void addPlanTransformListener(PlanTransformListener listener) {
             super.addPlanTransformListener(listener);
         }
         
+       @Override
        protected List<Set<Rule>> buildRuleSets() {            
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
             
@@ -531,6 +560,7 @@ public class TestNewPlanFilterRule {
             addPlanTransformListener(new ProjectionPatcher());
         }
         
+        @Override
         public void addPlanTransformListener(PlanTransformListener listener) {
             super.addPlanTransformListener(listener);
         }

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Thu May 12 09:58:24 2011
@@ -1152,5 +1152,23 @@ public class TestNewPlanPushDownForeachF
         return newLogicalPlan;
     }
 
+    @Test
+    public void testNonDeterministicUdf() throws Exception {
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, RANDOM(), flatten($2);" +
+        "C = order B by $0, $1;" +
+        "D = store C into 'dummy';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+        Assert.assertTrue( sort instanceof LOSort );
+        
+    }
 }