You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2010/01/05 04:24:06 UTC
svn commit: r895874 - in /hadoop/pig/trunk: ./
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/
Author: daijy
Date: Tue Jan 5 03:23:52 2010
New Revision: 895874
URL: http://svn.apache.org/viewvc?rev=895874&view=rev
Log:
PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the flattened fields are used in Join
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 5 03:23:52 2010
@@ -323,6 +323,9 @@
PIG-761: ERROR 2086 on simple JOIN (daijy)
+PIG-1172: PushDownForeachFlatten shall not push ForEach below Join if the
+flattened fields are used in Join (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jan 5 03:23:52 2010
@@ -795,11 +795,8 @@
return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
}
- public LogicalPlan getRelevantPlan(int output, int column)
+ public LogicalPlan getRelevantPlan(int column)
{
- if (output!=0)
- return null;
-
if (column<0)
return null;
@@ -814,6 +811,22 @@
return mSchemaPlanMapping.get(column);
}
+ public boolean isInputFlattened(int column) throws FrontendException { // Reports whether the generate plan that getRelevantPlan(column) resolves to is marked flatten; throws FrontendException (errCode 2195, PigException.BUG) when no plan can be resolved.
+ LogicalPlan plan = getRelevantPlan(column); // NOTE(review): other getRelevantPlan callers pass output-schema positions (requiredOutputField.second, mUserDefinedSchema indices), while PushDownForeachFlatten passes a foreach input column -- confirm which index space is intended.
+ if (plan==null) { // getRelevantPlan returns null, e.g., for a negative column
+ int errCode = 2195;
+ throw new FrontendException("Fail to get foreach plan for input column "+column,
+ errCode, PigException.BUG);
+ }
+ int index = mForEachPlans.indexOf(plan); // position of the resolved plan among this foreach's generate plans
+ if (index==-1) { // resolved plan is not one of mForEachPlans
+ int errCode = 2195;
+ throw new FrontendException("Fail to get foreach plan for input column "+column,
+ errCode, PigException.BUG);
+ }
+ return mFlatten.get(index); // flatten flag is read at the same position as the plan in mForEachPlans
+ }
+
@Override
public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
if (!mIsSchemaComputed)
@@ -835,7 +848,7 @@
return null;
}
- LogicalPlan plan = getRelevantPlan(output, column);
+ LogicalPlan plan = getRelevantPlan(column);
TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
plan);
@@ -946,7 +959,7 @@
int index = planToRemove.get(planToRemove.size()-1);
if (mUserDefinedSchema!=null) {
for (int i=mUserDefinedSchema.size()-1;i>=0;i--) {
- if (getRelevantPlan(0, i)==mForEachPlans.get(index))
+ if (getRelevantPlan(i)==mForEachPlans.get(index))
mUserDefinedSchema.remove(i);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Jan 5 03:23:52 2010
@@ -292,7 +292,7 @@
else if (rlo instanceof LOForEach)
{
// Relay map keys from output to input
- LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.first, requiredOutputField.second);
+ LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.second);
if (relevantFields.getFields()!=null && relevantFields.getFields().size()!=0)
{
int index = ((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Tue Jan 5 03:23:52 2010
@@ -19,6 +19,7 @@
package org.apache.pig.impl.logicalLayer.optimizer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -41,6 +42,7 @@
import org.apache.pig.impl.plan.ProjectionMap;
import org.apache.pig.impl.plan.RequiredFields;
import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
@@ -283,6 +285,23 @@
}
}
+ // Check if flattened fields is required by LOJoin, if so, don't optimize
+ if (successor instanceof LOJoin) {
+ List<RequiredFields> requiredFieldsList = ((LOJoin)successor).getRequiredFields();
+ RequiredFields requiredFields = requiredFieldsList.get(foreachPosition.intValue());
+
+ MultiMap<Integer, Column> foreachMappedFields = foreachProjectionMap.getMappedFields();
+
+ for (Pair<Integer, Integer> pair : requiredFields.getFields()) {
+ Collection<Column> columns = foreachMappedFields.get(pair.second);
+ for (Column column : columns) {
+ Pair<Integer, Integer> foreachInputColumn = column.getInputColumn();
+ if (foreach.isInputFlattened(foreachInputColumn.second))
+ return false;
+ }
+ }
+ }
+
mInsertBetween = true;
return true;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=895874&r1=895873&r2=895874&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java Tue Jan 5 03:23:52 2010
@@ -977,5 +977,27 @@
}
+ // See PIG-1172: the join key a1 of D is produced by B's flatten($0), so PushDownForeachFlatten must refuse to push B below the join.
+ @Test
+ public void testForeachJoinRequiredField() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});"); // A has a single bag column whose tuples are (a0, a1)
+ planTester.buildPlan("B = FOREACH A generate flatten($0);"); // flatten($0) exposes a0 and a1 as B's columns
+ planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
+ planTester.buildPlan("D = JOIN B by a1, C by c1;"); // join key a1 is a flattened field of B
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;"); // lp is the plan the rule is checked against below
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp); // the rule consults the foreach projection map
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0); // NOTE(review): assumes root 0 is A's load (the one feeding B); root order is not verified here
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada))); // the rule must reject this pattern
+ assertTrue(pushDownForeach.getSwap() == false); // no swap recorded
+ assertTrue(pushDownForeach.getInsertBetween() == false); // no insert-between recorded
+ }
+
}