You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/03/04 19:21:22 UTC
svn commit: r919109 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
src/org/apache/pig/impl/plan/OperatorPlan.java
test/org/apache/pig/test/TestPartitionFilterOptimization.java
Author: rding
Date: Thu Mar 4 18:21:21 2010
New Revision: 919109
URL: http://svn.apache.org/viewvc?rev=919109&view=rev
Log:
PIG-1267: Problems with partition filter optimizer
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Mar 4 18:21:21 2010
@@ -139,6 +139,8 @@
BUG FIXES
+PIG-1267: Problems with partition filter optimizer (rding)
+
PIG-1079: Modify merge join to use distributed cache to maintain the index
(rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Thu Mar 4 18:21:21 2010
@@ -19,10 +19,11 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
@@ -31,7 +32,6 @@
import org.apache.pig.PigException;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
-import org.apache.pig.Expression.OpType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOLoad;
@@ -69,9 +69,9 @@
private LOFilter loFilter;
/**
- * flag to ensure we only do the optimization once for performance reasons
+ * to ensure we only do the optimization once for performance reasons
*/
- private boolean alreadyCalled = false;
+ private Set<LogicalOperator> alreadyChecked = new HashSet<LogicalOperator>();
/**
* a map between column names as reported in
@@ -98,13 +98,6 @@
@Override
public boolean check(List<LogicalOperator> nodes) throws OptimizerException
{
- if(!alreadyCalled) {
- // first call
- alreadyCalled = true;
- } else {
- // already called, just return
- return false;
- }
if((nodes == null) || (nodes.size() <= 0)) {
int errCode = 2052;
String msg = "Internal error. Cannot retrieve operator from null " +
@@ -114,6 +107,9 @@
if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad)) {
return false;
}
+ if (!alreadyChecked.add(nodes.get(0))) {
+ return false;
+ }
loLoad = (LOLoad)nodes.get(0);
List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
@@ -164,7 +160,7 @@
updateMappedColNames(partitionFilter);
loadMetadata.setPartitionFilter(partitionFilter);
if(pColFilterFinder.isFilterRemovable()) {
- // remove this filter from the plan
+ // remove this filter from the plan
mPlan.removeAndReconnect(loFilter);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Mar 4 18:21:21 2010
@@ -669,7 +669,10 @@
pred = preds.get(0);
disconnect(pred, node);
}
-
+
+ int oldPos = -1;
+ int newPos = -1;
+
List<E> succs = getSuccessors(node);
E succ = null;
if (succs != null) {
@@ -681,13 +684,51 @@
throw pe;
}
succ = succs.get(0);
+ List<E> plst = getPredecessors(succ);
+ for (int i=0; i<plst.size(); i++) {
+ if (plst.get(i).equals(node)) {
+ oldPos = i;
+ }
+ }
disconnect(node, succ);
}
remove(node);
if (pred != null && succ != null) {
connect(pred, succ);
- succ.rewire(node, 0, pred, true);
+ List<E> plst = getPredecessors(succ);
+ for (int i=0; i<plst.size(); i++) {
+ if (plst.get(i).equals(pred)) {
+ newPos = i;
+ }
+ }
+
+ if (oldPos < 0 || newPos < 0) {
+ throw new PlanException("Invalid position index: " + oldPos
+ + " : " + newPos);
+ }
+
+ if (oldPos != newPos) {
+ List<E> nlst = new ArrayList<E>();
+ for (int i=0; i<plst.size(); i++) {
+ E nod = plst.get(i);
+ if (i == oldPos) {
+ nlst.add(pred);
+ }
+ if (i == newPos) continue;
+ nlst.add(nod);
+ }
+
+ if (nlst.size() != plst.size()) {
+ throw new PlanException("Invalid list size: " + nlst.size()
+ + " : " + plst.size());
+ }
+
+ mToEdges.removeKey(succ);
+ mToEdges.put(succ, nlst);
+ }
+
+ succ.rewire(node, oldPos, pred, true);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=919109&r1=919108&r2=919109&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPartitionFilterOptimization.java Thu Mar 4 18:21:21 2010
@@ -45,6 +45,7 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
@@ -420,6 +421,46 @@
"((age >= 20) and (f3 == 15))", actual);
}
+ /**
+ * Test PIG-1267
+ * @throws Exception
+ */
+ @Test
+ public void testColNameMapping5() throws Exception {
+ TestLoader.partFilter = null;
+ lpTester.buildPlan("a = load 'foo' using "
+ + TestLoader.class.getName() +
+ "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
+ "'srcid');");
+ lpTester.buildPlan("b = load 'bar' using "
+ + TestLoader.class.getName() +
+ "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+ "'srcid');");
+ lpTester.buildPlan("a1 = filter a by srcid == 10;");
+ lpTester.buildPlan("b1 = filter b by srcid == 20;");
+ lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
+ LogicalPlan lp = lpTester
+ .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
+ "$5 as dstid:int, $0 as mrkt:chararray;");
+
+ new PlanSetter(lp).visit();
+
+ lpTester.typeCheckPlan(lp);
+ lpTester.optimizePlan(lp);
+
+ assertEquals("checking partition filter:",
+ "(srcid == 20)",
+ TestLoader.partFilter.toString());
+
+ int counter = 0;
+ Iterator<LogicalOperator> iter = lp.getKeys().values().iterator();
+ while (iter.hasNext()) {
+ assertTrue(!(iter.next() instanceof LOFilter));
+ counter++;
+ }
+ assertEquals(counter, 6);
+ }
+
//// helper methods ///////
private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols,