You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/05/12 11:58:24 UTC
svn commit: r1102227 - in /pig/branches/branch-0.9: ./
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/rules/
test/org/apache/pig/test/
Author: dvryaboy
Date: Thu May 12 09:58:24 2011
New Revision: 1102227
URL: http://svn.apache.org/viewvc?rev=1102227&view=rev
Log:
PIG-2014: SAMPLE should not be pushed up
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java
pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Thu May 12 09:58:24 2011
@@ -178,6 +178,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-2014: SAMPLE shouldn't be pushed up (dvryaboy)
+
PIG-2058: Macro missing returns clause doesn't give a good error message (rding)
PIG-2035: Macro expansion doesn't handle multiple expansions of same macro inside another macro (rding)
Modified: pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/builtin/RANDOM.java Thu May 12 09:58:24 2011
@@ -29,8 +29,10 @@ import org.apache.pig.data.DataType;
* Return a random double value. Whatever arguments are passed to this UDF
* are ignored.
*/
+@Nondeterministic
public class RANDOM extends EvalFunc<Double>{
+ @Override
public Double exec(Tuple input) throws IOException {
return Math.random();
}
Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Thu May 12 09:58:24 2011
@@ -55,7 +55,7 @@ public class FilterAboveForeach extends
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator foreach = new LOForEach(plan);
LogicalRelationalOperator filter = new LOFilter(plan);
-
+
plan.add(foreach);
plan.add(filter);
plan.connect(foreach, filter);
@@ -67,14 +67,14 @@ public class FilterAboveForeach extends
public Transformer getNewTransformer() {
return new FilterAboveForEachTransformer();
}
-
+
public class FilterAboveForEachTransformer extends Transformer {
LOFilter filter = null;
LOForEach foreach = null;
LogicalRelationalOperator forEachPred = null;
OperatorSubPlan subPlan = null;
-
+
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
Iterator<Operator> iter = matched.getOperators();
@@ -85,10 +85,10 @@ public class FilterAboveForeach extends
break;
}
}
-
+
// This would be a strange case
if( foreach == null ) return false;
-
+
iter = matched.getOperators();
while( iter.hasNext() ) {
Operator op = iter.next();
@@ -97,35 +97,37 @@ public class FilterAboveForeach extends
break;
}
}
-
+
// This is for cheating, we look up more than one filter in the plan
while( filter != null ) {
+
// Get uids of Filter
Pair<List<Long>, List<Byte>> uidWithTypes = getFilterProjectionUids(filter);
// See if the previous operators have uids from project
- List<Operator> preds = currentPlan.getPredecessors(foreach);
+ List<Operator> preds = currentPlan.getPredecessors(foreach);
for(int j=0; j< preds.size(); j++) {
LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
if (hasAll(logRelOp, uidWithTypes)) {
forEachPred = (LogicalRelationalOperator) preds.get(j);
- return true;
+ // If a filter is nondeterministic, we shouldn't push it up.
+ return !OptimizerUtils.planHasNonDeterministicUdf(filter.getFilterPlan());
}
}
-
+
// Chances are there are filters below this filter which can be
// moved up. So searching for those filters
List<Operator> successors = currentPlan.getSuccessors(filter);
- if( successors != null && successors.size() > 0 &&
+ if( successors != null && successors.size() > 0 &&
successors.get(0) instanceof LOFilter ) {
filter = (LOFilter)successors.get(0);
} else {
filter = null;
}
}
- return false;
+ return false;
}
-
+
/**
* Get all uids from Projections of this FilterOperator
* @param filter
@@ -136,7 +138,7 @@ public class FilterAboveForeach extends
List<Byte> types = new ArrayList<Byte>();
if( filter != null ) {
LogicalExpressionPlan filterPlan = filter.getFilterPlan();
- Iterator<Operator> iter = filterPlan.getOperators();
+ Iterator<Operator> iter = filterPlan.getOperators();
Operator op = null;
while( iter.hasNext() ) {
op = iter.next();
@@ -159,29 +161,29 @@ public class FilterAboveForeach extends
}
}
}
-
+
}
-
+
Pair<List<Long>, List<Byte>> result = new Pair<List<Long>, List<Byte>>(uids, types);
return result;
}
-
+
/**
* checks if a relational operator contains all of the specified uids
* @param op LogicalRelational operator that should contain the uid
* @param uids Uids to check for
* @return true if given LogicalRelationalOperator has all the given uids
*/
- private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>,
+ private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>,
List<Byte>> uidWithTypes) throws FrontendException {
LogicalSchema schema = op.getSchema();
-
+
if (schema==null)
return false;
-
+
List<Long> uids = uidWithTypes.first;
List<Byte> types = uidWithTypes.second;
-
+
for (int i=0;i<uids.size();i++) {
boolean found = false;
for (LogicalSchema.LogicalFieldSchema fs : schema.getFields()) {
@@ -193,53 +195,53 @@ public class FilterAboveForeach extends
}
return true;
}
-
+
@Override
- public OperatorPlan reportChanges() {
+ public OperatorPlan reportChanges() {
return subPlan;
}
@Override
public void transform(OperatorPlan matched) throws FrontendException {
-
+
List<Operator> opSet = currentPlan.getPredecessors(filter);
if( ! ( opSet != null && opSet.size() > 0 ) ) {
return;
}
Operator filterPred = opSet.get(0);
-
+
opSet = currentPlan.getSuccessors(filter);
if( ! ( opSet != null && opSet.size() > 0 ) ) {
return;
}
Operator filterSuc = opSet.get(0);
-
+
subPlan = new OperatorSubPlan(currentPlan);
-
+
// Steps below do the following
/*
* ForEachPred
* |
- * ForEach
+ * ForEach
* |
* Filter*
* ( These are filters
* which cannot be moved )
* |
- * FilterPred
+ * FilterPred
* ( is a Filter )
* |
* Filter
- * ( To be moved )
+ * ( To be moved )
* |
* FilterSuc
- *
+ *
* |
* |
- * Transforms into
+ * Transforms into
* |
- * \/
- *
+ * \/
+ *
* ForEachPred
* |
* Filter
@@ -251,25 +253,25 @@ public class FilterAboveForeach extends
* ( These are filters
* which cannot be moved )
* |
- * FilterPred
+ * FilterPred
* ( is a Filter )
* |
* FilterSuc
- *
+ *
* Above plan is assuming we are modifying the filter in middle.
* If we are modifying the first filter after ForEach then
* -- * (kleene star) becomes zero
- * -- And ForEach is FilterPred
+ * -- And ForEach is FilterPred
*/
-
+
Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
-
+
currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
-
+
subPlan.add(forEachPred);
subPlan.add(foreach);
subPlan.add(filterPred);
Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/OptimizerUtils.java Thu May 12 09:58:24 2011
@@ -19,7 +19,11 @@ package org.apache.pig.newplan.logical.r
import java.util.Iterator;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -63,4 +67,26 @@ public class OptimizerUtils {
LOGenerate gen = findGenerate( foreach );
return hasFlatten( gen );
}
+
+ /**
+ * Determines whether a logical expression plan contains a call to a UDF whose
+ * class is annotated with {@link Nondeterministic}. Optimizer rules use this
+ * to avoid relocating operators whose result may differ between evaluations.
+ *
+ * @param filterPlan expression plan to scan; must not be null
+ * @return true if the plan contains a non-deterministic UDF, false otherwise
+ */
+ public static boolean planHasNonDeterministicUdf(LogicalExpressionPlan filterPlan) {
+ Iterator<Operator> it = filterPlan.getOperators();
+ while( it.hasNext() ) {
+ Operator op = it.next();
+ if( op instanceof UserFuncExpression ) {
+ // The annotation is read from the UDF's class, so the function is instantiated from its spec first.
+ Object udf = PigContext.instantiateFuncFromSpec(((UserFuncExpression) op).getFuncSpec());
+ if (udf.getClass().getAnnotation(Nondeterministic.class) != null) {
+ return true;
+}
+ }
+ }
+ return false;
+ }
}
Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Thu May 12 09:58:24 2011
@@ -91,9 +91,16 @@ public class PushDownForEachFlatten exte
LOForEach foreach = (LOForEach)matched.getSources().get(0);
LOGenerate gen = OptimizerUtils.findGenerate( foreach );
+
if( !OptimizerUtils.hasFlatten( gen ) )
return false;
+ // If a foreach contains a nondeterministic udf, we shouldn't push it down.
+ for (LogicalExpressionPlan p : gen.getOutputPlans()) {
+ if (OptimizerUtils.planHasNonDeterministicUdf(p))
+ return false;
+ }
+
List<Operator> succs = currentPlan.getSuccessors( foreach );
if( succs == null || succs.size() != 1 )
return false;
Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Thu May 12 09:58:24 2011
@@ -58,13 +58,13 @@ import org.apache.pig.newplan.optimizer.
import org.apache.pig.newplan.optimizer.Transformer;
public class PushUpFilter extends Rule {
-
+
public PushUpFilter(String n) {
- super(n, false);
+ super(n, false);
}
@Override
- public Transformer getNewTransformer() {
+ public Transformer getNewTransformer() {
return new PushUpFilterTransformer();
}
@@ -72,38 +72,42 @@ public class PushUpFilter extends Rule {
private OperatorSubPlan subPlan;
@Override
- public boolean check(OperatorPlan matched) throws FrontendException {
+ public boolean check(OperatorPlan matched) throws FrontendException {
// check if it is inner join
Operator current = matched.getSources().get(0);
-
+
Operator pred = findNonFilterPredecessor( current );
if( pred == null )
return false;
-
+
// sort, distinct, or union is always okay.
if( pred instanceof LOSort || pred instanceof LODistinct || pred instanceof LOUnion ) {
return true;
}
-
+
// if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative
// if predecessor is LOForEach, it is optimized by rule FilterAboveForeach
// return false
if( pred instanceof LOLoad || pred instanceof LOStore || pred instanceof LOStream ||
- pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput ||
+ pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput ||
pred instanceof LOLimit || pred instanceof LONative || pred instanceof LOForEach) {
return false;
}
-
- LOFilter filter = (LOFilter)current;
+
+ LOFilter filter = (LOFilter)current;
List<Operator> preds = currentPlan.getPredecessors( pred );
LogicalExpressionPlan filterPlan = filter.getFilterPlan();
-
+
+ if (OptimizerUtils.planHasNonDeterministicUdf(filterPlan)) {
+ return false;
+ }
+
// collect all uids used in the filter plan
Set<Long> uids = collectUidFromExpPlan(filterPlan);
-
+
if( pred instanceof LOCogroup ) {
LOCogroup cogrp = (LOCogroup)pred;
- if( preds.size() == 1 ) {
+ if( preds.size() == 1 ) {
if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), uids ) ) {
// Order by is ok if all UIDs can be found from previous operator.
return true;
@@ -115,7 +119,7 @@ public class PushUpFilter extends Rule {
return true;
}
}
-
+
// if the predecessor is a multi-input operator then detailed
// checks are required
if( pred instanceof LOCross || pred instanceof LOJoin ) {
@@ -135,7 +139,7 @@ public class PushUpFilter extends Rule {
if (isFullOuter)
return false;
}
-
+
for(int j=0; j<preds.size(); j++) {
if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
@@ -144,12 +148,12 @@ public class PushUpFilter extends Rule {
if (pred instanceof LOCross || pred instanceof LOJoin && (isInner || innerFlags[j]))
return true;
}
- }
+ }
}
-
+
return false;
}
-
+
private boolean containUDF(LogicalExpressionPlan filterPlan) {
Iterator<Operator> it = filterPlan.getOperators();
while( it.hasNext() ) {
@@ -171,9 +175,9 @@ public class PushUpFilter extends Rule {
}
return uids;
}
-
+
/**
- * Starting from current operator (which is a filter), search its successors until
+ * Starting from current operator (which is a filter), search its successors until
* locating a non-filter operator. Null is returned if none is found.
*/
private Operator findNonFilterPredecessor(Operator current) {
@@ -194,7 +198,7 @@ public class PushUpFilter extends Rule {
return pred;
}
} while( true );
-
+
}
@Override
@@ -202,32 +206,32 @@ public class PushUpFilter extends Rule {
subPlan = new OperatorSubPlan(currentPlan);
LOFilter filter = (LOFilter)matched.getSources().get(0);
-
+
// This is the operator that the filter will be inserted above, i.e. between it and its input.
Operator predecessor = this.findNonFilterPredecessor( filter );
subPlan.add( predecessor) ;
-
+
// Disconnect the filter in the plan without removing it from the plan.
Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
Operator succed;
-
+
if (currentPlan.getSuccessors(filter)!=null)
succed = currentPlan.getSuccessors(filter).get(0);
else
succed = null;
-
+
Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
if (succed!=null) {
subPlan.add(succed);
Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
currentPlan.connect(predec, p1.first, succed, p2.second);
}
-
+
if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
// For sort, put the filter in front of it.
Operator prev = currentPlan.getPredecessors( predecessor ).get( 0 );
-
+
insertFilter( prev, predecessor, filter );
return;
}
@@ -236,12 +240,12 @@ public class PushUpFilter extends Rule {
LogicalExpressionPlan filterPlan = filter.getFilterPlan();
List<Operator> preds = currentPlan.getPredecessors( predecessor );
Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, predecessor, preds );
-
- LOFilter newFilter = null;
+
+ LOFilter newFilter = null;
for( Entry<Integer, Operator> entry : inputs.entrySet() ) {
int inputIndex = entry.getKey();
Operator pred = entry.getValue();
-
+
// Find projection field offset
int columnOffset = 0;
if( predecessor instanceof LOJoin || predecessor instanceof LOCross ) {
@@ -249,11 +253,11 @@ public class PushUpFilter extends Rule {
columnOffset += ( (LogicalRelationalOperator)preds.get( i ) ).getSchema().size();
}
}
-
+
// Reuse the filter for the first match. For others, need to make a copy of the filter
// and add it between input and predecessor.
newFilter = newFilter == null ? filter : new LOFilter( (LogicalPlan)currentPlan );
-
+
currentPlan.add( newFilter );
subPlan.add( newFilter );
subPlan.add( pred );
@@ -264,7 +268,7 @@ public class PushUpFilter extends Rule {
if( sink instanceof ProjectExpression )
projExprs.add( (ProjectExpression)sink );
}
-
+
if( predecessor instanceof LOCogroup ) {
for( ProjectExpression projExpr : projExprs ) {
// Need to merge filter condition and cogroup by expression;
@@ -284,7 +288,7 @@ public class PushUpFilter extends Rule {
}
}
}
-
+
// Now, reset the projection expressions in the new filter plan.
sinks = fPlan.getSinks();
for( Operator sink : sinks ) {
@@ -296,11 +300,11 @@ public class PushUpFilter extends Rule {
}
}
newFilter.setFilterPlan( fPlan );
-
+
insertFilter( pred, predecessor, newFilter );
}
}
-
+
// check if a relational operator contains all of the specified uids
private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) throws FrontendException {
LogicalSchema schema = op.getSchema();
@@ -311,14 +315,14 @@ public class PushUpFilter extends Rule {
return false;
}
}
-
+
return true;
}
-
+
@Override
- public OperatorPlan reportChanges() {
+ public OperatorPlan reportChanges() {
return currentPlan;
- }
+ }
// Insert the filter in between the given two operators.
private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
@@ -327,19 +331,19 @@ public class PushUpFilter extends Rule {
currentPlan.connect( prev, p3.first, filter, 0 );
currentPlan.connect( filter, 0, predecessor, p3.second );
}
-
+
// Identify those among preds that will need to have a filter between it and the predecessor.
- private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor,
+ private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor,
List<Operator> preds) throws FrontendException {
Map<Integer, Operator> inputs = new HashMap<Integer, Operator>();
-
+
if( predecessor instanceof LOUnion || predecessor instanceof LOCogroup ) {
for( int i = 0; i < preds.size(); i++ ) {
inputs.put( i, preds.get( i ) );
}
return inputs;
}
-
+
// collect all uids used in the filter plan
Set<Long> uids = collectUidFromExpPlan(filterPlan);
boolean[] innerFlags = null;
@@ -353,13 +357,13 @@ public class PushUpFilter extends Rule {
}
}
}
-
+
// Find the predecessor of join that contains all required uids.
for(int j=0; j<preds.size(); j++) {
// Filter can push to LOJoin outer branch, but no inner branch
- if( hasAll((LogicalRelationalOperator)preds.get(j), uids) &&
+ if( hasAll((LogicalRelationalOperator)preds.get(j), uids) &&
(predecessor instanceof LOCross || predecessor instanceof LOJoin && (isInner || innerFlags[j]))) {
- Operator input = preds.get(j);
+ Operator input = preds.get(j);
subPlan.add(input);
inputs.put( j, input );
}
@@ -369,11 +373,11 @@ public class PushUpFilter extends Rule {
}
@Override
- protected OperatorPlan buildPattern() {
+ protected OperatorPlan buildPattern() {
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator op1 = new LOFilter(plan);
plan.add( op1 );
-
+
return plan;
}
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Thu May 12 09:58:24 2011
@@ -68,6 +68,34 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( fe2 instanceof LOForEach );
}
+ /**
+ * Non-deterministic filters should not be pushed up (see PIG-2014).
+ * In the example below, if the Filter gets pushed above the flatten, we might remove
+ * whole bags of cuisines, while the intent is to sample from each bag.
+ * @throws Exception propagated from building the plan for the query
+ */
+ @Test
+ public void testNondeterministicFilter() throws Exception {
+ String query = "A =LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) }, num:int );" +
+ "B = FOREACH A GENERATE name, flatten(cuisines), num;" +
+ "C = FILTER B BY RANDOM(num) > 5;" +
+ "D = STORE C INTO 'empty';" ;
+
+ LogicalPlan newLogicalPlan = buildPlan( query );
+
+ newLogicalPlan.explain(System.out, "text", true);
+
+ // Expect Filter to not be pushed, so it should be load -> foreach -> foreach -> filter
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator fe1 = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( fe1 instanceof LOForEach );
+ Operator fe2 = newLogicalPlan.getSuccessors( fe1 ).get( 0 );
+ Assert.assertTrue( fe2 instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+ }
+
@Test
public void testMultipleFilter() throws Exception {
String query = "A =LOAD 'file.txt' AS (name, cuisines : bag{ t : ( cuisine ) } );" +
@@ -450,10 +478,12 @@ public class TestNewPlanFilterAboveForea
super(p, iterations, new HashSet<String>());
}
+ @Override
public void addPlanTransformListener(PlanTransformListener listener) {
super.addPlanTransformListener(listener);
}
+ @Override
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanFilterRule.java Thu May 12 09:58:24 2011
@@ -487,6 +487,33 @@ public class TestNewPlanFilterRule {
Assert.assertTrue( store instanceof LOStore );
}
+ /**
+ * Test that SAMPLE doesn't get pushed up (see PIG-2014).
+ * The sampling step is expected to appear as an LOFilter that stays below
+ * the FOREACH following the GROUP.
+ *
+ * @throws Exception propagated from building or optimizing the plan
+ */
+ @Test
+ public void testSample() throws Exception {
+ String query = "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
+ "B = GROUP A by name;" +
+ "C = FOREACH B GENERATE group, A;" +
+ "D = SAMPLE C 0.1 ; " +
+ "E = STORE D INTO 'empty';";
+ // expect load -> foreach -> cogroup -> foreach -> filter
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+ newLogicalPlan.explain(System.out, "text", true);
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator fe1 = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( fe1 instanceof LOForEach );
+ Operator cg = newLogicalPlan.getSuccessors( fe1 ).get( 0 );
+ Assert.assertTrue( cg instanceof LOCogroup );
+ Operator fe2 = newLogicalPlan.getSuccessors( cg ).get( 0 );
+ // Check the operator just fetched (fe2); re-checking fe1 would leave it unverified.
+ Assert.assertTrue( fe2 instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+
+ }
+
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
PigServer pigServer = new PigServer(pc);
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -500,10 +527,12 @@ public class TestNewPlanFilterRule {
super(p, iterations, new HashSet<String>());
}
+ @Override
public void addPlanTransformListener(PlanTransformListener listener) {
super.addPlanTransformListener(listener);
}
+ @Override
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
@@ -531,6 +560,7 @@ public class TestNewPlanFilterRule {
addPlanTransformListener(new ProjectionPatcher());
}
+ @Override
public void addPlanTransformListener(PlanTransformListener listener) {
super.addPlanTransformListener(listener);
}
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1102227&r1=1102226&r2=1102227&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Thu May 12 09:58:24 2011
@@ -1152,5 +1152,23 @@ public class TestNewPlanPushDownForeachF
return newLogicalPlan;
}
+ /**
+ * A FOREACH with a flatten that also generates a non-deterministic UDF (RANDOM)
+ * should not be pushed down below the ORDER BY (see PIG-2014).
+ *
+ * @throws Exception propagated from building or optimizing the plan
+ */
+ @Test
+ public void testNonDeterministicUdf() throws Exception {
+ String query = "A = load 'myfile' as (name, age, gpa);" +
+ "B = foreach A generate $0, RANDOM(), flatten($2);" +
+ "C = order B by $0, $1;" +
+ "D = store C into 'dummy';";
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+
+ // Expected plan order: load -> foreach -> foreach -> sort
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( sort instanceof LOSort );
+
+ }
}