You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/03 18:58:21 UTC

svn commit: r1099123 [9/16] - in /pig/branches/branch-0.9: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestPushDownForeachFlatten.java Tue May  3 16:58:19 2011
@@ -1,1026 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
-import org.apache.pig.FilterFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.logicalLayer.optimizer.*;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.test.utils.Identity;
-import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-
-import org.junit.Test;
-import org.junit.Before;
-
-/**
- * Test the logical optimizer.
- */
-
-public class TestPushDownForeachFlatten extends junit.framework.TestCase {
-
-    final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
-    static final int MAX_SIZE = 100000;
-
-    private final Log log = LogFactory.getLog(getClass());
-    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
-    
-
-    private static final String simpleEchoStreamingCommand;
-    static {
-        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
-            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
-        else
-            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
-    }
-
-    
-    @Before
-    public void tearDown() {
-        planTester.reset();
-    }
-
-    /**
-     * 
-     * A simple filter UDF for testing
-     *
-     */
-    static public class MyFilterFunc extends FilterFunc {
-    	
-    	@Override
-    	public Boolean exec(Tuple input) {
-    		return false;
-    	}
-    }
-    
-    @Test
-    //Test to ensure that the right exception is thrown when the input list is empty
-    public void testErrorEmptyInput() throws Exception {
-        LogicalPlan lp = new LogicalPlan();
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        try {
-            pushDownForeach.check(lp.getRoots());
-            fail("Exception Expected!");
-        } catch(Exception e) {
-            assertTrue(((OptimizerException)e).getErrorCode() == 2052);
-        }
-    }
-
-    @Test
-    //Test to ensure that the right exception is thrown when the input list is empty
-    public void testErrorNonForeachInput() throws Exception {
-        LogicalPlan lp = planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");;
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        try {
-            pushDownForeach.check(lp.getRoots());
-            fail("Exception Expected!");
-        } catch(Exception e) {
-            assertTrue(((OptimizerException)e).getErrorCode() == 2005);
-        }
-    }
-    
-    @Test
-    public void testForeachNoFlatten() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        LogicalPlan lp = planTester.buildPlan("B = foreach A generate $1;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        assertTrue(!pushDownForeach.check(lp.getLeaves()));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachNoSuccessors() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        LogicalPlan lp = planTester.buildPlan("B = foreach A generate flatten($1);");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        assertTrue(!pushDownForeach.check(lp.getLeaves()));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachStreaming() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate flatten($1);");
-        LogicalPlan lp = planTester.buildPlan("C = stream B through `" + simpleEchoStreamingCommand + "`;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachDistinct() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate flatten($1);");
-        LogicalPlan lp = planTester.buildPlan("C = distinct B;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");        
-        LogicalPlan lp = planTester.buildPlan("C = foreach B generate $0;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-    }
-    
-
-    @Test
-    public void testForeachFilter() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");        
-        LogicalPlan lp = planTester.buildPlan("C = filter B by $1 < 18;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-
-    @Test
-    public void testForeachSplitOutput() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        LogicalPlan lp = planTester.buildPlan("split B into C if $1 < 18, D if $1 >= 18;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-
-    @Test
-    public void testForeachLimit() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        LogicalPlan lp = planTester.buildPlan("B = limit B 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-    }
-
-    @Test
-    public void testForeachUnion() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        LogicalPlan lp = planTester.buildPlan("D = union B, C;");        
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);        
-    }
-    
-    @Test
-    public void testForeachCogroup() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        LogicalPlan lp = planTester.buildPlan("D = cogroup B by $0, C by $0;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);        
-    }
-    
-    @Test
-    public void testForeachGroupBy() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        LogicalPlan lp = planTester.buildPlan("C = group B by $0;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-    }
-    
-    @Test
-    public void testForeachSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");        
-        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOSort sort = (LOSort) lp.getLeaves().get(0);
-        LOForEach foreach = (LOForEach)lp.getPredecessors(sort).get(0);
-        
-        assertTrue(pushDownForeach.check(lp.getPredecessors(sort)));
-        assertTrue(pushDownForeach.getSwap() == true);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-        pushDownForeach.transform(lp.getPredecessors(sort));
-        
-        assertEquals(foreach, lp.getLeaves().get(0));
-        assertEquals(sort, lp.getPredecessors(foreach).get(0));
-        
-    }
-    
-    @Test
-    public void testForeachFlattenAddedColumnSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOSort sort = (LOSort) lp.getLeaves().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachUDFSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;");
-        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOSort sort = (LOSort) lp.getLeaves().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachCastSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate (chararray)$0, $1, flatten($2);");        
-        LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-        
-        LOSort sort = (LOSort) lp.getLeaves().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-        
-    }
-    
-    @Test
-    public void testForeachCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        planTester.buildPlan("D = cross B, C;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOCross cross = (LOCross)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(0);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(cross).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));        
-        
-    }
-
-    @Test
-    public void testForeachCross1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = cross A, C;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(1);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOCross cross = (LOCross)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(1);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(cross).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));       
-        
-    }
-
-    // TODO
-    // The following test case testForeachCross2 has multiple foreach flatten
-    // A new rule should optimize this case
-    @Test
-    public void testForeachCross2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = cross B, D;");
-        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-    
-    @Test
-    public void testForeachFlattenAddedColumnCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachUDFCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachCastCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, $2;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-    
-    @Test
-    public void testForeachFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOJoin frjoin = (LOJoin)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(0);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(frjoin).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));        
-
-    }
-    
-
-    @Test
-    public void testForeachFRJoin1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = join A by $0, C by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(1);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOJoin frjoin = (LOJoin)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(1);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(frjoin).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));       
-
-    }
-
-    // TODO
-    // The following test case testForeachFRJoin2 has multiple foreach flatten
-    // A new rule should optimize this case
-    @Test
-    public void testForeachFRJoin2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = join B by $0, D by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-    
-    @Test
-    public void testForeachFlattenAddedColumnFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachUDFFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachCastFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(0);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOJoin join = (LOJoin)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(join).get(0);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(join).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));        
-
-    }
-    
-    @Test
-    public void testForeachInnerJoin1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = join A by $0, C by $0;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad load = (LOLoad) lp.getRoots().get(1);
-        LOLimit limit = (LOLimit) lp.getLeaves().get(0);
-        LOJoin join = (LOJoin)lp.getPredecessors(limit).get(0);
-        LOForEach foreach = (LOForEach) lp.getPredecessors(join).get(1);
-        
-        Schema limitSchema = limit.getSchema();
-        
-        assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == true);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
-
-        pushDownForeach.transform(lp.getSuccessors(load));
-        
-        planTester.rebuildSchema(lp);
-        
-        for(Boolean b: foreach.getFlatten()) {
-            assertEquals(b.booleanValue(), false);
-        }
-        
-        LOForEach newForeach = (LOForEach)lp.getSuccessors(join).get(0);
-        
-        
-        List<Boolean> newForeachFlatten = newForeach.getFlatten();
-        Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();        
-        for(Integer key: remap.keySet()) {
-            Integer value = remap.get(key);
-            assertEquals(newForeachFlatten.get(value).booleanValue(), true);
-        }
-        
-        assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));       
-
-    }
-
-
-    // TODO
-    // The following test case testForeachInnerJoin2 has multiple foreach flatten
-    // A new rule should optimize this case
-    @Test
-    public void testForeachInnerJoin2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = join B by $0, D by $0;");
-        LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-    
-    @Test
-    public void testForeachFlattenAddedColumnInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachUDFInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    @Test
-    public void testForeachCastInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-        assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
-
-    }
-
-    // See PIG-1172
-    @Test
-    public void testForeachJoinRequiredField() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
-        planTester.buildPlan("B = FOREACH A generate flatten($0);");
-        planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
-        planTester.buildPlan("D = JOIN B by a1, C by c1;");
-        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-    }
-    
-    // See PIG-1374
-    @Test
-    public void testForeachRequiredField() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (b{t(a0:chararray,a1:int)});");
-        planTester.buildPlan("B = foreach A generate flatten($0);");
-        LogicalPlan lp = planTester.buildPlan("C = order B by $1 desc;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
-
-        LOLoad loada = (LOLoad) lp.getRoots().get(0);
-        
-        assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
-        assertTrue(pushDownForeach.getSwap() == false);
-        assertTrue(pushDownForeach.getInsertBetween() == false);
-    }
-}
-