You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC

svn commit: r982345 [9/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+public class TestNewPlanColumnPrune extends TestCase {
+
+    LogicalPlan plan = null; // not read by any test in this class; each test declares its own local named plan
+    PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); // local-mode context passed to every LogicalPlanTester below
+  
+    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        
+        return newPlan;
+    }
+    
+   
+    /**
+     * Checks that the optimizer leaves a plan untouched when there is nothing to
+     * prune: each script is compared with an unoptimized build of the same script.
+     */
+    public void testNoPrune() throws Exception  {
+        // Case 1: a script without any foreach; the reference plan below keeps
+        // all three load columns.
+        LogicalPlanTester tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        tester.buildPlan("b = filter a by v1==NULL;");
+        LogicalPlan optimized = migratePlan(tester.buildPlan("store b into 'empty';"));
+        // 3 is the iteration count handed to the optimizer's constructor.
+        new MyPlanOptimizer(optimized, 3).optimize();
+
+        // Reference for case 1: the identical script, migrated but never optimized.
+        tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        tester.buildPlan("b = filter a by v1==NULL;");
+        LogicalPlan reference = migratePlan(tester.buildPlan("store b into 'empty';"));
+        assertTrue(reference.isEqual(optimized));
+
+        // Case 2: the load declares no schema and the foreach uses positional
+        // references ($0, $1).
+        tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt';");
+        tester.buildPlan("b = foreach a generate $0, $1;");
+        optimized = migratePlan(tester.buildPlan("store b into 'empty';"));
+        new MyPlanOptimizer(optimized, 3).optimize();
+
+        // Reference for case 2: again the same script without optimization.
+        tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt';");
+        tester.buildPlan("b = foreach a generate $0, $1;");
+        reference = migratePlan(tester.buildPlan("store b into 'empty';"));
+        assertTrue(reference.isEqual(optimized));
+    }
+       
+    /** Optimizes several scripts with the ColumnMapKeyPrune-only optimizer and compares each result with a hand-pruned script. */
+    public void testPrune() throws Exception  {
+        // only foreach: v1 and v2 are never referenced
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = foreach a generate id;");        
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: the load keeps only id
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id);");
+        lpt.buildPlan("b = foreach a generate id;");        
+        plan = lpt.buildPlan("store b into 'empty';");  
+        LogicalPlan expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with filter: v1, v2 and v3 feed the filter; v5 and v4 are unused
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+        lpt.buildPlan("c = foreach b generate id;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: v5 and v4 dropped from the load, remaining columns in their original order
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v3, v2);");
+        lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+        lpt.buildPlan("c = foreach b generate id;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach: the second foreach does not use v2, which the first one generates
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: load and first foreach both narrowed to v5, v4
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+        lpt.buildPlan("b = foreach a generate v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach, the first one generating more columns than the second one uses
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate id, v1, v5, v3, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: the same pruned script as in the previous case
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+        lpt.buildPlan("b = foreach a generate v5, v4;");
+        lpt.buildPlan("c = foreach b generate v5, v4;");
+        plan = lpt.buildPlan("store c into 'empty';"); 
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with 2 foreach and filter in between: v2 is referenced by the filter only
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: load narrowed to v5, v4, v2 (original column order); the rest of the script unchanged
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (v5, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with one foreach after a join of two loads
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2, v3);");
+        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5, v6);");
+        lpt.buildPlan("c = join a by id, b by id;");       
+        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: each load keeps its join key plus the columns the foreach generates
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v3);");
+        lpt.buildPlan("b = load 'c.txt' as (id, v4, v5);");
+        lpt.buildPlan("c = join a by id, b by id;");       
+        lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with BinStorage, insert foreach after load
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");        
+        lpt.buildPlan("c = filter a by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: full load schema kept, followed by an added foreach generating v5, v4, v2
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5, v4;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // with BinStorage, do not insert a foreach after the load if there is already one
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
+        lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: the existing foreach is narrowed to v5, v2 and no extra foreach appears
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v2;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // same as the previous case, with a constant in the existing foreach
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");    
+        lpt.buildPlan("b = foreach a generate v5, v4, v2, 10;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        // expected: v4 dropped from the existing foreach, the constant 10 kept
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+        lpt.buildPlan("b = foreach a generate v5, v2, 10;");
+        lpt.buildPlan("c = filter b by v2 != NULL;");
+        lpt.buildPlan("d = foreach c generate v5;");
+        plan = lpt.buildPlan("store d into 'empty';");  
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+    }
+    
+    /** Checks column pruning on scripts with map columns, including the required-map-keys annotation found on each load. */
+    @SuppressWarnings("unchecked")
+    public void testPruneWithMapKey() throws Exception {
+        // only foreach: v1 is never referenced and only key 'path' of m is read
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("b = foreach a generate id, m#'path';");
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("b = foreach a generate id, m#'path';");
+        plan = lpt.buildPlan("store b into 'empty';");
+        LogicalPlan expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+
+        // One annotated entry under key 2 (presumably m's index in the original schema - confirm) holding 'path'.
+        LOLoad op = (LOLoad)newLogicalPlan.getSources().get(0);
+        Map<Integer,Set<String>> annotation =
+                (Map<Integer, Set<String>>) op.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertNotNull("load should carry a required-map-keys annotation", annotation);
+        assertEquals(1, annotation.size());
+        Set<String> s = new HashSet<String>();
+        s.add("path");
+        assertEquals(s, annotation.get(2));
+
+        // foreach with join
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("b = load 'd.txt' as (id, v1, m:map[]);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+        plan = lpt.buildPlan("store e into 'empty';");
+        newLogicalPlan = migratePlan(plan);
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("b = load 'd.txt' as (id, m:map[]);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+        lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+        plan = lpt.buildPlan("store e into 'empty';");
+        expected = migratePlan(plan);
+        assertTrue(expected.isEqual(newLogicalPlan));
+
+        List<Operator> ll = newLogicalPlan.getSources();
+        assertEquals(2, ll.size());
+        LOLoad loada = null;
+        LOLoad loadb = null;
+        for(Operator opp: ll) {
+            String alias = ((LogicalRelationalOperator)opp).getAlias();
+            if ("a".equals(alias)) {
+                loada = (LOLoad)opp;
+            } else if ("b".equals(alias)) {
+                loadb = (LOLoad)opp;
+            }
+        }
+        assertNotNull("no load with alias a among the plan sources", loada);
+        assertNotNull("no load with alias b among the plan sources", loadb);
+
+        // load a: no annotation expected (presumably because the foreach emits a::m as a whole map)
+        annotation =
+                (Map<Integer, Set<String>>) loada.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertNull(annotation);
+
+        // load b: the script reads b's map only through b::m#'path'
+        annotation =
+                (Map<Integer, Set<String>>) loadb.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertNotNull("load b should carry a required-map-keys annotation", annotation);
+        assertEquals(1, annotation.size());
+        s = new HashSet<String>();
+        s.add("path");
+        assertEquals(s, annotation.get(2));
+    }
+    
+    /**
+     * Optimizes a script that flattens a bag column and then uses a single field
+     * of it, and expects the result to equal an unoptimized build of the same script.
+     */
+    public void testPruneWithBag() throws Exception  {
+        // filter above foreach
+        LogicalPlanTester tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
+        tester.buildPlan("b = filter a by id>10;");
+        tester.buildPlan("c = foreach b generate id, FLATTEN(v);");
+        tester.buildPlan("d = foreach c generate id, v::s2;");
+        LogicalPlan optimized = migratePlan(tester.buildPlan("store d into 'empty';"));
+        new MyPlanOptimizer(optimized, 3).optimize();
+
+        // Reference: the identical script, migrated but never optimized.
+        tester = new LogicalPlanTester(pc);
+        tester.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
+        tester.buildPlan("b = filter a by id>10;");
+        tester.buildPlan("c = foreach b generate id, FLATTEN(v);");
+        tester.buildPlan("d = foreach c generate id, v::s2;");
+        LogicalPlan reference = migratePlan(tester.buildPlan("store d into 'empty';"));
+        assertTrue(reference.isEqual(optimized));
+    }
+    
+    /*
+    public void testAddForeach() throws Exception  {
+        // filter above foreach
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = filter a by v1>10;");
+        lpt.buildPlan("c = foreach b generate id;");        
+        org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store c into 'empty';");  
+        LogicalPlan newLogicalPlan = migratePlan(plan);
+               
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1);");
+        lpt.buildPlan("b = filter a by v1>10;");
+        lpt.buildPlan("c = foreach b generate id;");      
+        plan = lpt.buildPlan("store c into 'empty';");  
+        LogicalPlan expected = migratePlan(plan);
+        
+        assertTrue(expected.isEqual(newLogicalPlan));
+        
+        // join with foreach
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("b = load 'd.txt' as (id, v1, v2);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = filter c by a::v1>b::v1;");
+        lpt.buildPlan("e = foreach d generate a::id;");        
+        plan = lpt.buildPlan("store e into 'empty';");  
+        newLogicalPlan = migratePlan(plan);
+               
+        optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        
+        lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (id, v1);");
+        lpt.buildPlan("b = load 'd.txt' as (id, v1);");
+        lpt.buildPlan("c = join a by id, b by id;");
+        lpt.buildPlan("d = foreach c generate a::id, a::v1, b::v1;");        
+        lpt.buildPlan("e = filter d by a::v1>b::v1;");
+        lpt.buildPlan("f = foreach e generate a::id;");        
+        plan = lpt.buildPlan("store f into 'empty';");  
+        expected = migratePlan(plan);
+        
+        assertTrue(expected.isEqual(newLogicalPlan));
+    }*/
+    
+    /** Optimizer whose rule sets are limited to a single set containing only ColumnMapKeyPrune. */
+    public class MyPlanOptimizer extends LogicalPlanOptimizer {
+
+        protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
+            super(p, iterations);
+        }
+
+        /** Returns a one-element list whose only rule set holds the ColumnMapKeyPrune rule. */
+        protected List<Set<Rule>> buildRuleSets() {
+            Set<Rule> pruneOnly = new HashSet<Rule>();
+            pruneOnly.add(new ColumnMapKeyPrune("ColumnMapKeyPrune"));
+
+            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+            ruleSets.add(pruneOnly);
+            return ruleSets;
+        }
+    }
+}