You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/28 10:11:57 UTC
svn commit: r990323 - in /hadoop/pig/trunk:
src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
test/org/apache/pig/test/TestPruneColumn.java
Author: daijy
Date: Sat Aug 28 08:11:57 2010
New Revision: 990323
URL: http://svn.apache.org/viewvc?rev=990323&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-8.patch)
Modified:
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java?rev=990323&r1=990322&r2=990323&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java Sat Aug 28 08:11:57 2010
@@ -32,15 +32,20 @@ import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
@@ -106,14 +111,11 @@ public class MapKeysPruneHelper {
MapMarker marker = new MapMarker(currentPlan);
marker.visit();
- // Get all Uids from Sinks
- List<Operator> sinks = currentPlan.getSinks();
- Set<Long> sinkMapUids = new HashSet<Long>();
- for( Operator sink : sinks ) {
- LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema();
- sinkMapUids.addAll( getMapUids( schema ) );
- }
-
+ // If the uid is the input uid of LOStore, LOCogroup, LOUnion, UserFunc, that means
+ // the entire map may be used. For simplicity, we do not prune any map key in this case
+ Set<Long> fullMapUids = new HashSet<Long>();
+ FullMapCollector collector = new FullMapCollector(currentPlan, fullMapUids);
+ collector.visit();
// If we have found specific keys which are needed then we return true;
// Else if we dont have any specific keys we return false
@@ -123,12 +125,12 @@ public class MapKeysPruneHelper {
(Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS);
// Now for all full maps found in sinks we cannot prune them at source
- if( ! sinkMapUids.isEmpty() && annotationValue != null &&
+ if( ! fullMapUids.isEmpty() && annotationValue != null &&
!annotationValue.isEmpty() ) {
Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] );
LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema();
for( Integer col : annotationKeyArray ) {
- if( sinkMapUids.contains(sourceSchema.getField(col).uid)) {
+ if( fullMapUids.contains(sourceSchema.getField(col).uid)) {
annotationValue.remove( col );
}
}
@@ -172,13 +174,11 @@ public class MapKeysPruneHelper {
* @param schema Schema having fields
* @return
*/
- private Set<Long> getMapUids(LogicalSchema schema ) {
+ private static Set<Long> getMapUids(LogicalSchema schema ) {
Set<Long> uids = new HashSet<Long>();
if( schema != null ) {
for( LogicalFieldSchema field : schema.getFields() ) {
- if( field.type == DataType.MAP ) {
- uids.add( field.uid );
- }
+ uids.add( field.uid );
}
}
return uids;
@@ -188,7 +188,6 @@ public class MapKeysPruneHelper {
return subplan;
}
-
/**
* This class collects all the information required to create
* the list of keys required for a map
@@ -298,4 +297,72 @@ public class MapKeysPruneHelper {
}
}
}
+
+ static public class FullMapCollector extends AllExpressionVisitor { // Plan visitor that gathers the uids for which map-key pruning is skipped: field uids reaching LOStore, LOUnion and LOCogroup, plus MAP-typed UDF inputs
+ Set<Long> fullMapUids = new HashSet<Long>(); // result set; this initial HashSet is replaced by the constructor argument
+
+ protected FullMapCollector(OperatorPlan plan, Set<Long> fullMapUids) throws FrontendException { // plan: plan to walk; fullMapUids: caller-owned set that receives the collected uids
+ super(plan, new DependencyOrderWalker(plan)); // operators are walked with a DependencyOrderWalker
+ this.fullMapUids = fullMapUids; // alias the caller's set so the results are visible to the caller after visit()
+ }
+
+ @Override
+ public void visit(LOStore store) throws FrontendException { // records every field uid of the store's schema
+ super.visit(store);
+ Set<Long> uids = getMapUids(store.getSchema()); // getMapUids returns the uid of every field in the schema (no MAP-type filter), or an empty set for a null schema
+ fullMapUids.addAll(uids);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void visit(LOUnion union) throws FrontendException { // records every field uid of every input to the union
+ super.visit(union);
+ List<Operator> preds = plan.getPredecessors(union);
+ if (preds!=null) { // nothing to record when getPredecessors yields null
+ for (Operator pred : preds) {
+ LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema();
+ Set<Long> uids = getMapUids(schema);
+ fullMapUids.addAll(uids);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void visit(LOCogroup cogroup) throws FrontendException { // records every field uid of every input to the cogroup (same handling as LOUnion)
+ super.visit(cogroup);
+ List<Operator> preds = plan.getPredecessors(cogroup);
+ if (preds!=null) { // nothing to record when getPredecessors yields null
+ for (Operator pred : preds) {
+ LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema();
+ Set<Long> uids = getMapUids(schema);
+ fullMapUids.addAll(uids);
+ }
+ }
+ }
+
+ @Override
+ protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr)
+ throws FrontendException {
+ return new FullMapExpCollector(expr, fullMapUids); // expression plans are scanned by FullMapExpCollector, which writes into the same set
+ }
+
+ static class FullMapExpCollector extends LogicalExpressionVisitor { // Expression visitor that records the uid of a MAP-typed successor of a UserFuncExpression
+ Set<Long> fullMapUids = new HashSet<Long>(); // as in the outer class, this initial HashSet is replaced by the constructor argument
+ protected FullMapExpCollector(OperatorPlan plan, Set<Long> fullMapUids)
+ throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ this.fullMapUids = fullMapUids; // share the set owned by the enclosing collector's caller
+ }
+
+ @Override
+ public void visit(UserFuncExpression userFunc) throws FrontendException {
+ List<Operator> succs = userFunc.getPlan().getSuccessors(userFunc); // successors of the UDF node in its expression plan; presumably the UDF's inputs — confirm
+ if (succs==null) return; // no successors: nothing to record
+ LogicalExpression succ = (LogicalExpression)succs.get(0); // NOTE(review): only the first successor is examined and the list is assumed non-empty — confirm a map in a later position need not be recorded
+ if (succ.getFieldSchema()!=null && succ.getFieldSchema().type==DataType.MAP) // record only when the field schema is known and its type is MAP
+ fullMapUids.add(succ.getFieldSchema().uid);
+ }
+ }
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=990323&r1=990322&r2=990323&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Sat Aug 28 08:11:57 2010
@@ -1193,7 +1193,7 @@ public class TestPruneColumn extends Tes
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key2, key1]"}));
}
- /*
+
@Test
public void testMapKey3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
@@ -1211,7 +1211,7 @@ public class TestPruneColumn extends Tes
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
- }*/
+ }
@Test
public void testMapKey4() throws Exception {