You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/18 20:16:33 UTC
svn commit: r1023931 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: daijy
Date: Mon Oct 18 18:16:32 2010
New Revision: 1023931
URL: http://svn.apache.org/viewvc?rev=1023931&view=rev
Log:
PIG-1669: PushUpFilter fail when filter condition contains scalar
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 18 18:16:32 2010
@@ -209,6 +209,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1669: PushUpFilter fail when filter condition contains scalar (daijy)
+
PIG-1672: order of relations in replicated join gets switched in a query where
first relation has two mergeable foreach statements (thejas)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Oct 18 18:16:32 2010
@@ -771,6 +771,16 @@ public class MRCompiler extends PhyPlanV
mergedMap.scalars.add(physOp);
}
}
+ Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+ for (Map.Entry<PhysicalOperator, MapReduceOper> entry : phyToMROpMap.entrySet()) {
+ if (entry.getValue()==rmro) {
+ opsToChange.add(entry.getKey());
+ }
+ }
+ for (PhysicalOperator op : opsToChange) {
+ phyToMROpMap.put(op, mergedMap);
+ }
+
MRPlan.remove(rmro);
}
return ret;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Mon Oct 18 18:16:32 2010
@@ -260,8 +260,13 @@ public class FilterAboveForeach extends
* -- And ForEach is FilterPred
*/
- currentPlan.removeAndReconnect(filter);
- currentPlan.insertBetween(forEachPred, filter, foreach);
+ Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+ Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+ Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+
+ currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+ currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
+ currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
subPlan.add(forEachPred);
subPlan.add(foreach);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java Mon Oct 18 18:16:32 2010
@@ -17,6 +17,7 @@
*/
package org.apache.pig.newplan.logical.rules;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -82,7 +83,12 @@ public class MergeFilter extends Rule {
}
// Since we remove next, we need to merge soft link into filter
- List<Operator> nextSoftPreds = currentPlan.getSoftLinkPredecessors(next);
+ List<Operator> nextSoftPreds = null;
+ if (currentPlan.getSoftLinkPredecessors(next)!=null) {
+ nextSoftPreds = new ArrayList<Operator>();
+ nextSoftPreds.addAll(currentPlan.getSoftLinkPredecessors(next));
+ }
+
if (nextSoftPreds!=null) {
for (Operator softPred : nextSoftPreds) {
currentPlan.removeSoftLink(softPred, next);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Mon Oct 18 18:16:32 2010
@@ -110,6 +110,10 @@ public class PartitionFilterOptimizer ex
return false;
loFilter = (LOFilter)succeds.get(0);
+ // Filter has dependency other than load, skip optimization
+ if (currentPlan.getSoftLinkPredecessors(loFilter)!=null)
+ return false;
+
// we have to check more only if LoadFunc implements LoadMetada
loadFunc = loLoad.getLoadFunc();
if(!( loadFunc instanceof LoadMetadata ) ) {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Mon Oct 18 18:16:32 2010
@@ -211,14 +211,18 @@ public class PushDownForEachFlatten exte
LOForEach foreach = (LOForEach)matched.getSources().get(0);
Operator next = currentPlan.getSuccessors( foreach ).get(0);
if( next instanceof LOSort ) {
- currentPlan.removeAndReconnect(foreach);
-
- List<Operator> succs = currentPlan.getSuccessors( next );
+ Operator pred = currentPlan.getPredecessors( foreach ).get( 0 );
+ List<Operator> succs = new ArrayList<Operator>();
+ succs.addAll(currentPlan.getSuccessors( next ));
+ Pair<Integer, Integer> pos1 = currentPlan.disconnect( pred, foreach );
+ Pair<Integer, Integer> pos2 = currentPlan.disconnect( foreach, next );
+ currentPlan.connect( pred, pos1.first, next, pos2.second );
+
if( succs != null ) {
- List<Operator> succsCopy = new ArrayList<Operator>();
- succsCopy.addAll(succs);
- for( Operator succ : succsCopy ) {
- currentPlan.insertBetween(next, foreach, succ);
+ for( Operator succ : succs ) {
+ Pair<Integer, Integer> pos = currentPlan.disconnect( next, succ );
+ currentPlan.connect( next, pos.first, foreach, 0 );
+ currentPlan.connect( foreach, 0, succ, pos.second );
}
} else {
currentPlan.connect( next, foreach );
@@ -268,10 +272,10 @@ public class PushDownForEachFlatten exte
newForeach.setAlias(((LogicalRelationalOperator)next).getAlias());
- currentPlan.add( newForeach );
Operator opAfterX = null;
List<Operator> succs = currentPlan.getSuccessors( next );
if( succs == null || succs.size() == 0 ) {
+ currentPlan.add( newForeach );
currentPlan.connect( next, newForeach );
} else {
opAfterX = succs.get( 0 );
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Mon Oct 18 18:16:32 2010
@@ -207,6 +207,8 @@ public class PushUpFilter extends Rule {
Operator predecessor = this.findNonFilterPredecessor( filter );
subPlan.add( predecessor) ;
+ // Disconnect the filter in the plan without removing it from the plan.
+ Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
Operator succed;
if (currentPlan.getSuccessors(filter)!=null)
@@ -214,12 +216,13 @@ public class PushUpFilter extends Rule {
else
succed = null;
+ Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
if (succed!=null) {
subPlan.add(succed);
+ Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(predec, p1.first, succed, p2.second);
}
- currentPlan.removeAndReconnect(filter);
-
if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
// For sort, put the filter in front of it.
@@ -318,7 +321,9 @@ public class PushUpFilter extends Rule {
// Insert the filter in between the given two operators.
private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
throws FrontendException {
- currentPlan.insertBetween(prev, filter, predecessor);
+ Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor );
+ currentPlan.connect( prev, p3.first, filter, 0 );
+ currentPlan.connect( filter, 0, predecessor, p3.second );
}
// Identify those among preds that will need to have a filter between it and the predecessor.
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Oct 18 18:16:32 2010
@@ -745,4 +745,35 @@ public class TestEvalPipeline2 extends T
assertFalse(iter.hasNext());
}
+
+ // See PIG-1669
+ @Test
+ public void testPushUpFilterScalar() throws Exception{
+ String[] input1 = {
+ "jason\t14\t4.7",
+ "jack\t18\t4.6"
+ };
+
+ String[] input2 = {
+ "jason\t14",
+ "jack\t18"
+ };
+
+ Util.createInputFile(cluster, "table_PushUpFilterScalar1", input1);
+ Util.createInputFile(cluster, "table_PushUpFilterScalar2", input2);
+ pigServer.registerQuery("A = load 'table_PushUpFilterScalar1' as (name, age, gpa);");
+ pigServer.registerQuery("B = load 'table_PushUpFilterScalar2' as (name, age);");
+ pigServer.registerQuery("C = filter A by age < 20;");
+ pigServer.registerQuery("D = filter B by age < 20;");
+ pigServer.registerQuery("simple_scalar = limit D 1;");
+ pigServer.registerQuery("E = join C by name, D by name;");
+ pigServer.registerQuery("F = filter E by C::age==(int)simple_scalar.age;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("F");
+
+ Tuple t = iter.next();
+ assertTrue(t.toString().equals("(jason,14,4.7,jason,14)"));
+
+ assertFalse(iter.hasNext());
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1023931&r1=1023930&r2=1023931&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Mon Oct 18 18:16:32 2010
@@ -379,7 +379,7 @@ public class TestNewPlanFilterAboveForea
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
-
+
@Test
public void testFilterForeachFlatten() throws Exception {
planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
@@ -398,6 +398,23 @@ public class TestNewPlanFilterAboveForea
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
+
+ // See PIG-1669
+ @Test
+ public void testPushUpFilterWithScalar() throws Exception {
+ planTester.buildPlan("a = load 'studenttab10k' as (name, age, gpa);");
+ planTester.buildPlan("b = group a all;");
+ planTester.buildPlan("c = foreach b generate AVG(a.age) as age;");
+ planTester.buildPlan("d = foreach a generate name, age;");
+ planTester.buildPlan("e = filter d by age > c.age;");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("f = store e into 'empty';");
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( plan );
+
+ Operator store = newLogicalPlan.getSinks().get( 0 );
+ Operator foreach = newLogicalPlan.getPredecessors(store).get(0);
+ Assert.assertTrue( foreach instanceof LOForEach );
+ }
private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws FrontendException {
LogicalPlan newLogicalPlan = migratePlan( plan );