You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/28 10:11:57 UTC

svn commit: r990323 - in /hadoop/pig/trunk: src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java test/org/apache/pig/test/TestPruneColumn.java

Author: daijy
Date: Sat Aug 28 08:11:57 2010
New Revision: 990323

URL: http://svn.apache.org/viewvc?rev=990323&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-8.patch)

Modified:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java?rev=990323&r1=990322&r2=990323&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java Sat Aug 28 08:11:57 2010
@@ -32,15 +32,20 @@ import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
 import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
 import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
@@ -106,14 +111,11 @@ public class MapKeysPruneHelper {
         MapMarker marker = new MapMarker(currentPlan);
         marker.visit();
         
-        // Get all Uids from Sinks
-        List<Operator> sinks = currentPlan.getSinks();
-        Set<Long> sinkMapUids = new HashSet<Long>();
-        for( Operator sink : sinks ) {
-            LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema();
-            sinkMapUids.addAll( getMapUids( schema ) );
-        }
-        
+        // If the uid is the input uid of LOStore, LOCogroup, LOUnion, UserFunc, that means
+        // the entire map may be used. For simplicity, we do not prune any map key in this case
+        Set<Long> fullMapUids = new HashSet<Long>();
+        FullMapCollector collector = new FullMapCollector(currentPlan, fullMapUids);
+        collector.visit();
         
         // If we have found specific keys which are needed then we return true;
         // Else if we dont have any specific keys we return false
@@ -123,12 +125,12 @@ public class MapKeysPruneHelper {
                 (Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS);
             
             // Now for all full maps found in sinks we cannot prune them at source
-            if( ! sinkMapUids.isEmpty() && annotationValue != null && 
+            if( ! fullMapUids.isEmpty() && annotationValue != null && 
                     !annotationValue.isEmpty() ) {
                 Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] );
                 LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema();
                 for( Integer col : annotationKeyArray ) {                	
-                    if( sinkMapUids.contains(sourceSchema.getField(col).uid)) {
+                    if( fullMapUids.contains(sourceSchema.getField(col).uid)) {
                         annotationValue.remove( col );
                     }
                 }
@@ -172,13 +174,11 @@ public class MapKeysPruneHelper {
      * @param schema Schema having fields
      * @return
      */
-    private Set<Long> getMapUids(LogicalSchema schema ) {
+    private static Set<Long> getMapUids(LogicalSchema schema ) {
         Set<Long> uids = new HashSet<Long>();
         if( schema != null ) {
             for( LogicalFieldSchema field : schema.getFields() ) {
-                if( field.type == DataType.MAP ) {
-                    uids.add( field.uid );
-                }
+                uids.add( field.uid );
             }
         }
         return uids;
@@ -188,7 +188,6 @@ public class MapKeysPruneHelper {
         return subplan;
     }
 
-      
     /**
      * This class collects all the information required to create
      * the list of keys required for a map
@@ -298,4 +297,72 @@ public class MapKeysPruneHelper {
             }
         }
     }
+    
+    /**
+     * Collects the uid of every field reaching an LOStore, LOUnion or LOCogroup and of
+     * every map-typed UDF argument. Callers use the set to skip map-key pruning for them.
+     */
+    static public class FullMapCollector extends AllExpressionVisitor {
+        final Set<Long> fullMapUids; // caller-supplied set, filled in place during the visit
+
+        protected FullMapCollector(OperatorPlan plan, Set<Long> fullMapUids) throws FrontendException {
+            super(plan, new DependencyOrderWalker(plan));
+            this.fullMapUids = fullMapUids;
+        }
+
+        @Override
+        public void visit(LOStore store) throws FrontendException {
+            super.visit(store);
+            fullMapUids.addAll(getMapUids(store.getSchema()));
+        }
+
+        @Override
+        public void visit(LOUnion union) throws FrontendException {
+            super.visit(union);
+            addPredecessorUids(union);
+        }
+
+        @Override
+        public void visit(LOCogroup cogroup) throws FrontendException {
+            super.visit(cogroup);
+            addPredecessorUids(cogroup);
+        }
+
+        // Adds the uid of every field in the schema of each predecessor of op.
+        private void addPredecessorUids(Operator op) throws FrontendException {
+            List<Operator> preds = plan.getPredecessors(op);
+            if (preds == null) return;
+            for (Operator pred : preds) {
+                LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema();
+                fullMapUids.addAll(getMapUids(schema));
+            }
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) throws FrontendException {
+            return new FullMapExpCollector(expr, fullMapUids);
+        }
+
+        static class FullMapExpCollector extends LogicalExpressionVisitor {
+            final Set<Long> fullMapUids; // caller-supplied set, filled in place during the visit
+
+            protected FullMapExpCollector(OperatorPlan plan, Set<Long> fullMapUids) throws FrontendException {
+                super(plan, new DependencyOrderWalker(plan));
+                this.fullMapUids = fullMapUids;
+            }
+
+            @Override
+            public void visit(UserFuncExpression userFunc) throws FrontendException {
+                List<Operator> args = userFunc.getPlan().getSuccessors(userFunc);
+                if (args == null) return;
+                // Check every argument: a whole map may be passed at any position, not just the first.
+                for (Operator arg : args) {
+                    LogicalFieldSchema fs = ((LogicalExpression)arg).getFieldSchema();
+                    if (fs != null && fs.type == DataType.MAP) {
+                        fullMapUids.add(fs.uid);
+                    }
+                }
+            }
+        }
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=990323&r1=990322&r2=990323&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Sat Aug 28 08:11:57 2010
@@ -1193,7 +1193,7 @@ public class TestPruneColumn extends Tes
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0", 
                 "Map key required for A: $1->[key2, key1]"}));
     }
-    /*
+    
     @Test
     public void testMapKey3() throws Exception {
         pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
@@ -1211,7 +1211,7 @@ public class TestPruneColumn extends Tes
         assertFalse(iter.hasNext());
         
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
-    }*/
+    }
     
     @Test
     public void testMapKey4() throws Exception {