You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/28 00:23:27 UTC
svn commit: r990288 - in /hadoop/pig/trunk: ./ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/expression/
src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: daijy
Date: Fri Aug 27 22:23:26 2010
New Revision: 990288
URL: http://svn.apache.org/viewvc?rev=990288&view=rev
Log:
PIG-1321: Logical Optimizer: Merge cascading foreach
Added:
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 27 22:23:26 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1321: Logical Optimizer: Merge cascading foreach (xuefuz via daijy)
+
PIG-1483: [piggybank] Add HadoopJobHistoryLoader to the piggybank (rding)
PIG-1555: [piggybank] add CSV Loader (dvryaboy)
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java Fri Aug 27 22:23:26 2010
@@ -83,6 +83,10 @@ public abstract class Operator {
return annotations.remove(key);
}
+ public void setPlan(OperatorPlan p) { // only reassigns the plan field; does not add this operator to p
+ plan = p;
+ }
+
/**
* This is like a shallow equals comparison.
* It returns true if two operators have equivalent properties even if they are
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java Fri Aug 27 22:23:26 2010
@@ -19,7 +19,12 @@
package org.apache.pig.newplan.logical.expression;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.BaseOperatorPlan;
@@ -58,4 +63,35 @@ public class LogicalExpressionPlan exten
ExprPrinter npp = new ExprPrinter(this, ps);
npp.visit();
}
+
+ /**
+ * Add every operator of lgExpPlan to this plan (re-pointing each operator to this plan) and re-create lgExpPlan's edges here.
+ * @param lgExpPlan plan to merge; its operators are shared with this plan, not copied
+ * @return the sources of lgExpPlan, as obtained before any operator is re-pointed
+ */
+ public List<Operator> merge(LogicalExpressionPlan lgExpPlan) throws FrontendException {
+
+ List<Operator> sources = lgExpPlan.getSources(); // taken first, before the loops below touch the operators
+
+ Iterator<Operator> iter = lgExpPlan.getOperators(); // pass 1: add all nodes so that pass 2 can connect them
+ while (iter.hasNext()) {
+ LogicalExpression op = (LogicalExpression)iter.next();
+ op.setPlan(this);
+ add(op);
+ }
+
+ iter = lgExpPlan.getOperators(); // pass 2: replay lgExpPlan's outgoing edges in this plan
+ while (iter.hasNext()) {
+ LogicalExpression startOp = (LogicalExpression)iter.next();
+ ArrayList<Operator> endOps = (ArrayList<Operator>)lgExpPlan.fromEdges.get(startOp); // NOTE(review): unchecked cast, relies on fromEdges holding ArrayList values - confirm
+ if (endOps!=null) { // null is treated as "no outgoing edges"
+ for (Operator endOp : endOps) {
+ connect(startOp, endOp);
+ }
+ }
+ }
+
+ return sources;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=990288&r1=990287&r2=990288&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Fri Aug 27 22:23:26 2010
@@ -29,6 +29,7 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeFilter;
+import org.apache.pig.newplan.logical.rules.MergeForEach;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
import org.apache.pig.newplan.logical.rules.SplitFilter;
import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
@@ -137,6 +138,14 @@ public class LogicalPlanOptimizer extend
if (!s.isEmpty())
ls.add(s);
+ // Add MergeForEach set
+ s = new HashSet<Rule>();
+ // Add the MergeForEach rule
+ r = new MergeForEach("MergeForEach");
+ checkAndAddRule(s, r);
+ if (!s.isEmpty())
+ ls.add(s);
+
return ls;
}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=990288&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Fri Aug 27 22:23:26 2010
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeForEach extends Rule {
+
+ private OperatorSubPlan subPlan;
+
+ public MergeForEach(String name) { // name is passed straight through to the Rule constructor
+ super( name, false ); // NOTE(review): meaning of the 'false' flag is defined by Rule, not visible here - confirm
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // The pattern is a single LOForEach; the foreach that follows it is looked up in check().
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator foreach1 = new LOForEach(plan);
+ plan.add( foreach1 );
+ return plan;
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new MergeForEachTransformer(); // a new transformer instance on every call
+ }
+
+ public class MergeForEachTransformer extends Transformer {
+ @Override
+ public boolean check(OperatorPlan matched) throws FrontendException { // true only if the matched foreach and its single LOForEach successor pass every test below
+ LOForEach foreach1 = (LOForEach)matched.getSources().get(0);
+ List<Operator> succs = currentPlan.getSuccessors( foreach1 );
+ if( succs == null || succs.size() != 1 || !( succs.get(0) instanceof LOForEach) )
+ return false;
+
+ LOForEach foreach2 = (LOForEach)succs.get(0);
+
+ // Check that the second foreach's inner plan has only LOGenerate and LOInnerLoad operators
+ Iterator<Operator> it = foreach2.getInnerPlan().getOperators();
+ while( it.hasNext() ) {
+ Operator op = it.next();
+ if(!(op instanceof LOGenerate) && !(op instanceof LOInnerLoad))
+ return false;
+ }
+
+ // Reject the merge if the first foreach has a flatten in its generate statement.
+ LOGenerate gen1 = (LOGenerate)foreach1.getInnerPlan().getSinks().get(0);
+ for (boolean flatten : gen1.getFlattenFlags()) {
+ if( flatten )
+ return false;
+ }
+
+ // Check that none of the 1st foreach's outputs is referred to more than once in the second foreach.
+ // Otherwise, we may do the expression calculation more than once, defeating the benefit of this
+ // optimization
+ Set<Integer> inputs = new HashSet<Integer>();
+ for (Operator op : foreach2.getInnerPlan().getSources()) {
+ // If the source is not LOInnerLoad, then it must be LOGenerate, i.e. the generate has no LOInnerLoad
+ // feeding it (presumably the 2nd ForEach references no output of the 1st ForEach)
+ if (op instanceof LOInnerLoad) {
+ LOInnerLoad innerLoad = (LOInnerLoad)op;
+ int input = innerLoad.getProjection().getColNum(); // NOTE(review): read before the project-star test below - confirm getColNum() is meaningful for '*'
+ if (inputs.contains(input))
+ return false;
+ else
+ inputs.add(input);
+
+ if (innerLoad.getProjection().isProjectStar())
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan; // assigned in transform(), which adds only the merged foreach to it
+ }
+
+ private void addBranchToPlan(LOGenerate gen, int branch, OperatorPlan newPlan) { // adds gen's branch-th predecessor, and the chain above it, to newPlan
+ Operator op = gen.getPlan().getPredecessors(gen).get(branch);
+ newPlan.add(op);
+ op.setPlan(newPlan);
+ Operator pred;
+ if (gen.getPlan().getPredecessors(op)!=null)
+ pred = gen.getPlan().getPredecessors(op).get(0); // only the first predecessor is followed at each step
+ else
+ pred = null;
+ while (pred!=null) { // walk upwards until an operator without predecessors is reached
+ newPlan.add(pred);
+ pred.setPlan(newPlan);
+ newPlan.connect(pred, op); // same edge direction as in the original plan: pred -> op
+ op = pred;
+ if (gen.getPlan().getPredecessors(pred)!=null) // lookups keep using gen's original plan, not newPlan
+ pred = gen.getPlan().getPredecessors(pred).get(0);
+ else
+ pred = null;
+ }
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws FrontendException { // replaces foreach1 and its successor foreach2 with one new LOForEach
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ LOForEach foreach1 = (LOForEach)matched.getSources().get(0);
+ LOGenerate gen1 = (LOGenerate)foreach1.getInnerPlan().getSinks().get(0);
+
+ LOForEach foreach2 = (LOForEach)currentPlan.getSuccessors(foreach1).get(0);
+ LOGenerate gen2 = (LOGenerate)foreach2.getInnerPlan().getSinks().get(0);
+
+ LOForEach newForEach = new LOForEach(currentPlan);
+ LogicalPlan newForEachInnerPlan = new LogicalPlan();
+ newForEach.setInnerPlan(newForEachInnerPlan);
+ newForEach.setAlias(foreach2.getAlias());
+ newForEach.setRequestedParallelism(foreach1.getRequestedParallelisam()); // NOTE(review): alias comes from foreach2 but parallelism from foreach1 - confirm intended
+ List<LogicalExpressionPlan> newExpList = new ArrayList<LogicalExpressionPlan>();
+ LOGenerate newGen = new LOGenerate(newForEachInnerPlan, newExpList, gen2.getFlattenFlags()); // newExpList is filled by the loop below
+ newForEachInnerPlan.add(newGen);
+
+ for (LogicalExpressionPlan exp2 : gen2.getOutputPlans()) {
+ LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
+ newExpPlan.merge(exp2);
+
+ // Replace each projection sink of the 2nd ForEach's expression with the 1st ForEach expression it refers to
+ List<Operator> exp2Sinks = new ArrayList<Operator>();
+ exp2Sinks.addAll(newExpPlan.getSinks()); // iterate over a copy, since the loop below modifies newExpPlan
+ for (Operator exp2Sink : exp2Sinks) {
+ if (exp2Sink instanceof ProjectExpression) {
+ // Find the referenced expression plan in the 1st ForEach
+ ProjectExpression proj = (ProjectExpression)exp2Sink;
+ LOInnerLoad innerLoad = (LOInnerLoad)foreach2.getInnerPlan().getPredecessors(gen2).get(proj.getInputNum());
+ int exp1Pos = innerLoad.getProjection().getColNum();
+ LogicalExpressionPlan exp1 = gen1.getOutputPlans().get(exp1Pos);
+ List<Operator> exp1Sources = newExpPlan.merge(exp1);
+
+ // Splice the merged 1st-ForEach expression into the new plan in place of the projection
+ Operator exp1Source = exp1Sources.get(0); // only the first source of exp1 is used
+ if (newExpPlan.getPredecessors(exp2Sink)!=null) {
+ Operator exp2NextToSink = newExpPlan.getPredecessors(exp2Sink).get(0);
+ newExpPlan.disconnect(exp2NextToSink, exp2Sink);
+ newExpPlan.remove(exp2Sink);
+ newExpPlan.connect(exp2NextToSink, exp1Source);
+ }
+ else {
+ newExpPlan.remove(exp2Sink); // the projection was the whole expression; only exp1 remains
+ }
+ }
+ }
+
+ // Add the referenced ForEach1 inner-plan branches to the new ForEach and renumber the projections' inputs
+ List<Operator> exp1Sinks = newExpPlan.getSinks();
+ for (Operator exp1Sink : exp1Sinks) {
+ if (exp1Sink instanceof ProjectExpression) {
+ addBranchToPlan(gen1, ((ProjectExpression)exp1Sink).getInputNum(), newForEachInnerPlan); // NOTE(review): a branch referenced by several projections is added and connected once per reference - confirm that is safe
+ Operator opNextToGen = foreach1.getInnerPlan().getPredecessors(gen1).get(((ProjectExpression)exp1Sink).getInputNum());
+ newForEachInnerPlan.connect(opNextToGen, newGen);
+ int input = newForEachInnerPlan.getPredecessors(newGen).indexOf(opNextToGen);
+ ((ProjectExpression)exp1Sink).setInputNum(input);
+ }
+ }
+
+ newExpList.add(newExpPlan);
+ }
+
+ // Adjust attachedOp: every projection in the new generate's plans now points at newGen
+ for (LogicalExpressionPlan p : newGen.getOutputPlans()) {
+ Iterator<Operator> iter = p.getOperators();
+ while (iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof ProjectExpression) {
+ ((ProjectExpression)op).setAttachedRelationalOp(newGen);
+ }
+ }
+ }
+
+ Iterator<Operator> iter = newForEach.getInnerPlan().getOperators(); // likewise, inner-load projections now point at newForEach
+ while (iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof LOInnerLoad) {
+ ((LOInnerLoad)op).getProjection().setAttachedRelationalOp(newForEach);
+ }
+ }
+ // remove foreach1, foreach2, add new foreach
+ Operator pred = currentPlan.getPredecessors(foreach1).get(0); // NOTE(review): assumes foreach1 has a predecessor; check() does not verify this
+ Operator succ = currentPlan.getSuccessors(foreach2).get(0); // NOTE(review): assumes foreach2 has a successor; check() does not verify this
+ Pair<Integer, Integer> pos = currentPlan.disconnect(pred, foreach1); // positions are reused below when connecting newForEach
+ currentPlan.disconnect(foreach1, foreach2);
+ currentPlan.disconnect(foreach2, succ);
+ currentPlan.remove(foreach1);
+ currentPlan.remove(foreach2);
+
+ currentPlan.add(newForEach);
+ currentPlan.connect(pred, pos.first, newForEach, pos.second);
+ currentPlan.connect(newForEach, succ);
+
+ subPlan.add(newForEach); // this is what reportChanges() returns
+ }
+ }
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=990288&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java Fri Aug 27 22:23:26 2010
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.AddForEach;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
+import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeForEachOptimization {
+ LogicalPlan plan = null;
+ PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
+
+ private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{ // converts an old-package plan to a newplan LogicalPlan
+ LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
+ visitor.visit();
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan(); // fetched after visit() has run
+ return newPlan;
+ }
+
+ @BeforeClass
+ public static void setup() { // empty: no class-level fixture work is performed
+
+ }
+
+ @AfterClass
+ public static void tearDown() { // empty: no class-level cleanup is performed
+
+ }
+
+ /**
+ * Basic test case. Two simple FOREACH statements can be merged into one.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testSimple() throws IOException {
+ LogicalPlanTester lpt = new LogicalPlanTester( pc );
+ lpt.buildPlan( "A = load 'file.txt' as (a, b, c);" );
+ lpt.buildPlan( "B = foreach A generate a+b as u, c-b as v;" );
+ lpt.buildPlan( "C = foreach B generate $0+5, v;" );
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );
+ LogicalPlan newLogicalPlan = migratePlan( plan );
+
+ int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); // counts taken before optimization
+ int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+ LOForEach foreach1 = getForEachOperator( newLogicalPlan );
+ Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
+
+ PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); // 3 is the optimizer's iterations argument
+ optimizer.optimize();
+
+ int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+ Assert.assertEquals( 1, forEachCount1 - forEachCount2 ); // exactly one foreach fewer after optimization
+ int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+ Assert.assertTrue( outputExprCount1 == outputExprCount2 );
+ LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+ Assert.assertTrue( foreach2.getAlias().equals( "C" ) ); // the alias found is still C after optimization
+ }
+
+ /**
+ * Test a more complex case where the first foreach in the script has an inner plan.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testComplex() throws IOException {
+ LogicalPlanTester lpt = new LogicalPlanTester( pc );
+ lpt.buildPlan( "A = load 'file.txt' as (a:int, b, c:bag{t:tuple(c0:int,c1:int)});" );
+ lpt.buildPlan( "B = foreach A { S = ORDER c BY $0; generate $0 as u, COUNT(S) as v, SUM(S) as w; };" );
+ lpt.buildPlan( "C = foreach B generate w+5 as x, u-v/2;" );
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );
+ LogicalPlan newLogicalPlan = migratePlan( plan );
+
+ int forEachCount1 = getForEachOperatorCount( newLogicalPlan ); // counts taken before optimization
+ int outputExprCount1 = getOutputExprCount( newLogicalPlan );
+ LOForEach foreach1 = getForEachOperator( newLogicalPlan );
+ Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
+
+ PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); // 3 is the optimizer's iterations argument
+ optimizer.optimize();
+
+ int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+ // The number of FOREACHes didn't change because one is generated because of the type cast and
+ // one is removed because of the merge.
+ Assert.assertEquals( 0, forEachCount1 - forEachCount2 );
+ int outputExprCount2 = getOutputExprCount( newLogicalPlan );
+ Assert.assertTrue( outputExprCount1 == outputExprCount2 );
+ LOForEach foreach2 = getForEachOperator( newLogicalPlan );
+ Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
+ }
+
+ /**
+ * Not all consecutive FOREACHes can be merged. In this case, the second FOREACH statement
+ * has an inner plan, so it cannot be merged with the one before it.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testNegative1() throws IOException {
+ LogicalPlanTester lpt = new LogicalPlanTester( pc );
+ lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c, d:bag{t:tuple(c0:int,c1:int)});" );
+ lpt.buildPlan( "B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" );
+ lpt.buildPlan( "C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y; };" );
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );
+ LogicalPlan newLogicalPlan = migratePlan( plan );
+
+ int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+
+ PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); // 3 is the optimizer's iterations argument
+ optimizer.optimize();
+ int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+
+ // A MergeForEach optimization actually does happen here. A new foreach is inserted after A because
+ // of type casting. The inserted one and the one in B can be merged by this optimization. However,
+ // the plan cannot be optimized further because C has an inner plan.
+ Assert.assertEquals( forEachCount1, forEachCount2 );
+ }
+
+ /**
+ * The MergeForEach optimization is not applied if the first FOREACH statement has a FLATTEN operator.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testNegative2() throws IOException {
+ LogicalPlanTester lpt = new LogicalPlanTester( pc );
+ lpt.buildPlan( "A = LOAD 'file.txt' as (a, b, c);" );
+ lpt.buildPlan( "B = FOREACH A GENERATE FLATTEN(a), b, c;" );
+ lpt.buildPlan( "C = FOREACH B GENERATE $0, $1+$2;" );
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan( "store C into 'empty';" );
+ LogicalPlan newLogicalPlan = migratePlan( plan );
+
+ int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
+
+ PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 ); // 3 is the optimizer's iterations argument
+ optimizer.optimize();
+
+ int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
+ Assert.assertEquals( 2, forEachCount1 ); // two foreach operators before optimization...
+ Assert.assertEquals( 2, forEachCount2 ); // ...and still two afterwards
+ }
+
+ private int getForEachOperatorCount(LogicalPlan plan) { // number of LOForEach operators returned by plan.getOperators()
+ Iterator<Operator> ops = plan.getOperators();
+ int count = 0;
+ while( ops.hasNext() ) {
+ Operator op = ops.next();
+ if( op instanceof LOForEach )
+ count++;
+ }
+ return count;
+ }
+
+ private int getOutputExprCount(LogicalPlan plan) throws IOException { // number of output plans of the chosen foreach's generate
+ LOForEach foreach = getForEachOperator( plan ); // NOTE(review): may be null if no foreach qualifies; not checked here
+ LogicalPlan inner = foreach.getInnerPlan();
+ List<Operator> ops = inner.getSinks();
+ LOGenerate gen = (LOGenerate)ops.get( 0 ); // the first sink of the inner plan is cast to LOGenerate
+ return gen.getOutputPlans().size();
+ }
+
+ private LOForEach getForEachOperator(LogicalPlan plan) throws IOException { // first LOForEach whose first successor is not a LOForEach
+ Iterator<Operator> ops = plan.getOperators();
+ while( ops.hasNext() ) {
+ Operator op = ops.next();
+ if( op instanceof LOForEach ) {
+ LOForEach foreach = (LOForEach)op;
+ Operator succ = plan.getSuccessors( foreach ).get( 0 ); // NOTE(review): assumes every foreach has a successor (the test scripts end in a store) - confirm
+ if( !(succ instanceof LOForEach ) )
+ return foreach;
+ }
+ }
+ return null; // no qualifying foreach found
+ }
+
+ public class MyPlanOptimizer extends LogicalPlanOptimizer { // optimizer restricted to the three rule sets built below
+ protected MyPlanOptimizer(OperatorPlan p, int iterations) {
+ super(p, iterations, new HashSet<String>()); // NOTE(review): the empty set is presumably the set of disabled rule names - confirm
+ }
+
+ protected List<Set<Rule>> buildRuleSets() {
+ List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+ Set<Rule> s = new HashSet<Rule>();
+ // add the load type-cast inserter rule
+ Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
+ s.add(r);
+ ls.add(s);
+
+ // AddForEach set
+ // This set contains only the AddForEach rule; it is added
+ // after the type-cast inserter set and before the MergeForEach set.
+ s = new HashSet<Rule>();
+ r = new AddForEach( "AddForEach" );
+ s.add(r);
+ ls.add(s);
+
+ s = new HashSet<Rule>(); // MergeForEach set: the rule under test
+ r = new MergeForEach("MergeForEach");
+ s.add(r);
+ ls.add(s);
+
+ return ls;
+ }
+ }
+}