You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC

svn commit: r923043 [5/5] - in /hadoop/pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logical/optimizer/ src/org...

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,1879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.pig.ExecType;
+import org.apache.pig.FilterFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.optimizer.PruneColumns;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestPruneColumnNewLogicalPlan extends TestCase {
+    //MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+    File tmpFile1;
+    File tmpFile2;
+    File tmpFile3;
+    File tmpFile4;
+    File tmpFile5;
+    File tmpFile6;
+    File tmpFile7;
+    File tmpFile8;
+    File tmpFile9;
+    File tmpFile10;
+    File logFile;
+
+    static public class PigStorageWithTrace extends PigStorage {
+
+        /**
+         * @param delimiter
+         */
+        public PigStorageWithTrace() {
+            super();
+        }
+        @Override
+        public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
+            RequiredFieldResponse response = super.pushProjection(requiredFieldList);
+            Logger logger = Logger.getLogger(this.getClass());
+            logger.info(requiredFieldList);
+            return response;
+        }
+
+    }
+    private static final String simpleEchoStreamingCommand;
+    static {
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        else
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+    }
+
+    static public class MyFilterFunc extends FilterFunc {
+        
+        @Override
+        public Boolean exec(Tuple input) {
+            return true;
+        }
+    }
+    
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        Logger logger = Logger.getLogger(Rule.class);
+        logger.removeAllAppenders();
+        logger.setLevel(Level.INFO);
+        SimpleLayout layout = new SimpleLayout();
+        logFile = File.createTempFile("log", "");
+        FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
+        logger.addAppender(appender);
+        
+        Logger pigStorageWithTraceLogger = Logger.getLogger(PigStorageWithTrace.class);
+        pigStorageWithTraceLogger.setLevel(Level.INFO);
+        pigStorageWithTraceLogger.addAppender(appender);
+        
+        pigServer = new PigServer("local");
+        pigServer.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true");
+        
+        //pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        tmpFile1 = File.createTempFile("prune", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile1));
+        ps.println("1\t2\t3");
+        ps.println("2\t5\t2");
+        ps.close();
+        
+        tmpFile2 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile2));
+        ps.println("1\t1");
+        ps.println("2\t2");
+        ps.close();
+
+        tmpFile3 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile3));
+        ps.println("1\t[key1#1,key2#2]");
+        ps.println("2\t[key1#2,key2#4]");
+        ps.close();
+
+        tmpFile4 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile4));
+        ps.println("1\t2\t3");
+        ps.println("1\t2\t3");
+        ps.close();
+        
+        tmpFile5 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile5));
+        ps.println("1\t2\t3\t4");
+        ps.println("2\t3\t4\t5");
+        ps.close();
+        
+        tmpFile6 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile6));
+        ps.println("\t2\t3");
+        ps.println("2\t3\t4");
+        ps.close();
+
+        tmpFile7 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile7));
+        ps.println("1\t1\t1");
+        ps.println("2\t2\t2");
+        ps.close();
+        
+        tmpFile8 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile8));
+        ps.println("1\t2\t3\t4");
+        ps.println("2\t5\t2\t3");
+        ps.close();
+        
+        tmpFile9 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile9));
+        ps.println("1\t[key1#1,key2#2]\t[key3#8,key4#9]");
+        ps.println("2\t[key1#2,key2#4]\t[key3#8,key4#9]");
+        ps.close();
+
+        tmpFile10 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile10));
+        ps.println("1\t[1#1,2#1]\t2");
+        ps.close();
+
+    }
+    
+    public boolean checkLogFileMessage(String[] messages)
+    {
+        BufferedReader reader = null;
+        
+        try {
+            reader = new BufferedReader(new FileReader(logFile));
+            String logMessage="";
+            String line;
+            while ((line=reader.readLine())!=null)
+            {
+                logMessage = logMessage + line + "\n";
+            }
+            for (int i=0;i<messages.length;i++)
+            {
+                if (!logMessage.contains(messages[i]))
+                    return false;
+            }
+            return true;
+        }
+        catch (IOException e) {
+            return false;
+        }
+    }
+    
+    public boolean emptyLogFileMessage()
+    {
+        if (!logFile.exists())
+            return true;
+        BufferedReader reader = null;
+        String line;
+        try {
+            reader = new BufferedReader(new FileReader(logFile));
+            while ((line=reader.readLine())!=null)
+            {
+                if (line!=null && !line.equals(""))
+                    return false;
+            }
+            return true;
+        }
+        catch (IOException e) {
+            return false;
+        }
+    }
+    
+    @Test
+    public void testLoadForEach1() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");       
+        pigServer.registerQuery("B = foreach A generate (int)a1, (int)a2;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 2);
+        assertTrue((Integer)t.get(1) == 3);
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 5);
+        assertTrue((Integer)t.get(1) == 2);
+        
+        assertTrue(checkLogFileMessage(new String[]{"Load fields [1,2]"}));
+    }
+    
+    @Test
+    public void testLoadForEach2() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");       
+        pigServer.registerQuery("B = foreach A generate (int)a0, (int)a2;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 1);
+        assertTrue((Integer)t.get(1) == 3);
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 2);
+        assertTrue((Integer)t.get(1) == 2);
+        
+        assertTrue(checkLogFileMessage(new String[]{"Load fields [0,2]"}));
+    }
+    
+    @Test
+    public void testLoadForEach3() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate (int)a0, (int)a1;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 1);
+        assertTrue((Integer)t.get(1) == 2);
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue((Integer)t.get(0) == 2);
+        assertTrue((Integer)t.get(1) == 5);
+        
+        assertTrue(checkLogFileMessage(new String[]{"Load fields [0,1]"}));
+    }
+    
+    @Test
+    public void testJoin1() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (int)a0, (int)a1, (int)a2;");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0, b1);");
+        pigServer.registerQuery("B2 = foreach B generate (int)b0, (int)b1;");
+        pigServer.registerQuery("C = join A2 by a1, B2 by b1;");
+        pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(t.get(0).equals(2));
+        assertTrue(t.get(1).equals(3));
+        assertTrue(t.get(2).equals(2));
+        assertTrue(t.get(3).equals(2));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [1,2]"}));
+    }
+    
+    @Test
+    public void testJoin2() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (int)a0, (int)a1, (int)a2;");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0, b1);");
+        pigServer.registerQuery("B2 = foreach B generate (int)b0, (int)b1;");
+        pigServer.registerQuery("C = join A2 by a1, B2 by b1;");
+        pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==3);
+        assertTrue(t.get(0).equals(2));
+        assertTrue(t.get(1).equals(3));
+        assertTrue(t.get(2).equals(2));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [1,2]", "Loader for B is pruned. Load fields [1]"}));
+    }
+    
+    @Test
+    public void testForEachFilter() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (int)a0, (int)a1, (int)a2;");
+        pigServer.registerQuery("B = filter A2 by a2==3;");
+        pigServer.registerQuery("C = foreach B generate a0, a1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals(1));
+        assertTrue(t.get(1).equals(2));
+        
+        assertFalse(iter.hasNext());   
+        assertTrue(emptyLogFileMessage());    
+    }
+    
+    
+    @Test
+    public void testForEach1() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (int)a0, (int)a1, (int)a2;");
+        pigServer.registerQuery("B = foreach A2 generate a0, a1+a2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals(1));
+        assertTrue(t.get(1).equals(5));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals(2));
+        assertTrue(t.get(1).equals(7));
+        
+        assertFalse(iter.hasNext());      
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testForEach2() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (int)a0, (int)a1, (int)a2;");
+        pigServer.registerQuery("B = foreach A2 generate a0 as b0, *;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(t.get(0).equals(1));
+        assertTrue(t.get(1).equals(1));
+        assertTrue(t.get(2).equals(2));
+        assertTrue(t.get(3).equals(3));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(t.get(0).equals(2));
+        assertTrue(t.get(1).equals(2));
+        assertTrue(t.get(2).equals(5));
+        assertTrue(t.get(3).equals(2));
+        
+        assertFalse(iter.hasNext());    
+        
+        assertTrue(emptyLogFileMessage());  
+    }
+    
+   /* @Test
+    public void testSplit1() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
+        pigServer.registerQuery("D = foreach B generate $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2", 
+                "No map keys pruned for A"}));
+    }
+    
+    @Test
+    public void testSplit2() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
+        pigServer.registerQuery("D = foreach B generate $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2", 
+                "No map keys pruned for A"}));
+    }
+    */
+    
+    
+    @Test
+    public void testForeachNoSchema1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("B = foreach A generate $1, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("5"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testForeachNoSchema2() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("aoeuaoeu"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("5"));
+        assertTrue(t.get(1).toString().equals("aoeuaoeu"));
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testCoGroup1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A1 = foreach A generate a0,(int)a1,a2;" );
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("B1 = foreach B generate b0,(int)b1;" );
+        pigServer.registerQuery("C = cogroup A1 by $1, B1 by $1;");
+        pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0)==null);
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2.0"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("5.0"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for B is pruned. Load fields [1]"}));
+    }
+    
+    @Test
+    public void testCoGroup2() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate $1;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("{(1,2,3),(2,5,2)}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testCoGroup3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by $1;");
+        pigServer.registerQuery("C = foreach B generate $1, '1';");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("{(1,2,3)}"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("{(2,5,2)}"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testCoGroup4() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A1 = foreach A generate a0,(int)a1,a2;");
+        pigServer.registerQuery("B1 = foreach B generate b0,(int)b1;");
+        pigServer.registerQuery("C = cogroup A1 by ($1), B1 by ($1);");
+        pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("{}"));
+        assertTrue(t.get(1).toString().equals("{(1)}"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("{(2)}"));
+        assertTrue(t.get(1).toString().equals("{(2)}"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("{(5)}"));
+        assertTrue(t.get(1).toString().equals("{}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testCoGroup5() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by (a0, a1);");
+        pigServer.registerQuery("C = foreach B generate flatten(group);");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("5"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0,1]"}));
+    }
+    
+  /*  @Test
+    public void testDistinct1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = distinct A;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testStream1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    */
+    
+    @Test
+    public void testBinCond1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
+        pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("5"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [1,2,3]"}));
+    }
+    
+    
+    @Test
+    public void testCoGroup6() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
+        pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==3);
+        assertTrue(t.get(0).toString().equals("{}"));
+        assertTrue(t.get(1).toString().equals("1"));
+        assertTrue(t.get(2).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==3);
+        assertTrue(t.get(0).toString().equals("{(1,2,3)}"));
+        assertTrue(t.get(1).toString().equals("2"));
+        assertTrue(t.get(2).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    /*
+    @Test
+    public void testCoGroup7() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
+        pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==4);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("2"));
+        assertTrue(t.get(2).toString().equals("3"));
+        assertTrue(t.get(3).toString().equals("{(2)}"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==4);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("5"));
+        assertTrue(t.get(2).toString().equals("2"));
+        assertTrue(t.get(3).toString().equals("{}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testCross1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = cross A, B;");
+        pigServer.registerQuery("D = foreach C generate $0, $3;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,1)");
+        results.add("(2,1)");
+        results.add("(1,2)");
+        results.add("(2,2)");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2", 
+            "No map keys pruned for A", "Columns pruned for B: $1", "No map keys pruned for B"}));
+    }
+    
+    @Test
+    public void testUnion1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = union A, B;");
+        pigServer.registerQuery("D = foreach C generate $0, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3)");
+        results.add("(2,2)");
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        results.contains(t.toString());
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        results.contains(t.toString());
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        results.contains(t.toString());
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        results.contains(t.toString());
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1", 
+            "No map keys pruned for A", "Columns pruned for B: $1", "No map keys pruned for B"}));
+    }
+    */
+    
+    @Test
+    public void testFRJoin1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = join A by $0, B by $0 using \"replicated\";");
+        pigServer.registerQuery("D = foreach C generate $0, $3;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("2"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0]", "Loader for B is pruned. Load fields [0]"}));
+    }
+    
+    /*
+    @Test
+    public void testFilter1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = order A by a1;");
+        pigServer.registerQuery("C = limit B 10;");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2", 
+                "No map keys pruned for A"}));
+    }
+    */
+    
+    @Test
+    public void testFilter2() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = filter A by a0+a2 == 4;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0,2]"}));
+    }
+    /*
+    @Test
+    public void testOrderBy1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = order A by $0;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2", 
+            "No map keys pruned for A"}));
+    }
+    
+    @Test
+    public void testOrderBy2() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = order A by *;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"No column pruned for A", 
+            "No map keys pruned for A"}));
+    }
+    */
+    
+    @Test
+    public void testCogroup8() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by *;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("(1,2,3)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("(2,5,2)"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());    
+    }
+        
+    @Test
+    public void testJoin3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = join A by *, B by * using \"replicated\";");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    
+    @Test
+    public void testLoadForEach4() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate *;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0]"}));
+    }
+    
+    @Test
+    public void testForEachUDF() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A2 = foreach A generate (chararray)a0, (chararray)a1, (chararray)a2;");
+        pigServer.registerQuery("B = foreach A2 generate StringSize(*);");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+    
+    @Test
+    public void testOutJoin1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("C = join A by $0 left, B by $0;");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(2)");
+        results.add("()");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0]", "Loader for B is pruned. Load fields [0]"}));
+    }
+    
+    @Test
+    public void testFilter3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
+        pigServer.registerQuery("C = foreach B generate $0;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }
+
+    @Test
+    public void testMapKey1() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0, a1);");
+        pigServer.registerQuery("A1 = foreach A generate (int)a0,(map[])a1;");
+        File newTmpFile = File.createTempFile("pruneBinStorage", ".txt");
+        // Delete this file so that pig can write another file
+        newTmpFile.delete();
+        pigServer.store("A1", Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()), 
+                "BinStorage()");
+        pigServer.registerQuery("A2 = load '"+ Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()) + 
+                "' using BinStorage() as (a0:int, a1:map[]);");
+        pigServer.registerQuery("B = foreach A2 generate a0, a1#'key1';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals(1));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).equals(2));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A2 is pruned. Load fields [0,1]",
+                "Map key required for A2: $1->[key1]"}));
+    }
+    
+    @Test
+    public void testMapKey2() throws Exception{
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0, a1);");
+        pigServer.registerQuery("A1 = foreach A generate (int)a0,(map[])a1;");
+        File newTmpFile = File.createTempFile("pruneBinStorage", ".txt");
+        // Delete this file so that pig can write another file
+        newTmpFile.delete();
+        pigServer.store("A1", Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()), 
+        "BinStorage()");
+        pigServer.registerQuery("A2 = load '"+ Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()) + 
+        "' using BinStorage() as (a0:int, a1:map[]);");
+        pigServer.registerQuery("B = foreach A2 generate a1, a1#'key1';");
+        pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("4"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A2 is pruned. Load fields [1]",
+                "Map key required for A2: $1->[key2, key1]"}));
+ 
+    }
+    
+    @Test
+    public void testMapKey3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0, a1);");
+        pigServer.registerQuery("A1 = foreach A generate (int)a0,(map[])a1;");
+        File newTmpFile = File.createTempFile("pruneBinStorage", ".txt");
+        // Delete this file so that pig can write another file
+        newTmpFile.delete();
+        pigServer.store("A1", Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()), 
+        "BinStorage()");
+        pigServer.registerQuery("A2 = load '"+ Util.generateURI(newTmpFile.toString(), pigServer.getPigContext()) + 
+        "' using BinStorage() as (a0:int, a1:map[]);");
+        pigServer.registerQuery("B = foreach A2 generate a1, a1#'key1';");
+        pigServer.registerQuery("C = group B all;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("all"));
+        assertTrue(t.get(1).toString().equals("{([key2#2,key1#1],1),([key2#4,key1#2],2)}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A2 is pruned. Load fields [1]"}));
+    }
+    
+    /*
+    @Test
+    public void testMapKey4() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("B = limit A 10;");
+        pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"No column pruned for A", 
+                "Map key required for A: $1->[key1]"}));
+    }
+    
+    @Test
+    public void testMapKey5() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
+        pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
+        pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"No column pruned for A", 
+                "Map key required for A: $1->[key1]"}));
+    }
+    */
+    
+    @Test
+    public void testConstantPlan() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate 1, a2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [2]"}));
+    }
+    
+    /*
+    @Test
+    public void testPlainPlan() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("B = order A by $0;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==3);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("2"));
+        assertTrue(t.get(2).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==3);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("5"));
+        assertTrue(t.get(2).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(emptyLogFileMessage());
+    }*/
+    
+    @Test
+    public void testBinStorage1() throws Exception {
+        // get a temp intermediate filename
+        File intermediateFile = File.createTempFile("intemediate", "txt");
+        intermediateFile.delete(); // delete since we don't want the file to be present
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+        
+        pigServer.registerQuery("A = load '"+ intermediateFile.toString() 
+                + "' using BinStorage() as (a0, a1, a2);");
+        
+        pigServer.registerQuery("B = foreach A generate a0;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0]"}));
+        
+        intermediateFile.delete();
+    }
+    
+    @Test
+    public void testBinStorage2() throws Exception {
+        // get a temp intermediate filename
+        File intermediateFile = File.createTempFile("intemediate", "txt");
+        intermediateFile.delete(); // delete since we don't want the file to be present
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+        
+        pigServer.registerQuery("A = load '"+ intermediateFile.toString() 
+                + "' using BinStorage() as (a0, a1, a2);");
+        
+        pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
+        pigServer.registerQuery("C = foreach B generate a0, a2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0,2]"}));
+        
+        intermediateFile.delete();
+    }
+
+    
+    @Test
+    public void testProjectCastKeyLookup() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) 
+                + "' as (a0, a1);");
+        
+        pigServer.registerQuery("B = foreach A generate a1#'key1';");        
+        
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [1]"
+                }));
+        // "Map key required for A: $1->[key1]"
+    }
+    
+    @Test
+    public void testRelayFlattenMap() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) 
+                + "' as (a0, a1);");
+        pigServer.registerQuery("A1 = foreach A generate a0,(map[])a1;");
+        File tmpBinFile = File.createTempFile("pruneBinStorage", ".txt");
+        tmpBinFile.delete();
+        pigServer.store("A1", Util.generateURI(tmpBinFile.toString(), pigServer.getPigContext()),"BinStorage()");
+        pigServer.registerQuery("A2 = load '"+ Util.generateURI(tmpBinFile.toString(), pigServer.getPigContext()) 
+                + "' using BinStorage() as (a0, a1:map[]);");
+        pigServer.registerQuery("B = foreach A2 generate flatten(a1);");
+        pigServer.registerQuery("C = foreach B generate a1#'key1';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("1"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A2 is pruned. Load fields [1]",
+                "Map key required for A2: $1->[key1]"}));
+    }
+    /*
+    @Test
+    public void testCrossAtLeastOneColumnOneInput() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
+        pigServer.registerQuery("C = cross A, B;");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1)");
+        results.add("(2)");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2", 
+                "No map keys pruned for A", "Columns pruned for B: $1", 
+                "No map keys pruned for B"}));
+    }
+    
+    
+    @Test
+    public void testComplex1() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
+        pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;");
+        pigServer.registerQuery("C = join A by $0, B1 by $0;");
+        pigServer.registerQuery("D = order C by $4;");
+        pigServer.registerQuery("E = foreach D generate $0, $2;");
+        pigServer.registerQuery("F = filter E by $1<10;");
+        pigServer.registerQuery("G = group F by $0;");
+        pigServer.registerQuery("H = foreach G generate $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("H");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==1);
+        assertTrue(t.get(0).toString().equals("{(2,2)}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1", 
+                "No map keys pruned for A", "Columns pruned for B: $1", 
+                "No map keys pruned for B"}));
+    }
+    */
+    
+    @Test
+    public void testCoGroup8() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A1 = foreach A generate a0,(int)a1,a2;");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("B1 = foreach B generate b0,(int)b1;");
+        pigServer.registerQuery("C = cogroup A1 by ($1), B1 by ($1);");
+        pigServer.registerQuery("D = foreach C generate $0, $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("{}"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(1).toString().equals("{(1,2,3)}"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("5"));
+        assertTrue(t.get(1).toString().equals("{(2,5,2)}"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for B is pruned. Load fields [1]"}));
+    }
+    
+    // See PIG-1128
+    @Test
+    public void testUserDefinedSchema() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
+        pigServer.registerQuery("A1 = foreach A generate (chararray)c1, (int)c2;");
+        pigServer.registerQuery("B = foreach A1 generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
+        pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(1)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(2)"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0]"}));
+    }
+
+    /*
+    // See PIG-1127
+    @Test
+    public void testSharedSchemaObject() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
+        pigServer.registerQuery("B = foreach A generate a1;");
+        pigServer.registerQuery("C = limit B 10;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("([2#1,1#1])"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2", 
+                "No map keys pruned for A"}));
+    }
+    */
+    
+    // See PIG-1142
+    @Test
+    public void testJoin4() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = join A by a2, B by b2;");
+        pigServer.registerQuery("D = foreach C generate $0,  $1,  $2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,2,3)");
+        results.add("(2,5,2)");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(results.contains(t.toString()));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for B is pruned. Load fields [2]"}));
+    }
+    
+    @Test
+    public void testFilter4() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("A1 = foreach A generate a0,a1,(int)a2;");
+        pigServer.registerQuery("B = filter A1 by a2==3;");
+        pigServer.registerQuery("C = foreach B generate $2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(3)"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [2]"}));
+    }
+    /*
+    @Test
+    public void testSplit3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
+        pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
+        pigServer.registerQuery("C = foreach B generate $2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(3)"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1", 
+                "No map keys pruned for A"}));
+    }
+    
+    @Test
+    public void testOrderBy3() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = order A by a2;");
+        pigServer.registerQuery("C = foreach B generate a2;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.toString().equals("(2)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==1);
+        assertTrue(t.toString().equals("(3)"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1", 
+            "No map keys pruned for A"}));
+    }
+    */
+    @Test
+    public void testCogroup9() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
+        pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
+        pigServer.registerQuery("E = foreach D generate $1, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.toString().equals("({(2,5,2)},{(2,5,2)})"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertTrue(t.size()==2);
+        assertTrue(t.toString().equals("({(1,2,3)},{(1,2,3)})"));
+
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Loader for C is pruned. Load fields [2]"}));
+    }
+    /*
+
+    // See PIG-1165
+    @Test
+    public void testOrderbyWrongSignature() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = order A by a1;");
+        pigServer.registerQuery("D = join C by a1, B by b0;");
+        pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==3);
+        assertTrue(t.toString().equals("(2,2,2)"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2", 
+            "No map keys pruned for A", "No column pruned for B", "No map keys pruned for B"}));
+    }
+    
+    // See PIG-1146
+    @Test
+    public void testUnionMixedPruning() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
+        pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
+        pigServer.registerQuery("D = union A, C;");
+        pigServer.registerQuery("E = foreach D generate $0, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3)");
+        results.add("(2,2)");
+        results.add("(1,1)");
+        results.add("(2,2)");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+            "No map keys pruned for A", "No column pruned for B",
+            "No map keys pruned for B"}));
+    }
+    
+    // See PIG-1176
+    @Test
+    public void testUnionMixedSchemaPruning() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate a0;;");
+        pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        pigServer.registerQuery("E = union B, D;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1)");
+        results.add("(2)");
+        results.add("(1)");
+        results.add("(2)");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(emptyLogFileMessage());
+    }
+    */
+    // See PIG-1184
+    @Test
+    public void testForEachFlatten() throws Exception {
+        File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t3", "oiue\tM\t4"});
+        pigServer.registerQuery("M = load '"+inputFile.toString()+"' as (a0, a1, a2);");
+        pigServer.registerQuery("N = group M by (a0, a1);");
+        pigServer.registerQuery("S = foreach N generate FLATTEN(group), M;");
+        long time = System.currentTimeMillis();
+        pigServer.store("S", "tmp"+time, BinStorage.class.getName() + "()");
+        
+        pigServer.registerQuery("A = load 'tmp"+time+"' using BinStorage() as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray, v:chararray, v2:chararray)});");             
+        pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), 10;");
+        pigServer.registerQuery("C = foreach B generate a0, $5;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+      
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Loader for A is pruned. Load fields [0,2]"}));
+    }
+    
+    
+    // See PIG-1210
+    @Test
+    public void testFieldsToReadDuplicatedEntry() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "+PigStorageWithTrace.class.getName()
+                +" AS (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(2.0,2,3)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(4.0,5,2)"));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(emptyLogFileMessage());
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon Mar 15 03:28:27 2010
@@ -21,6 +21,7 @@ import static java.util.regex.Matcher.qu
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileReader;
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -58,6 +60,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -506,4 +509,28 @@ public class Util {
                 pigContext.getProperties());
         createInputFile(FileSystem.get(conf), fileName, input); 
     }
+    
+    public static void printPlan(org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan ) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        PlanPrinter pp = new PlanPrinter(logicalPlan,ps);
+        pp.visit();
+        System.err.println(out.toString());
+    }
+
+    public static void printPlan(LogicalPlan logicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        logicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+
+    public static void printPlan(PhysicalPlan physicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        physicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+
+
 }