You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/03 18:58:21 UTC

svn commit: r1099123 [3/16] - in /pig/branches/branch-0.9: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue May  3 16:58:19 2011
@@ -18,22 +18,15 @@
 package org.apache.pig.test;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.net.URL;
 import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
+import java.util.Properties;
 
+import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -42,179 +35,181 @@ import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.impl.builtin.GFAny;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanValidationException;
-import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.validators.SchemaAliasValidator;
-import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
 import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.parser.ParseException ;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.test.utils.Identity;
-import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.PigException;
-
-
-public class TestLogicalPlanBuilder extends junit.framework.TestCase {
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+public class TestLogicalPlanBuilder {
+    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    private PigServer pigServer = null;
+    
+    @Before
+    public void setUp() throws Exception {
+    	pigServer = new PigServer( pigContext );
+    	pigContext.connect();
+    }
 
-    MiniCluster cluster = MiniCluster.buildCluster();
-    private final Log log = LogFactory.getLog(getClass());
-    
     @Test
-    public void testQuery1() {
+    public void testQuery1() throws Exception {
         String query = "foreach (load 'a') generate $1,$2;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery2() {
+    public void testQuery2() throws Exception {
         String query = "foreach (load 'a' using " + PigStorage.class.getName() + "(':')) generate $1, 'aoeuaoeu' ;";
         buildPlan(query);
     }
 
     // TODO FIX Query3 and Query4
     @Test
-    public void testQuery3() {
+    public void testQuery3() throws Exception {
         String query = "foreach (cogroup (load 'a') by $1, (load 'b') by $1) generate org.apache.pig.builtin.AVG($1) ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery4() {
+    public void testQuery4() throws Exception {
         String query = "foreach (load 'a') generate AVG($1, $2) ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery5() {
+    public void testQuery5() throws Exception {
         String query = "foreach (group (load 'a') ALL) generate $1 ;";
         buildPlan(query);
     }
 
     
     @Test
-    public void testQuery6() {
+    public void testQuery6() throws Exception {
         String query = "foreach (group (load 'a') by $1) generate group, '1' ;";
         buildPlan(query);
     }
 
     
     @Test
-    public void testQuery7() {
+    public void testQuery7() throws Exception {
         String query = "foreach (load 'a' using " + PigStorage.class.getName() + "()) generate $1 ;";
         buildPlan(query);
     }
 
     
     @Test
-    public void testQuery10() {
+    public void testQuery10() throws Exception {
         String query = "foreach (cogroup (load 'a') by ($1), (load 'b') by ($1)) generate $1.$1, $2.$1 ;";
         buildPlan(query);
     }
 
     // TODO FIX Query11 and Query12
     @Test
-    public void testQuery11() {
+    public void testQuery11() throws Exception {
         String query = " foreach (group (load 'a') by $1, (load 'b') by $2) generate group, AVG($1) ;";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery12() {
+    public void testQuery12() throws Exception {
         String query = "foreach (load 'a' using " + PigStorage.class.getName() + "()) generate AVG($1) ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery13() {
+    public void testQuery13() throws Exception {
         String query = "foreach (cogroup (load 'a') ALL) generate group ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery14() {
+    public void testQuery14() throws Exception {
         String query = "foreach (group (load 'a') by ($6, $7)) generate flatten(group) ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery15() {
+    public void testQuery15() throws Exception {
         String query = " foreach (load 'a') generate $1, 'hello', $3 ;";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery100() {
+    public void testQuery100() throws Exception {
         // test define syntax
         String query = "define FUNC ARITY();";
-        LogicalOperator lo = buildPlan(query).getRoots().get(0);
-        assertTrue(lo instanceof LODefine);
+        buildPlan(query);
     }
 
 
 
     @Test
-    public void testQueryFail1() {
+    public void testQueryFail1() throws Exception {
         String query = " foreach (group (A = load 'a') by $1) generate A.'1' ;";
         try {
             buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail("Test case should fail" );
     }
 
     @Test
-    public void testQueryFail2() {
+    public void testQueryFail2() throws Exception {
         String query = "foreach group (load 'a') by $1 generate $1.* ;";
         try {
             buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	return;
         }
+        Assert.fail("Test case should fail" );
     }
     
 
     @Test
-    public void testQueryFail3() {
-        String query = "generate DISTINCT foreach (load 'a');";
+    public void testQueryFail3() throws Exception {
+        String query = "A = generate DISTINCT foreach (load 'a');";
         try {
-            buildPlan(query);
+            LogicalPlan lp = buildPlan(query);
+            System.out.println( lp.toString() );
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	return;
         }
+        Assert.fail("Test case should fail" );
     }
     
     @Test
-    public void testQueryFail4() {
-        String query = "generate [ORDER BY $0][$3, $4] foreach (load 'a');";
+    public void testQueryFail4() throws Exception {
+        String query = "A = generate [ORDER BY $0][$3, $4] foreach (load 'a');";
         try {
             buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	return;
         }
+        Assert.fail("Test case should fail" );
     }
 
     @Test
-    public void testQueryFail5() {
-        String query = "generate " + TestApplyFunc.class.getName() + "($2.*) foreach (load 'a');";
+    public void testQueryFail5() throws Exception {
+        String query = "A = generate " + TestApplyFunc.class.getName() + "($2.*) foreach (load 'a');";
         try {
             buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	return;
         }
+        Assert.fail("Test case should fail" );
     }
 
     
@@ -225,7 +220,7 @@ public class TestLogicalPlanBuilder exte
  
     // TODO FIX Query17
     @Test
-    public void testQuery17() {
+    public void testQuery17() throws Exception {
         String query =  "foreach (load 'A')" + "generate " + TestApplyFunc.class.getName() + "($1);";
         buildPlan(query);
     }
@@ -244,146 +239,144 @@ public class TestLogicalPlanBuilder exte
      * Validate that parallel is parsed correctly Bug 831714 - fixed
      */
     
-    @Test
-    public void testQuery18() {
-        String query = "FOREACH (group (load 'a') ALL PARALLEL 16) generate group;";
-        LogicalPlan lp = buildPlan(query);
-        LogicalOperator root = lp.getRoots().get(0);   
-        
-        List<LogicalOperator> listOp = lp.getSuccessors(root);
-        
-        LogicalOperator lo = listOp.get(0);
-        
-        if (lo instanceof LOCogroup) {
-            assertTrue(((LOCogroup) lo).getRequestedParallelism() == 16);
-        } else {
-            fail("Error: Unexpected Parse Tree output");
-        }  
-    }
-    
-    
-    
-    
-    @Test
-    public void testQuery19() {
-        buildPlan("a = load 'a';");
-        buildPlan("b = filter a by $1 == '3';");
+//    @Test Waiting for PIG-1996
+//    public void testQuery18() throws Exception {
+//        String query = "FOREACH (group (load 'a') ALL PARALLEL 16) generate group;";
+//        LogicalPlan lp = buildPlan(query);
+//        Operator root = lp.getSources().get(0);   
+//        
+//        List<Operator> listOp = lp.getSuccessors(root);
+//        
+//        Operator lo = listOp.get(0);
+//        
+//        if (lo instanceof LOCogroup) {
+//            Assert.assertTrue(((LOCogroup) lo).getRequestedParallelisam() == 16);
+//        } else {
+//            Assert.fail("Error: Unexpected Parse Tree output");
+//        }  
+//    }
+    
+    @Test
+    public void testQuery19() throws Exception {
+        String query = "a = load 'a';" +
+                       "b = filter a by $1 == '3';";
+        buildPlan( query );
     }
     
     
     @Test
-    public void testQuery20() {
+    public void testQuery20() throws Exception {
         String query = "foreach (load 'a') generate ($1 == '3'? $2 : $3) ;";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery21() {
-        buildPlan("A = load 'a';");
-        buildPlan("B = load 'b';");
-        buildPlan("foreach (cogroup A by ($1), B by ($1)) generate A, flatten(B.($1, $2, $3));");
+    public void testQuery21() throws Exception {
+        String query = "A = load 'a';" +
+                       "B = load 'b';" +
+                       "foreach (cogroup A by ($1), B by ($1)) generate A, flatten(B.($1, $2, $3));";
+        buildPlan( query );
     }
     
     @Test
-    public void testQuery22() {
-        buildPlan("A = load 'a';");
-        buildPlan("B = load 'b';");
-        buildPlan("C = cogroup A by ($1), B by ($1);");
-        String query = "foreach C { " +
-                "B = order B by $0; " +
-                "generate FLATTEN(A), B.($1, $2, $3) ;" +
-                "};" ;
+    public void testQuery22() throws Exception {
+        String query = "A = load 'a';" +
+                       "B = load 'b';" +
+                       "C = cogroup A by ($1), B by ($1);" +
+                       "foreach C { " +
+                       "B = order B by $0; " +
+                       "generate FLATTEN(A), B.($1, $2, $3) ;" + "};" ;
         buildPlan(query);
     }
     
     @Test
-    public void testQuery22Fail() {
-        buildPlan("A = load 'a' as (a:int, b: double);");
+    public void testQuery22Fail() throws Exception {
+        String query = "A = load 'a' as (a:int, b: double);" +
+                       "B = group A by (*, $0);";
         try {
-            buildPlan("B = group A by (*, $0);");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+            Assert.assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
         }
     }
     
     @Test
-    public void testQuery23() {
-        buildPlan("A = load 'a';");
-        buildPlan("B = load 'b';");
+    public void testQuery23() throws Exception {
+        String query = "A = load 'a';" + 
+                       "B = load 'b';" +
         
-        buildPlan("C = cogroup A by ($1), B by ($1);");
+                       "C = cogroup A by ($1), B by ($1);" +
         
-        String query = "foreach C { " +
-        "A = Distinct A; " +
-        "B = FILTER A BY $1 < 'z'; " +
+                       "foreach C { " +
+                       "A = Distinct A; " +
+                       "B = FILTER A BY $1 < 'z'; " +
         //TODO
         //A sequence of filters within a foreach translates to
         //a split statement. Currently it breaks as adding an
         //additional output to the filter fails as filter supports
         //single output
-        "C = FILTER A BY $2 == $3;" +
-        "B = ARRANGE B BY $1;" +
-        "GENERATE A, FLATTEN(B.$0);" +
-        "};";
+                       "C = FILTER A BY $2 == $3;" +
+                       "B = ORDER B BY $1;" +
+                       "GENERATE A, FLATTEN(B.$0);" +
+                       "};";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery23Fail() {
-        buildPlan("A = load 'a' as (a: int, b:double);");
-        buildPlan("B = load 'b';");
+    public void testQuery23Fail() throws Exception {
+        String query = "A = load 'a' as (a: int, b:double);" +
+                       "B = load 'b';" +
+                       "C = cogroup A by (*, $0), B by ($0, $1);";
         boolean exceptionThrown = false;
         try {
-            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
             exceptionThrown = true;
         }
-        assertTrue(exceptionThrown);
+        Assert.assertTrue(exceptionThrown);
     }
 
     @Test
-    public void testQuery23Fail2() {
-        buildPlan("A = load 'a';");
-        buildPlan("B = load 'b';");
+    public void testQuery23Fail2() throws Exception {
+        String query = "A = load 'a';" +
+                       "B = load 'b';" +
+                       "C = cogroup A by (*, $0), B by ($0, $1);";
         boolean exceptionThrown = false;
         try {
-            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
-            "the input has a schema"));
             exceptionThrown = true;
         }
-        assertTrue(exceptionThrown);
+        Assert.assertTrue(exceptionThrown);
     }
     
     @Test
-    public void testQuery23Fail3() {
-        buildPlan("A = load 'a' as (a: int, b:double);");
-        buildPlan("B = load 'b' as (a:int);");
+    public void testQuery23Fail3() throws Exception {
+        String query = "A = load 'a' as (a: int, b:double);" +
+                       "B = load 'b' as (a:int);" +
+                       "C = cogroup A by *, B by *;";
         boolean exceptionThrown = false;
         try {
-            buildPlan("C = cogroup A by *, B by *;");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
             exceptionThrown = true;
         }
-        assertTrue(exceptionThrown);
+        Assert.assertTrue(exceptionThrown);
     }
 
     
     @Test
-    public void testQuery24() {
-        buildPlan("a = load 'a';");
-        
-        String query = "foreach a generate (($0 == $1) ? 'a' : $2), $4 ;";
+    public void testQuery24() throws Exception {
+        String query = "a = load 'a';" + "foreach a generate (($0 == $1) ? 'a' : $2), $4 ;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery25() {
+    public void testQuery25() throws Exception {
         String query = "foreach (load 'a') {" +
                 "B = FILTER $0 BY (($1 == $2) AND ('a' < 'b'));" +
                 "generate B;" +
@@ -393,14 +386,14 @@ public class TestLogicalPlanBuilder exte
     
     
     @Test
-    public void testQuery26() {
+    public void testQuery26() throws Exception {
         String query = "foreach (load 'a') generate  ((NOT (($1 == $2) OR ('a' < 'b'))) ? 'a' : $2), 'x' ;";
         buildPlan(query);
     }
     
     // TODO FIX Query27 and Query28
     @Test
-    public void testQuery27() {
+    public void testQuery27() throws Exception {
         String query =  "foreach (load 'a'){" +
                 "A = DISTINCT $3.$1;" +
                 " generate " + TestApplyFunc.class.getName() + "($2, $1.($1, $4));" +
@@ -409,20 +402,20 @@ public class TestLogicalPlanBuilder exte
     }
     
     @Test
-    public void testQuery28() {
+    public void testQuery28() throws Exception {
         String query = "foreach (load 'a') generate " + TestApplyFunc.class.getName() + "($2, " + TestApplyFunc.class.getName() + "($2.$3));";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery29() {
+    public void testQuery29() throws Exception {
         String query = "load 'myfile' using " + TestStorageFunc.class.getName() + "() as (col1);";
         buildPlan(query);
     }
 
 
     @Test
-    public void testQuery30() {
+    public void testQuery30() throws Exception {
         String query = "load 'myfile' using " + TestStorageFunc.class.getName() + "() as (col1, col2);";
         buildPlan(query);
     }
@@ -464,63 +457,64 @@ public class TestLogicalPlanBuilder exte
     
     
     @Test
-    public void testQuery31() {
+    public void testQuery31() throws Exception {
         String query = "load 'myfile' as (col1, col2);";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery32() {
+    public void testQuery32() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : tuple(sub1, sub2), col3 : tuple(bag1))) generate col1 ;";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery33() {
-        buildPlan("A = load 'a' as (aCol1, aCol2);");
-        buildPlan("B = load 'b' as (bCol1, bCol2);");
-        buildPlan("C = cogroup A by (aCol1), B by bCol1;");
-        String query = "foreach C generate group, A.aCol1;";
+    public void testQuery33() throws Exception {
+        String query = "A = load 'a' as (aCol1, aCol2);" +
+                       "B = load 'b' as (bCol1, bCol2);" +
+                       "C = cogroup A by (aCol1), B by bCol1;" +
+                       "foreach C generate group, A.aCol1;";
         buildPlan(query);
     }
     
     
     @Test
     //TODO: Nested schemas don't work now. Probably a bug in the new parser.
-    public void testQuery34() {
-        buildPlan("A = load 'a' as (aCol1, aCol2 : tuple(subCol1, subCol2));");
-        buildPlan("A = filter A by aCol2 == '1';");
-        buildPlan("B = load 'b' as (bCol1, bCol2);");
-        String query = "foreach (cogroup A by (aCol1), B by bCol1 ) generate A.aCol2, B.bCol2 ;";
+    public void testQuery34() throws Exception {
+        String query = "A = load 'a' as (aCol1, aCol2 : tuple(subCol1, subCol2));" +
+        "A = filter A by aCol2 == '1';" +
+        "B = load 'b' as (bCol1, bCol2);" +
+        "foreach (cogroup A by (aCol1), B by bCol1 ) generate A.aCol2, B.bCol2 ;";
         buildPlan(query);
     }
     
     
     
     @Test
-    public void testQuery35() {
+    public void testQuery35() throws Exception {
         String query = "foreach (load 'a' as (col1, col2)) generate col1, col2 ;";
         buildPlan(query);
     }
     
     @Test
-    public void testQuery36() {
+    public void testQuery36() throws Exception {
         String query = "foreach (cogroup ( load 'a' as (col1, col2)) by col1) generate $1.(col2, col1);";
         buildPlan(query);
     }
     
     @Test
-    public void testQueryFail37() {
+    public void testQueryFail37() throws Exception {
         String query = "A = load 'a'; asdasdas";
         try{
             buildPlan(query);
         }catch(AssertionFailedError e){
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail( "Query should fail." );
     }
     
     @Test
-    public void testQuery38(){
+    public void testQuery38() throws Exception {
         String query = "c = cross (load 'a'), (load 'b');";
         buildPlan(query);
     }
@@ -528,40 +522,42 @@ public class TestLogicalPlanBuilder exte
     
     // TODO FIX Query39 and Query40
     @Test
-    public void testQuery39(){
-        buildPlan("a = load 'a' as (url, host, rank);");
-        buildPlan("b = group a by (url,host); ");
-        LogicalPlan lp = buildPlan("c = foreach b generate flatten(group.url), SUM(a.rank) as totalRank;");
-        buildPlan("d = filter c by totalRank > '10';");
-        buildPlan("e = foreach d generate totalRank;");
+    public void testQuery39() throws Exception{
+        String query = "a = load 'a' as (url, host, rank);" +
+                       "b = group a by (url,host); " +
+                       "c = foreach b generate flatten(group.url), SUM(a.rank) as totalRank;";
+        buildPlan(query);
+        query += "d = filter c by totalRank > '10';" +
+                 "e = foreach d generate totalRank;";
+        buildPlan( query );
     }
     
     @Test
-    public void testQueryFail39(){
-        buildPlan("a = load 'a' as (url, host, rank);");
-        buildPlan("b = group a by (url,host); ");
-        LogicalPlan lp = buildPlan("c = foreach b generate flatten(group.url), SUM(a.rank) as totalRank;");
-        buildPlan("d = filter c by totalRank > '10';");
+    public void testQueryFail39() throws Exception{
+        String query = "a = load 'a' as (url, host, rank);" +
+                       "b = group a by (url,host); " +
+             "c = foreach b generate flatten(group.url), SUM(a.rank) as totalRank;" +
+                       "d = filter c by totalRank > '10';" +
+                       "e = foreach d generate url;";
         try {
-            buildPlan("e = foreach d generate url;");//url has been falttened and hence the failure
+            buildPlan(query);//url has been falttened and hence the failure
         } catch(AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
     }
     
     @Test
-    public void testQuery40() {
-        buildPlan("a = FILTER (load 'a') BY IsEmpty($2);");
-        buildPlan("a = FILTER (load 'a') BY (IsEmpty($2) AND ($3 == $2));");
+    public void testQuery40() throws Exception {
+        String query = "a = FILTER (load 'a') BY IsEmpty($2);";
+        buildPlan( query +"a = FILTER (load 'a') BY (IsEmpty($2) AND ($3 == $2));" );
     }
     
     @Test
-    public void testQueryFail41() {
-        buildPlan("a = load 'a';");
+    public void testQueryFail41() throws Exception {
         try {
-            buildPlan("b = a as (host,url);");
+            buildPlan("a = load 'a';" + "b = a as (host,url);");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Currently PIG does not support assigning an existing relation"));
+            return;
         }
         // TODO
         // the following statement was earlier present
@@ -569,43 +565,48 @@ public class TestLogicalPlanBuilder exte
         // above, we should test with the line below
         // uncommented
         //buildPlan("foreach b generate host;");
+        Assert.fail( "Query should fail." );
     }
     
     @Test
-    public void testQuery42() {
-        buildPlan("a = load 'a';");
-        buildPlan("b = foreach a generate $0 as url, $1 as rank;");
-        buildPlan("foreach b generate url;");
+    public void testQuery42() throws Exception {
+        String q = "a = load 'a';" +
+        "b = foreach a generate $0 as url, $1 as rank;" +
+        "foreach b generate url;";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery43() {
-        buildPlan("a = load 'a' as (url,hitCount);");
-        buildPlan("b = load 'a' as (url,rank);");
-        buildPlan("c = cogroup a by url, b by url;");
-        buildPlan("d = foreach c generate group,flatten(a),flatten(b);");
-        buildPlan("e = foreach d generate group, a::url, b::url, b::rank, rank;");
+    public void testQuery43() throws Exception {
+        String q = "a = load 'a' as (url,hitCount);" +
+        "b = load 'a' as (url,rank);" +
+        "c = cogroup a by url, b by url;" +
+        "d = foreach c generate group,flatten(a),flatten(b);" +
+        "e = foreach d generate group, a::url, b::url, b::rank, rank;";
+        buildPlan( q );
     }
 
     @Test
-    public void testQueryFail43() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
+    public void testQueryFail43() throws Exception {
+        String q = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);";
         try {
-            String query = "c = cogroup a by (name, age), b by (height);";
+            String query = q + "c = cogroup a by (name, age), b by (height);";
             buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail( "Query should fail." );
     } 
 
 
     @Test
-    public void testQuery44() {
-        buildPlan("a = load 'a' as (url, pagerank);");
-        buildPlan("b = load 'b' as (url, query, rank);");
-        buildPlan("c = cogroup a by (pagerank#'nonspam', url) , b by (rank/'2', url) ;");
-        buildPlan("foreach c generate group.url;");
+    public void testQuery44() throws Exception {
+        String q = "a = load 'a' as (url, pagerank);" +
+        "b = load 'b' as (url, query, rank);" +
+        "c = cogroup a by (pagerank#'nonspam', url) , b by (rank/'2', url) ;" +
+        "foreach c generate group.url;";
+        buildPlan( q );
     }
 
 //TODO
@@ -616,7 +617,7 @@ public class TestLogicalPlanBuilder exte
         try {
             pig = new PigServer("local");
         } catch (IOException e) {
-            assertTrue(false);  // pig server failed for some reason
+            Assert.assertTrue(false);  // pig server failed for some reason
         }
         pig.registerFunction("myTr",
             new FuncSpec(GFAny.class.getName() + "('tr o 0')"));
@@ -625,355 +626,299 @@ public class TestLogicalPlanBuilder exte
         }catch(Exception e){
             return;
         }
-        assertTrue(false);
-    }
-    
-    /*
-    // Select
-    public void testQuery45() {
-        buildPlan("A = load 'a' as (url,hitCount);");
-        buildPlan("B = select url, hitCount from A;");
-        buildPlan("C = select url, hitCount from B;");
-    }
-
-    //Select + Join
-    public void testQuery46() {
-        buildPlan("A = load 'a' as (url,hitCount);");
-        buildPlan("B = load 'b' as (url,pageRank);");
-        buildPlan("C = select A.url, A.hitCount, B.pageRank from A join B on A.url == B.url;");        
-    }
-
-    // Mutliple Joins
-    public void testQuery47() {
-        buildPlan("A = load 'a' as (url,hitCount);");
-        buildPlan("B = load 'b' as (url,pageRank);");
-        buildPlan("C = load 'c' as (pageRank, position);");
-        buildPlan("B = select A.url, A.hitCount, B.pageRank from (A join B on A.url == B.url) join C on B.pageRank == C.pageRank;");
-    }
-
-    // Group
-    public void testQuery48() {
-        buildPlan("A = load 'a' as (url,hitCount);");        
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url;");
-    }
-
-    // Join + Group
-    public void testQuery49() {
-        buildPlan("A = load 'a' as (url,hitCount);");
-        buildPlan("B = load 'b' as (url,pageRank);");
-        buildPlan("C = select A.url, AVG(B.pageRank), SUM(A.hitCount) from A join B on A.url == B.url group by A.url;");
-    }
-
-    // Group + Having
-    public void testQuery50() {
-        buildPlan("A = load 'a' as (url,hitCount);");        
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url having AVG(A.hitCount) > '6';");
-    }
-
- // Group + Having + Order
-    public void testQuery51() {
-        buildPlan("A = load 'a' as (url,hitCount);");        
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url order by A.url;");
+        Assert.assertTrue(false);
     }
     
-    // Group + Having + Order
-    public void testQuery52() {
-        buildPlan("A = load 'a' as (url,hitCount);");        
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url having AVG(A.hitCount) > '6' order by A.url;");
-    }
-
-    // Group + Having + Order 2
-    public void testQuery53() {
-        buildPlan("A = load 'a' as (url,hitCount);");
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url having AVG(A.hitCount) > '6' order by AVG(A.hitCount);");
-    }
-
-    // Group + Having + Order 2
-    public void testQuery54() {
-        buildPlan("A = load 'a' as (url,hitCount, size);");
-        buildPlan("C = select A.url, AVG(A.hitCount) from A group by url having AVG(A.size) > '6' order by AVG(A.hitCount);");
-    }
-
-    // Group + Having + Order 2
-    public void testQuery55() {
-        buildPlan("A = load 'a' as (url,hitCount, size);");
-        buildPlan("C = select A.url, AVG(A.hitCount), SUM(A.size) from A group by url having AVG(A.size) > '6' order by AVG(A.hitCount);");
-    }
-
-    // Group + Having + Order 2
-    public void testQuery56() {
-        buildPlan("A = load 'a' as (url,hitCount, date);");
-        buildPlan("C = select A.url, A.date, SUM(A.hitCount) from A group by url, date having AVG(A.hitCount) > '6' order by A.date;");
-    }
-    */
-
     @Test
-    public void testQuery57() {
+    public void testQuery57() throws Exception {
         String query = "foreach (load 'a') generate ($1+$2), ($1-$2), ($1*$2), ($1/$2), ($1%$2), -($1) ;";
         buildPlan(query);
     }
 
     
     @Test
-    public void testQuery58() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = group a by name;");
-        String query = "foreach b {d = a.name; generate group, d;};";
+    public void testQuery58() throws Exception {
+        String query = "a = load 'a' as (name, age, gpa);" +
+        "b = group a by name;" +
+        "foreach b {d = a.name; generate group, d;};";
         buildPlan(query);
     } 
 
 	@Test
-    public void testQueryFail58(){
-        buildPlan("a = load 'a' as (url, host, rank);");
-        buildPlan("b = group a by url; ");
+    public void testQueryFail58() throws Exception{
+        String query = "a = load 'a' as (url, host, rank);" +
+        "b = group a by url; ";
         try {
-        	LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
+        	LogicalPlan lp = buildPlan(query + "c = foreach b generate group.url;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
     }
 
     @Test
-    public void testQuery59() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-        String query = "c = join a by name, b by name;";
+    public void testQuery59() throws Exception {
+    	String query = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+        "c = join a by name, b by name;";
         buildPlan(query);
     } 
     
     @Test
-    public void testQuery60() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-        String query = "c = cross a,b;";
+    public void testQuery60() throws Exception {
+         String query = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+       "c = cross a,b;";
         buildPlan(query);
     } 
 
     @Test
-    public void testQuery61() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-        String query = "c = union a,b;";
+    public void testQuery61() throws Exception {
+        String query = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+        "c = union a,b;";
         buildPlan(query);
     }
 
     @Test
-    public void testQuery62() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-        String query = "c = cross a,b;";
+    public void testQuery62() throws Exception {
+        String query = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+        "c = cross a,b;" +
+        "d = order c by b::name, height, a::gpa;" +
+        "e = order a by name, age, gpa desc;" +
+        "f = order a by $0 asc, age, gpa desc;" +
+        "g = order a by * asc;" +
+        "h = cogroup a by name, b by name;" +
+        "i = foreach h {i1 = order a by *; generate i1;};";
         buildPlan(query);
-        buildPlan("d = order c by b::name, height, a::gpa;");
-        buildPlan("e = order a by name, age, gpa desc;");
-        buildPlan("f = order a by $0 asc, age, gpa desc;");
-        buildPlan("g = order a by * asc;");
-        buildPlan("h = cogroup a by name, b by name;");
-        buildPlan("i = foreach h {i1 = order a by *; generate i1;};");
     }
 
     @Test
-    public void testQueryFail62() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-        String query = "c = cross a,b;";
-        buildPlan(query);
+    public void testQueryFail62() throws Exception {
+        String query = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+        "c = cross a,b;" +
+        "d = order c by name, b::name, height, a::gpa;";
         try {
-        	buildPlan("d = order c by name, b::name, height, a::gpa;");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
     }
 
     @Test
-    public void testQuery63() {
-        buildPlan("a = load 'a' as (name, details: tuple(age, gpa));");
-        buildPlan("b = group a by details;");
-        String query = "d = foreach b generate group.age;";
+    public void testQuery63() throws Exception {
+        String query = "a = load 'a' as (name, details: tuple(age, gpa));" +
+        "b = group a by details;" +
+        "d = foreach b generate group.age;" +
+        "e = foreach a generate name, details;";
         buildPlan(query);
-        buildPlan("e = foreach a generate name, details;");
+        
     }
 
     @Test
-    public void testQueryFail63() {
+    public void testQueryFail63() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
         try {
         	buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
     }
     
     @Test
-    public void testQuery64() {
-        buildPlan("a = load 'a' as (name: chararray, details: tuple(age, gpa), mymap: map[]);");
-        buildPlan("c = load 'a' as (name, details: bag{mytuple: tuple(age: int, gpa)});");
-        buildPlan("b = group a by details;");
-        String query = "d = foreach b generate group.age;";
+    public void testQuery64() throws Exception {
+        String query = "a = load 'a' as (name: chararray, details: tuple(age, gpa), mymap: map[]);" +
+        "c = load 'a' as (name, details: bag{mytuple: tuple(age: int, gpa)});" +
+        "b = group a by details;" +
+        "d = foreach b generate group.age;" +
+		"e = foreach a generate name, details;" +
+		"f = LOAD 'myfile' AS (garage: bag{tuple1: tuple(num_tools: int)}, links: bag{tuple2: tuple(websites: chararray)}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray, bag_of_unknown: bag{})});";
         buildPlan(query);
-		buildPlan("e = foreach a generate name, details;");
-		buildPlan("f = LOAD 'myfile' AS (garage: bag{tuple1: tuple(num_tools: int)}, links: bag{tuple2: tuple(websites: chararray)}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray, bag_of_unknown: bag{})});");
     }
 
     @Test
-    public void testQueryFail64() {
+    public void testQueryFail64() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : bag{age: int})) generate col1 ;";
         try {
         	buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail( "query should fail" );
     }
     
     @Test
-    public void testQuery65() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-		buildPlan("c = cogroup a by (name, age), b by (name, height);");
-		buildPlan("d = foreach c generate group.name, a.name as aName, b.name as b::name;");
+    public void testQuery65() throws Exception {
+        String q = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+		"c = cogroup a by (name, age), b by (name, height);" +
+		"d = foreach c generate group.name, a.name as aName, b.name as bname;";
+        buildPlan( q );
 	}
 
     @Test
-    public void testQueryFail65() {
-        buildPlan("a = load 'a' as (name, age, gpa);");
-        buildPlan("b = load 'b' as (name, height);");
-		buildPlan("c = cogroup a by (name, age), b by (name, height);");
+    public void testQueryFail65() throws Exception {
+        String q = "a = load 'a' as (name, age, gpa);" +
+        "b = load 'b' as (name, height);" +
+		"c = cogroup a by (name, age), b by (name, height);" +
+	    "d = foreach c generate group.name, a.name, b.height as age, a.age;";
         try {
-			buildPlan("d = foreach c generate group.name, a.name, b.height as age, a.age;");
+        	buildPlan( q );
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
 	}
 
     @Test
-    public void testQuery67() {
-        buildPlan(" a = load 'input1' as (name, age, gpa);");
-        buildPlan(" b = foreach a generate age, age * 10L, gpa/0.2f, {(16, 4.0e-2, 'hello')};");
+    public void testQuery67() throws Exception {
+        String q = " a = load 'input1' as (name, age, gpa);" +
+        " b = foreach a generate age, age * 10L, gpa/0.2f, {(16, 4.0e-2, 'hello')};";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery68() {
-        buildPlan(" a = load 'input1';");
-        buildPlan(" b = foreach a generate 10, {(16, 4.0e-2, 'hello'), (0.5f, 12l, 'another tuple')};");
+    public void testQuery68() throws Exception {
+    	String q = " a = load 'input1';" +
+        " b = foreach a generate 10, {(16, 4.0e-2, 'hello'), (0.5f, 12l, 'another tuple')};";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery69() {
-        buildPlan(" a = load 'input1';");
-        buildPlan(" b = foreach a generate {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, (1))};");
+    public void testQuery69() throws Exception {
+    	String q = " a = load 'input1';" +
+        " b = foreach a generate {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, (1))};";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery70() {
-        buildPlan(" a = load 'input1';");
-        buildPlan(" b = foreach a generate ['10'#'hello', '4.0e-2'#10L, '0.5f'#(1), 'world'#42, '42'#{('guide')}] as mymap:map[];");
-        buildPlan(" c = foreach b generate mymap#'10';");
+    public void testQuery70() throws Exception {
+    	String q = " a = load 'input1';" +
+        " b = foreach a generate ['10'#'hello', '4.0e-2'#10L, '0.5f'#(1), 'world'#42, '42'#{('guide')}] as mymap:map[];" +
+        " c = foreach b generate mymap#'10';";
+        buildPlan( q );
     }
 
     @Test
-    public void testQueryFail67() {
-        buildPlan(" a = load 'input1' as (name, age, gpa);");
+    public void testQueryFail67() throws Exception {
+        String q = " a = load 'input1' as (name, age, gpa);" +
+        " b = foreach a generate age, age * 10L, gpa/0.2f, {16, 4.0e-2, 'hello'};";
         try {
-            buildPlan(" b = foreach a generate age, age * 10L, gpa/0.2f, {16, 4.0e-2, 'hello'};");
+            buildPlan(q);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail( "query should fail" );
     }
     
     @Test
-    public void testQueryFail68() {
-        buildPlan(" a = load 'input1' as (name, age, gpa);");
+    public void testQueryFail68() throws Exception {
+        String q = " a = load 'input1' as (name, age, gpa);";
         try {
-            buildPlan(" b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
+        	buildPlan( q +
+            " b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+           return;
         }
+        Assert.fail( "query should fail" );
     }
     
     @Test
-    public void testQuery71() {
-        buildPlan("split (load 'a') into x if $0 > '7', y if $0 < '7';");
-        buildPlan("b = foreach x generate $0;");
-        buildPlan("c = foreach y generate $1;");
+    public void testQuery71() throws Exception {
+        String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';" +
+        "b = foreach x generate $0;" +
+        "c = foreach y generate $1;";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery72() {
-        buildPlan("split (load 'a') into x if $0 > '7', y if $0 < '7';");
-        buildPlan("b = foreach x generate (int)$0;");
-        buildPlan("c = foreach y generate (bag{})$1;");
-        buildPlan("d = foreach y generate (int)($1/2);");
-        buildPlan("e = foreach y generate (bag{tuple(int, float)})($1/2);");
-        buildPlan("f = foreach x generate (tuple(int, float))($1/2);");
-        buildPlan("g = foreach x generate (tuple())($1/2);");
-        buildPlan("h = foreach x generate (chararray)($1/2);");
+    public void testQuery72() throws Exception {
+        String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';" +
+        "b = foreach x generate (int)$0;" +
+        "c = foreach y generate (bag{})$1;" +
+        "d = foreach y generate (int)($1/2);" +
+        "e = foreach y generate (bag{tuple(int, float)})($1/2);" +
+        "f = foreach x generate (tuple(int, float))($1/2);" +
+        "g = foreach x generate (tuple())($1/2);" +
+        "h = foreach x generate (chararray)($1/2);";
+        buildPlan( q );
     }
 
     @Test
-    public void testQueryFail72() {
-        buildPlan("split (load 'a') into x if $0 > '7', y if $0 < '7';");
+    public void testQueryFail72() throws Exception {
+    	boolean catchEx = false;
+        String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';";
         try {
-            buildPlan("c = foreach y generate (bag)$1;");
+            buildPlan( q + "c = foreach y generate (bag)$1;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	catchEx = true;
         }
+        Assert.assertTrue( catchEx );
+        catchEx = false;
         try {
-            buildPlan("c = foreach y generate (bag{int, float})$1;");
+        	buildPlan( q + "c = foreach y generate (bag{int, float})$1;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	catchEx = true;
         }
+        Assert.assertTrue( catchEx );
+        catchEx = false;
         try {
-            buildPlan("c = foreach y generate (tuple)$1;");
+        	buildPlan( q + "c = foreach y generate (tuple)$1;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+        	catchEx = true;
         }
+        Assert.assertTrue( catchEx );
     }
 
     @Test
-    public void testQuery73() {
-        buildPlan("split (load 'a') into x if $0 > '7', y if $0 < '7';");
-        buildPlan("b = filter x by $0 matches '^fred.*';");
-        buildPlan("c = foreach y generate $0, ($0 matches 'yuri.*' ? $1 - 10 : $1);");
+    public void testQuery73() throws Exception {
+    	String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';" +
+        "b = filter x by $0 matches '^fred.*';" +
+        "c = foreach y generate $0, ($0 matches 'yuri.*' ? $1 - 10 : $1);";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery74() {
-        buildPlan("a = load 'a' as (field1: int, field2: long);");
-        buildPlan("b = load 'a' as (field1: bytearray, field2: double);");
-        buildPlan("c = group a by field1, b by field1;");
-        buildPlan("d = cogroup a by ((field1+field2)*field1), b by field1;");
+    public void testQuery74() throws Exception {
+        String q = "a = load 'a' as (field1: int, field2: long);" +
+        "b = load 'a' as (field1: bytearray, field2: double);" +
+        "c = group a by field1, b by field1;" +
+        "d = cogroup a by ((field1+field2)*field1), b by field1;";
+        buildPlan( q );
     }
 
     @Test
-    public void testQuery77() {
+    public void testQuery77() throws Exception {
         buildPlan("limit (load 'a') 100;");
     }
     
     @Test
-    public void testLimitWithLong() {
+    public void testLimitWithLong() throws Exception {
         buildPlan("limit (load 'a') 100L;");
     }
 
     @Test
-    public void testQuery75() {
-        buildPlan("a = union (load 'a'), (load 'b'), (load 'c');");
-        buildPlan("b = foreach a {generate $0;} parallel 10;");
+    public void testQuery75() throws Exception {
+        String q = "a = union (load 'a'), (load 'b'), (load 'c');";
+        buildPlan( q + "b = foreach a {generate $0;} parallel 10;");
     }
     
     @Test
-    public void testQuery76() {
-        buildPlan("split (load 'a') into x if $0 > '7', y if $0 < '7';");
-        buildPlan("b = filter x by $0 IS NULL;");
-        buildPlan("c = filter y by $0 IS NOT NULL;");
-        buildPlan("d = foreach b generate $0, ($1 IS NULL ? 0 : $1 - 7);");
-        buildPlan("e = foreach c generate $0, ($1 IS NOT NULL ? $1 - 5 : 0);");
+    public void testQuery76() throws Exception {
+    	String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';" +
+        "b = filter x by $0 IS NULL;" +
+        "c = filter y by $0 IS NOT NULL;" +
+        "d = foreach b generate $0, ($1 IS NULL ? 0 : $1 - 7);" +
+        "e = foreach c generate $0, ($1 IS NOT NULL ? $1 - 5 : 0);";
+    	buildPlan( q );
     }
 
     @Test 
-    public void testQuery80() {
-        buildPlan("a = load 'input1' as (name, age, gpa);");
-        buildPlan("b = filter a by age < '20';");
-        buildPlan("c = group b by age;");
-        String query = "d = foreach c {" 
+    public void testQuery80() throws Exception {
+    	String q = "a = load 'input1' as (name, age, gpa);" +
+        "b = filter a by age < '20';" +
+        "c = group b by age;" +
+        "d = foreach c {" 
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
@@ -981,125 +926,118 @@ public class TestLogicalPlanBuilder exte
             + "generate group, flatten(co);"
             //+ "generate group, flatten(cd);"
             + "};";
-        buildPlan(query);
+        buildPlan(q);
     }
 
     @Test
-    public void testQuery81() {
-        buildPlan("a = load 'input1' using PigStorage() as (name, age, gpa);");
-        buildPlan("split a into b if name lt 'f', c if (name gte 'f' and name lte 'h'), d if name gt 'h';");
+    public void testQuery81() throws Exception {
+    	String q = "a = load 'input1' using PigStorage() as (name, age, gpa);" +
+        "split a into b if name lt 'f', c if (name gte 'f' and name lte 'h'), d if name gt 'h';";
+    	buildPlan( q );
     }
 
     @Test
-    public void testQueryFail81() {
-        buildPlan("a = load 'input1' using PigStorage() as (name, age, gpa);");
+    public void testQueryFail81() throws Exception {
+        String q = "a = load 'input1' using PigStorage() as (name, age, gpa);";
         try {
-            buildPlan("split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
+            buildPlan(q + "split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            return;
         }
+        Assert.fail( "Query should fail." );
     }
     
     @Test
-    public void testQuery82() {
-        buildPlan("a = load 'myfile';");
-        buildPlan("b = group a by $0;"); 
-        String query = "c = foreach b {"
+    public void testQuery82() throws Exception {
+        String q = "a = load 'myfile';" +
+        "b = group a by $0;" + 
+        "c = foreach b {"
             + "c1 = order $1 by *;" 
             + "c2 = $1.$0;" 
             + "generate flatten(c1), c2;"
             + "};";
-        buildPlan(query);
+        buildPlan(q);
     }
 
     @Test
-    public void testQueryFail82() {
-        buildPlan("a = load 'myfile';");
-        buildPlan("b = group a by $0;"); 
-        String query = "c = foreach b {"
+    public void testQueryFail82() throws Exception {
+    	String q = "a = load 'myfile';" +
+        "b = group a by $0;" + 
+        "c = foreach b {"
             + "c1 = order $1 by *;" 
             + "c2 = $1;" 
             + "generate flatten(c1), c2;"
             + "};";
         try {
-        buildPlan(query);
+        buildPlan(q);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Exception"));
+            Assert.assertTrue(e.getMessage().contains("Exception"));
         }
     }
 
     @Test
-    public void testQuery83() {
-        buildPlan("a = load 'input1' as (name, age, gpa);");
-        buildPlan("b = filter a by age < '20';");
-        buildPlan("c = group b by (name,age);");
-        String query = "d = foreach c {" 
+    public void testQuery83() throws Exception {
+    	String q = "a = load 'input1' as (name, age, gpa);" +
+        "b = filter a by age < '20';" +
+        "c = group b by (name,age);" +
+        "d = foreach c {" 
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
             + "co = order cd by gpa;"
             + "generate group, flatten(co);"
             + "};";
-        buildPlan(query);
+        buildPlan(q);
     }
 
     @Test
-    public void testQuery84() {
-        buildPlan("a = load 'input1' as (name, age, gpa);");
-        buildPlan("b = filter a by age < '20';");
-        buildPlan("c = group b by (name,age);");
-        String query = "d = foreach c {"
+    public void testQuery84() throws Exception {
+    	String q = "a = load 'input1' as (name, age, gpa);" +
+        "b = filter a by age < '20';" +
+        "c = group b by (name,age);" +
+        "d = foreach c {"
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.$2;"
             + "cd = distinct cp;"
             + "co = order cd by gpa;"
             + "generate group, flatten(co);"
             + "};";
-        buildPlan(query);
+        buildPlan(q);
     }
     
     @Test
-    public void testQuery85() throws FrontendException {
+    public void testQuery85() throws Exception {
         LogicalPlan lp;
-        buildPlan("a = load 'myfile' as (name, age, gpa);");
-        lp = buildPlan("b = group a by (name, age);");
-        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
-
-        Schema.FieldSchema nameFs = new Schema.FieldSchema("name", DataType.BYTEARRAY);
-        Schema.FieldSchema ageFs = new Schema.FieldSchema("age", DataType.BYTEARRAY);
-        Schema.FieldSchema gpaFs = new Schema.FieldSchema("gpa", DataType.BYTEARRAY);
-        
-        Schema groupSchema = new Schema(nameFs);
-        groupSchema.add(ageFs);
-        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
-        
-        Schema loadSchema = new Schema(nameFs);
-        loadSchema.add(ageFs);
-        loadSchema.add(gpaFs);
-
-        Schema.FieldSchema bagFs = new Schema.FieldSchema("a", loadSchema, DataType.BAG);
+        String query = "a = load 'myfile' as (name, age, gpa);" + 
+		               "b = group a by (name, age);";
+        lp = buildPlan( query + "store b into 'output';");
+        Operator store = lp.getSinks().get(0);
+        LOCogroup cogroup = (LOCogroup) lp.getPredecessors(store).get(0);
         
-        Schema cogroupExpectedSchema = new Schema(groupFs);
-        cogroupExpectedSchema.add(bagFs);
+        LogicalSchema actual = cogroup.getSchema();
+        System.out.println( actual.toString( false ) );
 
-        assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema));
+        Assert.assertTrue(  actual.toString( false ).equals( "group:tuple(name:bytearray,age:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}" ) );
 
-        lp = buildPlan("c = foreach b generate group.name, group.age, COUNT(a.gpa);");
-        LOForEach foreach  = (LOForEach) lp.getLeaves().get(0);
+        lp = buildPlan(query +
+        		       "c = foreach b generate group.name, group.age, COUNT(a.gpa);" +
+        		       "store c into 'output';");
+        store = lp.getSinks().get(0);
+        LOForEach foreach  = (LOForEach) lp.getPredecessors(store).get(0);
 
-        Schema foreachExpectedSchema = new Schema(nameFs);
-        foreachExpectedSchema.add(ageFs);
-        foreachExpectedSchema.add(new Schema.FieldSchema(null, DataType.LONG));
 
-        assertTrue(foreach.getSchema().equals(foreachExpectedSchema));
+        Assert.assertTrue( foreach.getSchema().toString( false ).equals("name:bytearray,age:bytearray,:long") );
     }
 
     @Test
-    public void testQuery86() throws FrontendException {
+    public void testQuery86() throws Exception {
         LogicalPlan lp;
-        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
-        lp = buildPlan("b = group a by (name, age);");
-        LOCogroup cogroup = (LOCogroup) lp.getLeaves().get(0);
+        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                       "b = group a by (name, age);" +
+                       "store b into 'output';";
+        lp = buildPlan( query );
+        Operator store = lp.getSinks().get(0);
+        LOCogroup cogroup = (LOCogroup) lp.getPredecessors(store).get(0);
 
         Schema.FieldSchema nameFs = new Schema.FieldSchema("name", DataType.CHARARRAY);
         Schema.FieldSchema ageFs = new Schema.FieldSchema("age", DataType.INTEGER);
@@ -1117,316 +1055,337 @@ public class TestLogicalPlanBuilder exte
 
         Schema cogroupExpectedSchema = new Schema(groupFs);
         cogroupExpectedSchema.add(bagFs);
-
-        assertTrue(cogroup.getSchema().equals(cogroupExpectedSchema));
-
+        Assert.assertTrue(cogroup.getSchema().toString(false).equals("group:tuple(name:chararray,age:int),a:bag{:tuple(name:chararray,age:int,gpa:float)}"));
     }
 
     @Test
-    public void testQuery87() {
-        buildPlan("a = load 'myfile';");
-        buildPlan("b = group a by $0;");
-        LogicalPlan lp = buildPlan("c = foreach b {c1 = order $1 by $1; generate flatten(c1); };");
-        LOForEach foreach = (LOForEach)lp.getLeaves().get(0);
-        LogicalPlan nestedPlan = foreach.getForEachPlans().get(0);
-        LOProject sortInput = (LOProject)nestedPlan.getRoots().get(0);
-        LOSort nestedSort = (LOSort)nestedPlan.getSuccessors(sortInput).get(0);
-        LogicalPlan sortPlan = nestedSort.getSortColPlans().get(0);
-        assertTrue(sortPlan.getLeaves().size() == 1);
+    public void testQuery87() throws Exception {
+        String query = "a = load 'myfile';" +
+                       "b = group a by $0;" +
+                       "c = foreach b {c1 = order $1 by $1; generate flatten(c1); };" +
+                       "store c into 'output';";
+        LogicalPlan lp = buildPlan( query );
+        Operator store = lp.getSinks().get(0);
+        LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
+        LogicalPlan nestedPlan = foreach.getInnerPlan();
+//        LOProject sortInput = (LOProject)nestedPlan.getSources().get(0);
+//        LOSort nestedSort = (LOSort)nestedPlan.getSuccessors(sortInput).get(0);
+//        LogicalPlan sortPlan = nestedSort.getSortColPlans().get(0);
+//        Assert.assertTrue(sortPlan.getLeaves().size() == 1);
     }
 
     @Test
-    public void testQuery88() {
-        buildPlan("a = load 'myfile';");
-        buildPlan("b = group a by $0;");
-        LogicalPlan lp = buildPlan("c = order b by $1 ;");
-        LOSort sort = (LOSort)lp.getLeaves().get(0);
-        LOProject project1 = (LOProject) sort.getSortColPlans().get(0).getLeaves().get(0) ;
-        LOCogroup cogroup = (LOCogroup) lp.getPredecessors(sort).get(0) ;
-        assertEquals(project1.getExpression(), cogroup) ;
+    public void testQuery88() throws Exception {
+        String query = "a = load 'myfile';" +
+                       "b = group a by $0;" +
+                       "c = order b by $1 ;" +
+                       "store c into 'output';";
+        LogicalPlan lp = buildPlan( query );
+        Operator store = lp.getSinks().get(0);
+        LOSort sort = (LOSort) lp.getPredecessors(store).get(0);
+//        LOProject project1 = (LOProject) sort.getSortColPlans().get(0).getSinks().get(0) ;
+//        LOCogroup cogroup = (LOCogroup) lp.getPredecessors(sort).get(0) ;
+//        assertEquals(project1.getExpression(), cogroup) ;
     }
 
     @Test
-    public void testQuery89() {
-        buildPlan("a = load 'myfile';");
-        buildPlan("b = foreach a generate $0, $100;");
-        buildPlan("c = load 'myfile' as (i: int);");
-        buildPlan("d = foreach c generate $0 as zero, i;");
+    public void testQuery89() throws Exception {
+        String query = "a = load 'myfile';" + 
+                       "b = foreach a generate $0, $100;" +
+                       "c = load 'myfile' as (i: int);" +
+                       "d = foreach c generate $0 as zero, i;";
+        buildPlan( query );
     }
 
     @Test
-    public void testQueryFail89() {
-        buildPlan("c = load 'myfile' as (i: int);");
+    public void testQueryFail89() throws Exception {
+        String q = "c = load 'myfile' as (i: int);";
         try {
-            buildPlan("d = foreach c generate $0, $5;");
+            buildPlan(q + "d = foreach c generate $0, $5;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Out of bound access"));
+            Assert.assertTrue(e.getMessage().contains("Out of bound access"));
         }
     }
 
     @Test
-    public void testQuery90() throws FrontendException, ParseException {
+    public void testQuery90() throws Exception {
         LogicalPlan lp;
         LOForEach foreach;
 
-        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
-        buildPlan("b = group a by (name, age);");
-
-        //the first element in group, i.e., name is renamed as myname
-        lp = buildPlan("c = foreach b generate flatten(group) as (myname), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, age: int, mycount: long")));
-
+        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                       "b = group a by (name, age);";
         //the first and second elements in group, i.e., name and age are renamed as myname and myage
-        lp = buildPlan("c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, myage: int, mycount: long")));
+        lp = buildPlan(query + 
+        		"c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;" +
+        		"store c into 'output';");
+        Operator store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("myname:chararray,myage:int,mycount:long"));
 
         //the schema of group is unchanged
-        lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("group::name: chararray, group::age: int, mycount: long")));
-
-        //the first element in group, i.e., name is renamed as myname 
-        lp = buildPlan("c = foreach b generate flatten(group) as myname, COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, age: int, mycount: long")));
+        lp = buildPlan( query + 
+        		"c = foreach b generate flatten(group), COUNT(a) as mycount;" +
+        		"store c into 'output';" );
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("group::name:chararray,group::age:int,mycount:long"));
 
         //group is renamed as mygroup
-        lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
-
-        //group is renamed as mygroup and the first element is renamed as myname
-        lp = buildPlan("c = foreach b generate group as mygroup:(myname), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long")));
+        lp = buildPlan(query +
+        		"c = foreach b generate group as mygroup, COUNT(a) as mycount;" +
+        		"store c into 'output';");
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(name:chararray,age:int),mycount:long"));
 
         //group is renamed as mygroup and the elements are renamed as myname and myage
-        lp = buildPlan("c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long")));
-
-        //group is renamed to mygroup as the tuple schema is empty
-        lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
-
+        lp = buildPlan(query +
+        		"c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;" +
+        	    "store c into 'output';");
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(myname:chararray,myage:int),mycount:long"));
+/*
         //setting the schema of flattened bag that has no schema with the user defined schema
-        buildPlan("c = load 'another_file';");
-        buildPlan("d = cogroup a by $0, c by $0;");
-        lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x, y, z), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
+        String q = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                   "c = load 'another_file';" +
+                   "d = cogroup a by $0, c by $0;";
+        lp = buildPlan( q + "e = foreach d generate flatten(DIFF(a, c)) as (x, y, z), COUNT(a) as mycount;" + "store e into 'output';" );
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
-        buildPlan("c = load 'another_file';");
-        buildPlan("d = cogroup a by $0, c by $0;");
-        lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
+        q = query +
+                  "c = load 'another_file';" +
+                  "d = cogroup a by $0, c by $0;" +
+                  "e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;";
+        lp = buildPlan(q);
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
-        buildPlan("c = load 'another_file';");
-        buildPlan("d = cogroup a by $0, c by $0;");
-        lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, mycount: long")));
+        q = query +
+            "c = load 'another_file';" +
+            "d = cogroup a by $0, c by $0;" +
+            "e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;";
+        lp = buildPlan(q);
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
-        buildPlan("c = load 'another_file';");
-        buildPlan("d = cogroup a by $0, c by $0;");
-        lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, mycount: long")));
-
+        q = query + 
+            "c = load 'another_file';" +
+            "d = cogroup a by $0, c by $0;" +
+            "e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;";
+        lp = buildPlan(q);
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, mycount: long")));
+*/
     }
 
     @Test
-    public void testQueryFail90() throws FrontendException, ParseException {
-        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
-        buildPlan("b = group a by (name, age);");
+    public void testQueryFail90() throws Exception {
+        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                       "b = group a by (name, age);";
 
         try {
-            buildPlan("c = foreach b generate group as mygroup:(myname, myage, mygpa), COUNT(a) as mycount;");
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage, mygpa), COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Schema size mismatch"));
+            Assert.assertTrue(e.getMessage().contains("Schema size mismatch"));
         }
 
         try {
-            buildPlan("c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Type mismatch"));
+            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
         }
 
         try {
-            buildPlan("c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Type mismatch"));
+            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
         }
 
         try {
-            buildPlan("c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
+            buildPlan( query + "c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Type mismatch"));
+            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
         }
 
         try {
-            buildPlan("c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
+            buildPlan( query + "c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Schema size mismatch"));
+            Assert.assertTrue(e.getMessage().contains("Schema size mismatch"));
         }
     }
         
     @Test
-    public void testQuery91() {
-        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
-        buildPlan("b = group a by name;");
-        buildPlan("c = foreach b generate SUM(a.age) + SUM(a.gpa);");
+    public void testQuery91() throws Exception {
+        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                       "b = group a by name;";
+        buildPlan(query + "c = foreach b generate SUM(a.age) + SUM(a.gpa);");
     }
 
     @Test
-    public void testQuery92() {
-        buildPlan("a = load 'myfile' as (name, age, gpa);");
-        buildPlan("b = group a by name;");
-        String query = "c = foreach b { "
+    public void testQuery92() throws Exception {
+        String query = "a = load 'myfile' as (name, age, gpa);" +
+                       "b = group a by name;" +
+                       "c = foreach b { "
         + " alias = name#'alias'; "
         + " af = alias#'first'; "
         + " al = alias#'last'; "
         + " generate SUM(a.age) + SUM(a.gpa); "
         + "};";
+        buildPlan( query );
     }
 
     @Test
-    public void testQuery93() throws FrontendException, ParseException {
-        buildPlan("a = load 'one' as (name, age, gpa);");
-        buildPlan("b = group a by name;");
-        buildPlan("c = foreach b generate flatten(a);");
-        buildPlan("d = foreach c generate name;");
+    public void testQuery93() throws Exception {
+        String query = "a = load 'one' as (name, age, gpa);" +
+                       "b = group a by name;" +
+                       "c = foreach b generate flatten(a);" +
+                       "d = foreach c generate name;" +
         // test that we can refer to "name" field and not a::name
-        buildPlan("e = foreach d generate name;");
+                       "e = foreach d generate name;";
+        buildPlan( query );
     }
     
     @Test
-    public void testQueryFail93() throws FrontendException, ParseException {
-        buildPlan("a = load 'one' as (name, age, gpa);");
-        buildPlan("b = group a by name;");
-        buildPlan("c = foreach b generate flatten(a);");
-        buildPlan("d = foreach c generate name;");
+    public void testQueryFail93() throws Exception {
+        String query = "a = load 'one' as (name, age, gpa);" +
+        "b = group a by name;"+
+        "c = foreach b generate flatten(a);"+
+        "d = foreach c generate name;"+
         // test that we can refer to "name" field and a::name
-        buildPlan("e = foreach d generate a::name;");
+        "e = foreach d generate a::name;";
+        buildPlan( query );
     }
     
     @Test
-    public void testQuery94() throws FrontendException, ParseException {
-        buildPlan("a = load 'one' as (name, age, gpa);");
-        buildPlan("b = load 'two' as (name, age, somethingelse);");
-        buildPlan("c = cogroup a by name, b by name;");
-        buildPlan("d = foreach c generate flatten(a), flatten(b);");
+    public void testQuery94() throws Exception {
+        String query = "a = load 'one' as (name, age, gpa);" +
+        "b = load 'two' as (name, age, somethingelse);"+
+        "c = cogroup a by name, b by name;"+
+        "d = foreach c generate flatten(a), flatten(b);"+
         // test that we can refer to "a::name" field and not name
         // test that we can refer to "b::name" field and not name
-        buildPlan("e = foreach d generate a::name, b::name;");
+        "e = foreach d generate a::name, b::name;"+
         // test that we can refer to gpa and somethingelse
-        buildPlan("f = foreach d generate gpa, somethingelse, a::gpa, b::somethingelse;");
-        
+        "f = foreach d generate gpa, somethingelse, a::gpa, b::somethingelse;";
+        buildPlan( query );
     }
     
     @Test
-    public void testQueryFail94() throws FrontendException, ParseException {
-        buildPlan("a = load 'one' as (name, age, gpa);");
-        buildPlan("b = load 'two' as (name, age, somethingelse);");
-        buildPlan("c = cogroup a by name, b by name;");
-        buildPlan("d = foreach c generate flatten(a), flatten(b);");
+    public void testQueryFail94() throws Exception {
+        String query = "a = load 'one' as (name, age, gpa);" +
+        "b = load 'two' as (name, age, somethingelse);"+
+        "c = cogroup a by name, b by name;"+
+        "d = foreach c generate flatten(a), flatten(b);"+
+        "e = foreach d generate name;";
         // test that we can refer to "a::name" field and not name
         try {
-            buildPlan("e = foreach d generate name;");
+            buildPlan(query);
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Found more than one match:"));
+            Assert.assertTrue(e.getMessage().contains("Found more than one match:"));
         }
     }
 
     @Test
-    public void testQuery95() throws FrontendException, ParseException {
-        buildPlan("a = load 'myfile' as (name, age, gpa);");
-        buildPlan("b = group a by name;");
-        LogicalPlan lp = buildPlan("c = foreach b {d = order a by $1; generate flatten(d), MAX(a.age) as max_age;};");
-        LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
+    public void testQuery95() throws Exception {
+        String query = "a = load 'myfile' as (name, age, gpa);" +
+                       "b = group a by name;" +
+                       "c = foreach b {d = order a by $1; generate flatten(d), MAX(a.age) as max_age;};" +
+                       "store c into 'output';";
+        LogicalPlan lp = buildPlan(query);
+        Operator store = lp.getSinks().get(0);
+        LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOCogroup cogroup = (LOCogroup) lp.getPredecessors(foreach).get(0);
         Schema.FieldSchema bagFs = new Schema.FieldSchema("a", Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), DataType.BAG);
         Schema.FieldSchema groupFs = new Schema.FieldSchema("group", DataType.BYTEARRAY);
         Schema cogroupExpectedSchema = new Schema();
         cogroupExpectedSchema.add(groupFs);
         cogroupExpectedSchema.add(bagFs);
-        assertTrue(Schema.equals(cogroup.getSchema(), cogroupExpectedSchema, false, false));
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, max_age: double"), false, true));
+//        Assert.assertTrue(LogicalSchema.equals(cogroup.getSchema(), cogroupExpectedSchema, false, false));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, max_age: double"), false, true));
     }
 
     @Test
-    public void testQuery96() throws FrontendException, ParseException {
-        buildPlan("a = load 'input' as (name, age, gpa);");
-        buildPlan("b = filter a by age < 20;");
-        buildPlan("c = group b by age;");
-        String query = "d = foreach c {"
+    public void testQuery96() throws Exception {
+        String query = "a = load 'input' as (name, age, gpa);" +
+                       "b = filter a by age < 20;" +
+                       "c = group b by age;" +
+                       "d = foreach c {"
         + "cf = filter b by gpa < 3.0;"
         + "cd = distinct cf.gpa;"
         + "co = order cd by $0;"
         + "generate group, flatten(co);"
-        + "};";
+        + "};" +
+        "store d into 'output';";
         LogicalPlan lp = buildPlan(query);
-
-        LOForEach foreach = (LOForEach)lp.getLeaves().get(0);
-        ArrayList<LogicalPlan> foreachPlans = foreach.getForEachPlans();
-        LogicalPlan flattenPlan = foreachPlans.get(1);
-        LogicalOperator project = flattenPlan.getLeaves().get(0);
-        assertTrue(project instanceof LOProject);
-        LogicalOperator sort = flattenPlan.getPredecessors(project).get(0);
-        assertTrue(sort instanceof LOSort);
-        LogicalOperator distinct = flattenPlan.getPredecessors(sort).get(0);
-        assertTrue(distinct instanceof LODistinct);
-
-        //testing the presence of the nested foreach
-        LogicalOperator nestedForeach = flattenPlan.getPredecessors(distinct).get(0);
-        assertTrue(nestedForeach instanceof LOForEach);
-        LogicalPlan nestedForeachPlan = ((LOForEach)nestedForeach).getForEachPlans().get(0);
-        LogicalOperator nestedProject = nestedForeachPlan.getRoots().get(0);
-        assertTrue(nestedProject instanceof LOProject);
-        assertTrue(((LOProject)nestedProject).getCol() == 2);
-
-        //testing the filter inner plan for the absence of the project connected to project
-        LogicalOperator filter = flattenPlan.getPredecessors(nestedForeach).get(0);
-        assertTrue(filter instanceof LOFilter);
-        LogicalPlan comparisonPlan = ((LOFilter)filter).getComparisonPlan();
-        LOLesserThan lessThan = (LOLesserThan)comparisonPlan.getLeaves().get(0);
-        LOProject filterProject = (LOProject)lessThan.getLhsOperand();
-        assertTrue(null == comparisonPlan.getPredecessors(filterProject));
+        Operator store = lp.getSinks().get(0);
+        LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        LogicalPlan foreachPlans = foreach.getInnerPlan();
+//        LogicalPlan flattenPlan = foreachPlans.get(1);
+//        LogicalOperator project = flattenPlan.getLeaves().get(0);
+//        Assert.assertTrue(project instanceof LOProject);
+//        LogicalOperator sort = flattenPlan.getPredecessors(project).get(0);
+//        Assert.assertTrue(sort instanceof LOSort);
+//        LogicalOperator distinct = flattenPlan.getPredecessors(sort).get(0);
+//        Assert.assertTrue(distinct instanceof LODistinct);
+//
+//        //testing the presence of the nested foreach
+//        LogicalOperator nestedForeach = flattenPlan.getPredecessors(distinct).get(0);
+//        Assert.assertTrue(nestedForeach instanceof LOForEach);
+//        LogicalPlan nestedForeachPlan = ((LOForEach)nestedForeach).getForEachPlans().get(0);
+//        LogicalOperator nestedProject = nestedForeachPlan.getRoots().get(0);
+//        Assert.assertTrue(nestedProject instanceof LOProject);
+//        Assert.assertTrue(((LOProject)nestedProject).getCol() == 2);
+//
+//        //testing the filter inner plan for the absence of the project connected to project
+//        LogicalOperator filter = flattenPlan.getPredecessors(nestedForeach).get(0);
+//        Assert.assertTrue(filter instanceof LOFilter);
+//        LogicalPlan comparisonPlan = ((LOFilter)filter).getComparisonPlan();
+//        LOLesserThan lessThan = (LOLesserThan)comparisonPlan.getLeaves().get(0);
+//        LOProject filterProject = (LOProject)lessThan.getLhsOperand();
+//        Assert.assertTrue(null == comparisonPlan.getPredecessors(filterProject));
     }
-
+/*
     @Test
     public void testQuery97() throws FrontendException, ParseException {
         LogicalPlan lp;
         LOForEach foreach;
 
-        buildPlan("a = load 'one' as (name, age, gpa);");
-
-        lp = buildPlan("b = foreach a generate 1;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
-
-        lp = buildPlan("b = foreach a generate 1L;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
-
-        lp = buildPlan("b = foreach a generate 1.0;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
-
-        lp = buildPlan("b = foreach a generate 1.0f;");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
+        String query = "a = load 'one' as (name, age, gpa);";
+        String store = "store b into 'output';";
 
-        lp = buildPlan("b = foreach a generate 'hello';");
-        foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
+        lp = buildPlan(query + "b = foreach a generate 1;" + store);
+        foreach = (LOForEach)lp.getPredecessors(op);
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
+
+        lp = buildPlan(query + "b = foreach a generate 1L;" + store);
+        op = lp.getSinks().get(0);
+        Operator op = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(op);
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
+
+        lp = buildPlan(query + "b = foreach a generate 1.0;" + store);
+        op = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(op);
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
+
+        lp = buildPlan(query + "b = foreach a generate 1.0f;" + store);
+        op = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(op);
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
+
+        lp = buildPlan(query + "b = foreach a generate 'hello';" + store);
+        op = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(op);
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
     }
 
     @Test
@@ -1438,31 +1397,31 @@ public class TestLogicalPlanBuilder exte
 
         lp = buildPlan("b = foreach a generate (1);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1L);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0f);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello');");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', 1, 1L, 1.0f, 1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', {(1), (1.0)});");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
 
     }
 
@@ -1475,44 +1434,44 @@ public class TestLogicalPlanBuilder exte
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (2, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));

[... 857 lines stripped ...]