You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [18/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java Wed Feb 22 09:43:41 2017
@@ -18,7 +18,9 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -26,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
 
 import org.apache.hadoop.fs.Path;
@@ -49,8 +50,8 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.GFAny;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
@@ -70,12 +71,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestLogicalPlanBuilder {
-    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    PigContext pigContext = null;
     private PigServer pigServer = null;
 
     @Before
     public void setUp() throws Exception {
+        pigContext = new PigContext(Util.getLocalTestMode(), new Properties());
     	pigServer = new PigServer( pigContext );
+        pigServer.setValidateEachStatement(true);
     	pigContext.connect();
     }
 
@@ -169,60 +172,35 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail1() throws Exception {
         String query = " foreach (group (A = load 'a') by $1) generate A.'1' ;";
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-            return;
-        }
-        Assert.fail("Test case should fail" );
+        buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail2() throws Exception {
         String query = "foreach group (load 'a') by $1 generate $1.* ;";
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-        	return;
-        }
-        Assert.fail("Test case should fail" );
+        buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail3() throws Exception {
         String query = "A = generate DISTINCT foreach (load 'a');";
-        try {
-            LogicalPlan lp = buildPlan(query);
-            System.out.println( lp.toString() );
-        } catch (AssertionFailedError e) {
-        	return;
-        }
-        Assert.fail("Test case should fail" );
+        LogicalPlan lp = buildPlan(query);
+        System.out.println( lp.toString() );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail4() throws Exception {
         String query = "A = generate [ORDER BY $0][$3, $4] foreach (load 'a');";
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-        	return;
-        }
-        Assert.fail("Test case should fail" );
+        buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail5() throws Exception {
         String query = "A = generate " + TestApplyFunc.class.getName() + "($2.*) foreach (load 'a');";
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-        	return;
-        }
-        Assert.fail("Test case should fail" );
+        buildPlan(query);
     }
 
     /**
@@ -255,11 +233,9 @@ public class TestLogicalPlanBuilder {
         List<Operator> listOp = lp.getSuccessors(root);
         Operator lo = listOp.get(0);
 
-        if (lo instanceof LOCogroup) {
-            Assert.assertEquals( 16, ((LOCogroup) lo).getRequestedParallelism() );
-        } else {
-            Assert.fail("Error: Unexpected Parse Tree output");
-    }
+        assertTrue(lo instanceof LOCogroup);
+        assertEquals( 16, ((LOCogroup) lo).getRequestedParallelism() );//Local mode, paraallel = 1
+
     }
 
     @Test
@@ -294,14 +270,15 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
-    public void testQuery22Fail() throws Exception {
-        String query = "A = load 'a' as (a:int, b: double);" +
+    @Test(expected = FrontendException.class)
+    public void testQueryFail22() throws Exception {
+        String query = "A = load 'a';" +
                        "B = group A by (*, $0);";
         try {
             buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+        } catch (Exception e) {
+            assertTrue(e.getCause().getMessage().contains("Grouping attributes can either be star (*"));
+            throw e;
         }
     }
 
@@ -327,50 +304,40 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQuery23Fail() throws Exception {
         String query = "A = load 'a' as (a: int, b:double);" +
                        "B = load 'b';" +
                        "C = cogroup A by (*, $0), B by ($0, $1);";
-        boolean exceptionThrown = false;
         try {
             buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
-            exceptionThrown = true;
+            throw e;
         }
-        Assert.assertTrue(exceptionThrown);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQuery23Fail2() throws Exception {
         String query = "A = load 'a';" +
                        "B = load 'b';" +
                        "C = cogroup A by (*, $0), B by ($0, $1);";
-        boolean exceptionThrown = false;
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-            exceptionThrown = true;
-        }
-        Assert.assertTrue(exceptionThrown);
+        buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQuery23Fail3() throws Exception {
         String query = "A = load 'a' as (a: int, b:double);" +
                        "B = load 'b' as (a:int);" +
                        "C = cogroup A by *, B by *;";
-        boolean exceptionThrown = false;
         try {
             buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
-            exceptionThrown = true;
+            throw e;
         }
-        Assert.assertTrue(exceptionThrown);
     }
 
     @Test
@@ -499,15 +466,10 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail37() throws Exception {
         String query = "A = load 'a'; asdasdas";
-        try{
-            buildPlan(query);
-        }catch(AssertionFailedError e){
-            return;
-        }
-        Assert.fail( "Query should fail." );
+        buildPlan(query);
     }
 
     @Test
@@ -528,18 +490,14 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test
-    public void testQueryFail39() throws Exception{
+    @Test(expected = FrontendException.class)
+    public void testQuery39Fail() throws Exception{
         String query = "a = load 'a' as (url, host, ranking);" +
                        "b = group a by (url,host); " +
         "c = foreach b generate flatten(group.url), SUM(a.ranking) as totalRank;" +
                        "d = filter c by totalRank > '10';" +
                        "e = foreach d generate url;";
-        try {
-            buildPlan(query);//url has been falttened and hence the failure
-        } catch(AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
+        buildPlan(query);
     }
 
     @Test
@@ -548,20 +506,15 @@ public class TestLogicalPlanBuilder {
         buildPlan( query +"a = FILTER (load 'a') BY (IsEmpty($2) AND ($3 == $2));" );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail41() throws Exception {
-        try {
-            buildPlan("a = load 'a';" + "b = a as (host,url);");
-        } catch (AssertionFailedError e) {
-            return;
-        }
+        buildPlan("a = load 'a';" + "b = a as (host,url);");
         // TODO
         // the following statement was earlier present
         // eventually when we do allow assignments of the form
         // above, we should test with the line below
         // uncommented
         //buildPlan("foreach b generate host;");
-        Assert.fail( "Query should fail." );
     }
 
     @Test
@@ -582,17 +535,12 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail43() throws Exception {
         String q = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);";
-        try {
-            String query = q + "c = cogroup a by (name, age), b by (height);";
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-            return;
-        }
-        Assert.fail( "Query should fail." );
+        String query = q + "c = cogroup a by (name, age), b by (height);";
+        buildPlan(query);
     }
 
     @Test
@@ -604,22 +552,13 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail44() throws Throwable {
         PigServer pig = null;
-        try {
-            pig = new PigServer("local");
-        } catch (IOException e) {
-            Assert.assertTrue(false);  // pig server failed for some reason
-        }
+        pig = new PigServer(Util.getLocalTestMode());
         pig.registerFunction("myTr",
             new FuncSpec(GFAny.class.getName() + "('tr o 0')"));
-        try{
-            pig.registerQuery("b = foreach (load 'a') generate myTr(myTr(*));");
-        }catch(Exception e){
-            return;
-        }
-        Assert.assertTrue(false);
+        pig.registerQuery("b = foreach (load 'a') generate myTr(myTr(*));");
     }
 
     @Test
@@ -636,15 +575,11 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-	@Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail58() throws Exception{
         String query = "a = load 'a' as (url, host, ranking);" +
         "b = group a by url; ";
-        try {
-            buildPlan(query + "c = foreach b generate group.url;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
+        buildPlan(query + "c = foreach b generate group.url;");
     }
 
     @Test
@@ -685,17 +620,13 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail62() throws Exception {
         String query = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);" +
         "c = cross a,b;" +
         "d = order c by name, b::name, height, a::gpa;";
-        try {
-            buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
+        buildPlan(query);
     }
 
     @Test
@@ -708,13 +639,9 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQueryFail63() throws Exception {
+    public void testQuery63a() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
-        try {
-        	buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
+        buildPlan(query);
     }
 
     @Test
@@ -728,15 +655,10 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail64() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : bag{age: int})) generate col1 ;";
-        try {
-        	buildPlan(query);
-        } catch (AssertionFailedError e) {
-            return;
-        }
-        Assert.fail( "query should fail" );
+        buildPlan(query);
     }
 
     @Test
@@ -749,16 +671,12 @@ public class TestLogicalPlanBuilder {
 	}
 
     @Test
-    public void testQueryFail65() throws Exception {
+    public void testQuery66() throws Exception {
         String q = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);" +
-		"c = cogroup a by (name, age), b by (name, height);" +
-	    "d = foreach c generate group.name, a.name, b.height as age, a.age;";
-        try {
-        	buildPlan( q );
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
+        "c = cogroup a by (name, age), b by (name, height);" +
+        "d = foreach c generate group.name, a.name, b.height as age, a.age;";
+        buildPlan( q );
 	}
 
     @Test
@@ -790,28 +708,23 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail67() throws Exception {
         String q = " a = load 'input1' as (name, age, gpa);" +
         " b = foreach a generate age, age * 10L, gpa/0.2f, {16, 4.0e-2, 'hello'};";
         try {
             buildPlan(q);
-        } catch (AssertionFailedError e) {
-            return;
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Pig script failed to parse: MismatchedTokenException"));
+            throw e;
         }
-        Assert.fail( "query should fail" );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail68() throws Exception {
         String q = " a = load 'input1' as (name, age, gpa);";
-        try {
-        	buildPlan( q +
-            " b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
-        } catch (AssertionFailedError e) {
-           return;
-        }
-        Assert.fail( "query should fail" );
+        buildPlan( q +
+          " b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
     }
 
     @Test
@@ -841,24 +754,24 @@ public class TestLogicalPlanBuilder {
         String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';";
         try {
             buildPlan( q + "c = foreach y generate (bag)$1;");
-        } catch (AssertionFailedError e) {
+        } catch (FrontendException e) {
         	catchEx = true;
         }
-        Assert.assertTrue( catchEx );
+        assertTrue( catchEx );
         catchEx = false;
         try {
         	buildPlan( q + "c = foreach y generate (bag{int, float})$1;");
-        } catch (AssertionFailedError e) {
+        } catch (FrontendException e) {
         	catchEx = true;
         }
-        Assert.assertTrue( catchEx );
+        assertTrue( catchEx );
         catchEx = false;
         try {
         	buildPlan( q + "c = foreach y generate (tuple)$1;");
-        } catch (AssertionFailedError e) {
+        } catch (FrontendException e) {
         	catchEx = true;
         }
-        Assert.assertTrue( catchEx );
+        assertTrue( catchEx );
     }
 
     @Test
@@ -904,12 +817,12 @@ public class TestLogicalPlanBuilder {
     	buildPlan( q );
     }
 
-    @Test 
+    @Test
     public void testQuery80() throws Exception {
     	String q = "a = load 'input1' as (name, age, gpa);" +
         "b = filter a by age < '20';" +
         "c = group b by age;" +
-        "d = foreach c {" 
+        "d = foreach c {"
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
@@ -927,15 +840,10 @@ public class TestLogicalPlanBuilder {
     	buildPlan( q );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail81() throws Exception {
         String q = "a = load 'input1' using PigStorage() as (name, age, gpa);";
-        try {
-            buildPlan(q + "split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
-        } catch (AssertionFailedError e) {
-            return;
-        }
-        Assert.fail( "Query should fail." );
+        buildPlan(q + "split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
     }
 
     @Test
@@ -951,19 +859,15 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQueryFail82() throws Exception {
+    public void testQuery82a() throws Exception {
     	String q = "a = load 'myfile';" +
-        "b = group a by $0;" + 
+        "b = group a by $0;" +
         "c = foreach b {"
-            + "c1 = order $1 by *;" 
-            + "c2 = $1;" 
+            + "c1 = order $1 by *;"
+            + "c2 = $1;"
             + "generate flatten(c1), c2;"
             + "};";
-        try {
         buildPlan(q);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Exception"));
-        }
     }
 
     @Test
@@ -971,7 +875,7 @@ public class TestLogicalPlanBuilder {
     	String q = "a = load 'input1' as (name, age, gpa);" +
         "b = filter a by age < '20';" +
         "c = group b by (name,age);" +
-        "d = foreach c {" 
+        "d = foreach c {"
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
@@ -1008,7 +912,7 @@ public class TestLogicalPlanBuilder {
         LogicalSchema actual = cogroup.getSchema();
         System.out.println( actual.toString( false ) );
 
-        Assert.assertTrue(  actual.toString( false ).equals( "group:tuple(name:bytearray,age:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}" ) );
+        assertEquals("group:tuple(name:bytearray,age:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", actual.toString(false));
 
         lp = buildPlan(query +
         		       "c = foreach b generate group.name, group.age, COUNT(a.gpa);" +
@@ -1017,7 +921,7 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach  = (LOForEach) lp.getPredecessors(store).get(0);
 
 
-        Assert.assertTrue( foreach.getSchema().toString( false ).equals("name:bytearray,age:bytearray,:long") );
+        assertEquals("name:bytearray,age:bytearray,:long", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -1046,7 +950,7 @@ public class TestLogicalPlanBuilder {
 
         Schema cogroupExpectedSchema = new Schema(groupFs);
         cogroupExpectedSchema.add(bagFs);
-        Assert.assertTrue(cogroup.getSchema().toString(false).equals("group:tuple(name:chararray,age:int),a:bag{:tuple(name:chararray,age:int,gpa:float)}"));
+        assertEquals("group:tuple(name:chararray,age:int),a:bag{:tuple(name:chararray,age:int,gpa:float)}", cogroup.getSchema().toString(false));
     }
 
     @Test
@@ -1062,7 +966,7 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate) nestedPlan.getSinks().get(0);
         LOSort nestedSort = (LOSort)nestedPlan.getPredecessors(gen).get(0);
         LogicalExpressionPlan sortPlan = nestedSort.getSortColPlans().get(0);
-        Assert.assertTrue(sortPlan.getSinks().size() == 1);
+        assertEquals(1, sortPlan.getSinks().size());
     }
 
     @Test
@@ -1081,20 +985,21 @@ public class TestLogicalPlanBuilder {
 
     @Test
     public void testQuery89() throws Exception {
-        String query = "a = load 'myfile';" + 
+        String query = "a = load 'myfile';" +
                        "b = foreach a generate $0, $100;" +
                        "c = load 'myfile' as (i: int);" +
                        "d = foreach c generate $0 as zero, i;";
         buildPlan( query );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail89() throws Exception {
         String q = "c = load 'myfile' as (i: int);";
         try {
             buildPlan(q + "d = foreach c generate $0, $5;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Out of bound access"));
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Out of bound access"));
+            throw e;
         }
     }
 
@@ -1106,20 +1011,20 @@ public class TestLogicalPlanBuilder {
         String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
                        "b = group a by (name, age);";
         //the first and second elements in group, i.e., name and age are renamed as myname and myage
-        lp = buildPlan(query + 
+        lp = buildPlan(query +
         		"c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;" +
         		"store c into 'output';");
         Operator store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().isEqual(Utils.parseSchema("myname: chararray, age: int, mycount: long")));
+        assertTrue(foreach.getSchema().isEqual(Utils.parseSchema("myname: chararray, age: int, mycount: long")));
 
         //the schema of group is unchanged
-        lp = buildPlan( query + 
+        lp = buildPlan( query +
         		"c = foreach b generate flatten(group), COUNT(a) as mycount;" +
         		"store c into 'output';" );
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().toString( false ).equals("group::name:chararray,group::age:int,mycount:long"));
+        assertEquals("group::name:chararray,group::age:int,mycount:long", foreach.getSchema().toString(false));
 
         //group is renamed as mygroup
         lp = buildPlan(query +
@@ -1127,7 +1032,7 @@ public class TestLogicalPlanBuilder {
         		"store c into 'output';");
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(name:chararray,age:int),mycount:long"));
+        assertEquals("mygroup:tuple(name:chararray,age:int),mycount:long", foreach.getSchema().toString(false));
 
         //group is renamed as mygroup and the elements are renamed as myname and myage
         lp = buildPlan(query +
@@ -1135,8 +1040,7 @@ public class TestLogicalPlanBuilder {
         	    "store c into 'output';");
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(myname:chararray,myage:int),mycount:long"));
-        /*
+        assertEquals("mygroup:tuple(myname:chararray,myage:int),mycount:long",foreach.getSchema().toString(false));
         //setting the schema of flattened bag that has no schema with the user defined schema
         String q = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
                    "c = load 'another_file';" +
@@ -1144,73 +1048,88 @@ public class TestLogicalPlanBuilder {
         lp = buildPlan( q + "e = foreach d generate flatten(DIFF(a, c)) as (x, y, z), COUNT(a) as mycount;" + "store e into 'output';" );
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
+        assertEquals("x:bytearray,y:bytearray,z:bytearray,mycount:long",foreach.getSchema().toString(false));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
         q = query +
                   "c = load 'another_file';" +
                   "d = cogroup a by $0, c by $0;" +
-                  "e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;";
+                  "e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;" +
+                  "store e into 'output';";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
+        assertEquals("x:int,y:float,z:bytearray,mycount:long",foreach.getSchema().toString(false));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
         q = query +
             "c = load 'another_file';" +
             "d = cogroup a by $0, c by $0;" +
-            "e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;";
+            "e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;" +
+            "store e into 'output';";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, mycount: long")));
+        assertEquals("x:bytearray,mycount:long",foreach.getSchema().toString(false));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
-        q = query + 
+        q = query +
             "c = load 'another_file';" +
             "d = cogroup a by $0, c by $0;" +
-            "e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;";
+            "e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;" +
+            "store e into 'output';";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, mycount: long")));
-         */
+        assertEquals("x:int,mycount:long",foreach.getSchema().toString(false));
     }
 
+    /**
+     * Test for {@link org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor} visit() for inserting Cast operator into plan correctly.
+     * @throws Exception
+     */
     @Test
-    public void testQueryFail90() throws Exception {
+    public void testQuery90a() throws Exception {
+        LogicalPlan lp = null;
+        LOForEach foreach = null;
+        Operator store = null;
+
         String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
-                       "b = group a by (name, age);";
+                "b = group a by (name, age);";
 
-        try {
-            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Schema size mismatch"));
-        }
+        // Simply should work renaming
+        lp = buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount; store c into 'output';");
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        assertEquals("mygroup:tuple(myname:chararray,myage:int),mycount:long",foreach.getSchema().toString(false));
 
-        try {
-            buildPlan( query + "c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
-        }
+        // Casting should work regardless of type mismatch
+        lp = buildPlan( query + "c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount; store c into 'output';");
+        store = lp.getSinks().get(0);
+        foreach = (LOForEach)lp.getPredecessors(store).get(0);
+        assertEquals("mygroup:tuple(myname:int,myage:int),mycount:long",foreach.getSchema().toString(false));
 
-        try {
-            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
-        }
+        // Casting in final phase should work too (no succeeding operators after cast in final foreach)
+        buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
+    }
+
+    @Test
+    public void testQueryFail90() throws Exception {
+        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
+                       "b = group a by (name, age);";
 
         try {
             buildPlan( query + "c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Incompatible field schema"));
+            fail("Should have thrown error");
+        } catch (FrontendException e) {
+            assertTrue(e.getMessage().contains("Incompatible field schema"));
         }
 
         try {
             buildPlan( query + "c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Incompatible schema"));
+            fail("Should have thrown error");
+        } catch (FrontendException e) {
+            assertTrue(e.getMessage().contains("Incompatible schema"));
         }
     }
 
@@ -1246,7 +1165,7 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQueryFail93() throws Exception {
+    public void testQuery93_a() throws Exception {
         String query = "a = load 'one' as (name, age, gpa);" +
         "b = group a by name;"+
         "c = foreach b generate flatten(a);"+
@@ -1270,7 +1189,7 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQueryFail94() throws Exception {
         String query = "a = load 'one' as (name, age, gpa);" +
         "b = load 'two' as (name, age, somethingelse);"+
@@ -1280,8 +1199,9 @@ public class TestLogicalPlanBuilder {
         // test that we can refer to "a::name" field and not name
         try {
             buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Invalid field projection. Projected field [name] does not exist"));
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Invalid field projection. Projected field [name] does not exist"));
+            throw e;
         }
     }
 
@@ -1296,9 +1216,9 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOCogroup cogroup = (LOCogroup) lp.getPredecessors(foreach).get(0);
         String s = cogroup.getSchema().toString(false);
-        Assert.assertTrue( s.equals("group:bytearray,a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}"));
+        assertEquals("group:bytearray,a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", s);
         s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals("d::name:bytearray,d::age:bytearray,d::gpa:bytearray,max_age:double"));
+        assertEquals("d::name:bytearray,d::age:bytearray,d::gpa:bytearray,max_age:double", s);
     }
 
     @Test
@@ -1319,27 +1239,27 @@ public class TestLogicalPlanBuilder {
         LogicalPlan foreachPlans = foreach.getInnerPlan();
         //        LogicalPlan flattenPlan = foreachPlans.get(1);
         //        LogicalOperator project = flattenPlan.getLeaves().get(0);
-        //        Assert.assertTrue(project instanceof LOProject);
+        //        assertTrue(project instanceof LOProject);
         //        LogicalOperator sort = flattenPlan.getPredecessors(project).get(0);
-        //        Assert.assertTrue(sort instanceof LOSort);
+        //        assertTrue(sort instanceof LOSort);
         //        LogicalOperator distinct = flattenPlan.getPredecessors(sort).get(0);
-        //        Assert.assertTrue(distinct instanceof LODistinct);
+        //        assertTrue(distinct instanceof LODistinct);
         //
         //        //testing the presence of the nested foreach
         //        LogicalOperator nestedForeach = flattenPlan.getPredecessors(distinct).get(0);
-        //        Assert.assertTrue(nestedForeach instanceof LOForEach);
+        //        assertTrue(nestedForeach instanceof LOForEach);
         //        LogicalPlan nestedForeachPlan = ((LOForEach)nestedForeach).getForEachPlans().get(0);
         //        LogicalOperator nestedProject = nestedForeachPlan.getRoots().get(0);
-        //        Assert.assertTrue(nestedProject instanceof LOProject);
-        //        Assert.assertTrue(((LOProject)nestedProject).getCol() == 2);
+        //        assertTrue(nestedProject instanceof LOProject);
+        //        assertTrue(((LOProject)nestedProject).getCol() == 2);
         //
         //        //testing the filter inner plan for the absence of the project connected to project
         //        LogicalOperator filter = flattenPlan.getPredecessors(nestedForeach).get(0);
-        //        Assert.assertTrue(filter instanceof LOFilter);
+        //        assertTrue(filter instanceof LOFilter);
         //        LogicalPlan comparisonPlan = ((LOFilter)filter).getComparisonPlan();
         //        LOLesserThan lessThan = (LOLesserThan)comparisonPlan.getLeaves().get(0);
         //        LOProject filterProject = (LOProject)lessThan.getLhsOperand();
-        //        Assert.assertTrue(null == comparisonPlan.getPredecessors(filterProject));
+        //        assertTrue(null == comparisonPlan.getPredecessors(filterProject));
     }
     /*
     @Test
@@ -1352,28 +1272,28 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan(query + "b = foreach a generate 1;" + store);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
+//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1L;" + store);
         op = lp.getSinks().get(0);
         Operator op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
+//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1.0;" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
+//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1.0f;" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
+//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 'hello';" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
+//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
     }
 
     @Test
@@ -1385,31 +1305,31 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan("b = foreach a generate (1);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1L);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0f);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello');");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', 1, 1L, 1.0f, 1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', {(1), (1.0)});");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
 
     }
 
@@ -1422,39 +1342,39 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (2, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1L, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0f, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello', 3.14), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:()}"), false, true));
+        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:()}"), false, true));
 
     }
 
@@ -1517,7 +1437,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals("name:bytearray,age:bytearray,gpa:bytearray"));
+        assertEquals("name:bytearray,age:bytearray,gpa:bytearray", s);
     }
 
     @Test
@@ -1528,19 +1448,19 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOGenerate gen = (LOGenerate) foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan foreachPlan = gen.getOutputPlans().get(0);
-        Assert.assertTrue(checkPlanForProjectStar(foreachPlan));
+        assertTrue(checkPlanForProjectStar(foreachPlan));
     }
 
     @Test
     public void testQuery108()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" + 
+        String query = "a = load 'one' as (name, age, gpa);" +
         "b = group a by *;" +
         "store b into 'output';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOCogroup cogroup = (LOCogroup)lp.getPredecessors(store).get(0);
         String s = cogroup.getSchema().toString(false);
-        Assert.assertTrue(s.equals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}"));
+        assertEquals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", s);
     }
 
     @Test
@@ -1553,22 +1473,21 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOCogroup cogroup = (LOCogroup)lp.getPredecessors(store).get(0);
         String s = cogroup.getSchema().toString(false);
-        Assert.assertTrue(s.equals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)},b:bag{:tuple(first_name:bytearray,enrol_age:bytearray,high_school_gpa:bytearray)}"));
+        assertEquals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)},b:bag{:tuple(first_name:bytearray,enrol_age:bytearray,high_school_gpa:bytearray)}", s);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testQuery110Fail()  throws Exception {
     	String query = "a = load 'one' as (name, age, gpa);" +
     	"b = load 'two';" + "c = cogroup a by $0, b by *;";
 
         try {
             buildPlan( query );
-        } catch(AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed " +
+        } catch(Exception e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed " +
             		"if the input has a schema" ) );
-            return;
+            throw e;
         }
-        Assert.fail( "Test case should fail." );
     }
 
     @Test
@@ -1581,13 +1500,13 @@ public class TestLogicalPlanBuilder {
         LOSort sort = (LOSort)lp.getPredecessors(store).get(0);
 
         for(LogicalExpressionPlan sortPlan: sort.getSortColPlans() ) {
-            Assert.assertTrue(checkPlanForProjectStar(sortPlan) == false);
+            assertFalse(checkPlanForProjectStar(sortPlan));
         }
     }
 
     @Test
     public void testQuery112()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" + 
+        String query = "a = load 'one' as (name, age, gpa);" +
         "b = group a by *;" +
         "c = foreach b {a1 = order a by *; generate a1;};" +
         "store c into 'y';";
@@ -1596,7 +1515,7 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOGenerate gen = (LOGenerate) foreach.getInnerPlan().getSinks().get(0);
         for(LogicalExpressionPlan foreachPlan: gen.getOutputPlans()) {
-            Assert.assertTrue(checkPlanForProjectStar(foreachPlan) == true);
+            assertTrue(checkPlanForProjectStar(foreachPlan));
         }
 
         LogicalPlan foreachPlan = foreach.getInnerPlan();
@@ -1605,13 +1524,13 @@ public class TestLogicalPlanBuilder {
         // project (*) operator here is translated to a list of projection
         // operators
         for(LogicalExpressionPlan sortPlan: sort.getSortColPlans()) {
-            Assert.assertTrue(checkPlanForProjectStar(sortPlan) == false);
+            assertFalse(checkPlanForProjectStar(sortPlan));
         }
     }
 
     @Test
     public void testQuery114()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" + 
+        String query = "a = load 'one' as (name, age, gpa);" +
         "b = foreach a generate " + Identity.class.getName() + "(name, age);" +
         "store b into 'y';";
 
@@ -1619,7 +1538,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue(s.equals(":tuple(name:bytearray,age:bytearray)"));
+        assertEquals(":tuple(name:bytearray,age:bytearray)", s);
     }
 
     @Test
@@ -1632,7 +1551,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue(s.equals(":tuple(name:bytearray,age:bytearray,gpa:bytearray)"));
+        assertEquals(":tuple(name:bytearray,age:bytearray,gpa:bytearray)", s);
     }
 
     @Test
@@ -1645,19 +1564,19 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue(s.equals(":tuple(:bytearray,:bytearray)"));
+        assertEquals(":tuple(:bytearray,:bytearray)", s);
     }
 
     @Test
     public void testQuery117()  throws Exception {
-        String query = "a = load 'one';" + 
+        String query = "a = load 'one';" +
         "b = foreach a generate " + Identity.class.getName() + "(*);" +
         "store b into 'y';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue(s.equals(":tuple()"));
+        assertTrue(s.equals(":tuple()"));
     }
 
     @Test
@@ -1708,7 +1627,7 @@ public class TestLogicalPlanBuilder {
     @Test
     public void testNullConsConcatSize() throws Exception {
     	String query = "a = load 'a' as (x:int, y:double, str:chararray);" +
-        "b = foreach a generate SIZE(null), CONCAT(str, null), " + 
+        "b = foreach a generate SIZE(null), CONCAT(str, null), " +
                 "CONCAT(null, str);" +
                 "store b into 'output';";
         buildPlan(query);
@@ -1770,7 +1689,7 @@ public class TestLogicalPlanBuilder {
     @Test
     public void testCast() throws Exception {
     	String query = "a = load 'one.txt' as (x,y); " +
-        "b = foreach a generate (int)$0, (double)$1;" + 
+        "b = foreach a generate (int)$0, (double)$1;" +
         "c = group b by $0;"+
         "store c into 'output';";
     	buildPlan(query);
@@ -1856,14 +1775,14 @@ public class TestLogicalPlanBuilder {
 
     @Test
     public void testTokenizeSchema()  throws Exception {
-        String query = "a = load 'one' as (f1: chararray);" + 
+        String query = "a = load 'one' as (f1: chararray);" +
         "b = foreach a generate TOKENIZE(f1);" +
         "store b into 'output';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"));
+        assertEquals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}", s);
     }
 
     @Test
@@ -1875,8 +1794,8 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals(s, "bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"
-        		+",bag_of_tokenTuples_from_f2:bag{tuple_of_tokens:tuple(token:chararray)}");
+        assertEquals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"
+                +",bag_of_tokenTuples_from_f2:bag{tuple_of_tokens:tuple(token:chararray)}", s);
     }
 
     @Test
@@ -1888,15 +1807,15 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        Assert.assertTrue( logOp instanceof ConstantExpression);
+        assertTrue(logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        Assert.assertTrue(loConst.getType() == DataType.TUPLE);
-        Assert.assertTrue(loConst.getValue() instanceof Tuple);
-        Assert.assertTrue(loConst.getValue().equals(TupleFactory.getInstance().newTuple()));
+        assertEquals(DataType.TUPLE, loConst.getType());
+        assertTrue(loConst.getValue() instanceof Tuple);
+        assertEquals(TupleFactory.getInstance().newTuple(), loConst.getValue());
 
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":tuple()"));
+        assertEquals(":tuple()", s);
     }
 
     @Test
@@ -1908,15 +1827,15 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        Assert.assertTrue( logOp instanceof ConstantExpression);
+        assertTrue(logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        Assert.assertTrue(loConst.getType() == DataType.MAP);
-        Assert.assertTrue(loConst.getValue() instanceof Map);
-        Assert.assertTrue(loConst.getValue().equals(new HashMap<String,Object>()));
+        assertEquals(DataType.MAP, loConst.getType());
+        assertTrue(loConst.getValue() instanceof Map);
+        assertEquals(new HashMap<String,Object>(), loConst.getValue());
 
         String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":map"));
+        assertTrue( s.equals(":map"));
     }
 
     @Test
@@ -1929,15 +1848,14 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        Assert.assertTrue( logOp instanceof ConstantExpression);
+        assertTrue(logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        Assert.assertTrue(loConst.getType() == DataType.BAG);
-        Assert.assertTrue(loConst.getValue() instanceof DataBag);
-        Assert.assertTrue(loConst.getValue().equals(BagFactory.getInstance().newDefaultBag()));
+        assertEquals(DataType.BAG, loConst.getType());
+        assertTrue(loConst.getValue() instanceof DataBag);
+        assertEquals(BagFactory.getInstance().newDefaultBag(), loConst.getValue());
 
-        String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":bag{}") );
+        assertEquals(":bag{}", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -1948,8 +1866,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
 
-        String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":tuple(:tuple())") );
+        assertEquals(":tuple(:tuple())", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -1960,8 +1877,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
 
-        String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":tuple(:map)") );
+        assertEquals(":tuple(:map)", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -1972,8 +1888,7 @@ public class TestLogicalPlanBuilder {
         Operator op = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(op).get(0);
 
-        String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":tuple(:bag{})") );
+        assertEquals(":tuple(:bag{})", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -1984,8 +1899,7 @@ public class TestLogicalPlanBuilder {
         Operator op = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(op).get(0);
 
-        String s = foreach.getSchema().toString(false);
-        Assert.assertTrue( s.equals(":bag{:tuple()}") );
+        assertEquals(":bag{:tuple()}", foreach.getSchema().toString(false));
     }
 
     @Test
@@ -2014,42 +1928,36 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testCogroupByStarFailure1() throws Exception {
-        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' as (a0:int, a1:int);" +
             " b = load '2.txt'; " +
             "c = cogroup a by *, b by *;" +
             "store c into 'output';";
             buildPlan(query);
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only" +
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only" +
             		" allowed if the input has a schema"));
-            exceptionThrown = true;
+            throw e;
         }
-        Assert.assertEquals("An exception was expected but did " +
-                "not occur", true, exceptionThrown);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testCogroupByStarFailure2() throws Exception {
-        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' ;" +
             " b = load '2.txt' as (b0:int, b1:int); " +
             "c = cogroup a by *, b by *;" +
             "store c into 'output';";
             buildPlan( query );
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed if the input has a schema"));
-            exceptionThrown = true;
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed if the input has a schema"));
+            throw e;
         }
-        Assert.assertEquals("An exception was expected but did " +
-                "not occur", true, exceptionThrown);
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testMissingSemicolon() throws Exception {
         try {
             String query = "A = load '1.txt' \n" +
@@ -2057,32 +1965,27 @@ public class TestLogicalPlanBuilder {
                            "C = union A, B;\n" +
                            "store C into 'output';";
             buildPlan( query );
-        } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("mismatched input 'B' expecting SEMI_COLON"));
-           return;
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("mismatched input 'B' expecting SEMI_COLON"));
+            throw e;
         }
-        Assert.fail("An exception was expected but did not occur");
     }
 
-    @Test
+    @Test(expected = FrontendException.class)
     public void testCogroupByIncompatibleSchemaFailure() throws Exception {
-        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' as (a0:int, a1:int);" +
             " b = load '2.txt' as (a0:int, a1:chararray); " +
-            "c = cogroup a by (a0,a1), b by (a0,a1);" +
-            "store c into 'output';";
+            "c = cogroup a by (a0,a1), b by (a0,a1);";
             buildPlan( query );
-        } catch (AssertionFailedError e) {
+        } catch (Exception e) {
             String msg =
                 "group column no. 2 in relation no. 2 of  group statement" +
                 " has datatype chararray which is incompatible with type of" +
                 " corresponding column in earlier relation(s) in the statement";
-            Assert.assertTrue(e.getMessage().contains(msg));
-            exceptionThrown = true;
+            assertTrue(e.getCause().getMessage().contains(msg));
+            throw e;
         }
-        Assert.assertEquals("An exception was expected but did " +
-                "not occur", true, exceptionThrown);
     }
 
     @Test
@@ -2094,7 +1997,7 @@ public class TestLogicalPlanBuilder {
         LOLoad load = (LOLoad)plan.getPredecessors(op).get(0);
         // the signature is now a unique string of the format "{alias}_{scope id}-{id}" example: "a_12-0"
         String udfContextSignature = ((PigStorageWithSchema)(load).getLoadFunc()).getUDFContextSignature();
-        Assert.assertTrue(udfContextSignature, udfContextSignature.matches("a_[0-9]*-[0-9]*"));
+        assertTrue(udfContextSignature, udfContextSignature.matches("a_[0-9]*-[0-9]*"));
 
         query = " b = load '1.txt' using org.apache.pig.test.PigStorageWithSchema();" +
                 "store b into 'output';";
@@ -2102,20 +2005,15 @@ public class TestLogicalPlanBuilder {
         op = plan.getSinks().get(0);
         load = (LOLoad)plan.getPredecessors(op).get(0);
         udfContextSignature = ((PigStorageWithSchema)(load).getLoadFunc()).getUDFContextSignature();
-        Assert.assertTrue(udfContextSignature, udfContextSignature.matches("b_[0-9]*-[0-9]*"));
+        assertTrue(udfContextSignature, udfContextSignature.matches("b_[0-9]*-[0-9]*"));
     }
 
     @Test
     public void testLastAlias() throws Exception {
-        try {
-            String query = "B = load '2.txt' as (b0:int, b1:int);\n" +
-            		"C = ORDER B by b0;" ;
-            buildPlan( query );
-
-        } catch (AssertionFailedError e) {
-            // Ignore the exception
-        }
-        Assert.assertEquals("C", pigServer.getPigContext().getLastAlias());
+        String query = "B = load '2.txt' as (b0:int, b1:int);\n" +
+                "C = ORDER B by b0;";
+        buildPlan(query);
+        assertEquals("C", pigServer.getPigContext().getLastAlias());
     }
     
     @Test
@@ -2174,7 +2072,25 @@ public class TestLogicalPlanBuilder {
         assertTrue("Sink must end with output", lData.getSinks().get(0).endsWith("output"));
         assertEquals("Number of logical relational operators must be 4", lData.getNumLogicalRelationOperators(), 4);
     }
-    
+
+    @Test
+    public void testFlattenMap() throws Exception {
+       String query = "A = LOAD 'input.txt' as (rowId:int, dataMap:map[int]);" +
+               "B = FOREACH A GENERATE rowId, FLATTEN(dataMap);";
+
+        pigServer.registerQuery(query);
+        Schema schema = pigServer.dumpSchema("B");
+
+        assertEquals(3, schema.size());
+
+        assertEquals(DataType.INTEGER, schema.getField(0).type);
+        assertEquals("rowId", schema.getField(0).alias);
+
+        assertEquals(DataType.CHARARRAY, schema.getField(1).type);
+        assertEquals("dataMap::key", schema.getField(1).alias);
+        assertEquals(DataType.INTEGER, schema.getField(2).type);
+        assertEquals("dataMap::value", schema.getField(2).alias);
+    }
     /**
      * This method is not generic. Expects logical plan to have atleast
      * 1 source and returns the corresponding FuncSpec.
@@ -2216,19 +2132,6 @@ public class TestLogicalPlanBuilder {
 
     // Helper Functions
     public LogicalPlan buildPlan(String query) throws Exception {
-    	try {
-            return Util.buildLp(pigServer, query);
-    	} catch(Throwable t) {
-    	    PigException pigEx = LogUtils.getPigException(t);
-    	    Throwable cause = null;
-    	    if(pigEx != null){
-    	        cause = pigEx;
-    	    }else{
-    	        cause = t.getCause();
-    	    }
-    	    String msg = cause != null ? cause.toString() : t.toString();
-    	    throw new AssertionFailedError( msg );
-    	}
+        return Util.buildLp(pigServer, query);
     }
-
-}
+}
\ No newline at end of file

Modified: pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java Wed Feb 22 09:43:41 2017
@@ -102,7 +102,7 @@ public class TestMRJobStats {
         try {
             Constructor<MRJobStats> con = MRJobStats.class.getDeclaredConstructor(String.class, JobGraph.class);
             con.setAccessible(true);
-            MRJobStats jobStats = (MRJobStats) con.newInstance(name, plan);
+            MRJobStats jobStats = con.newInstance(name, plan);
             return jobStats;
         } catch (Exception e) {
             return null;
@@ -202,14 +202,49 @@ public class TestMRJobStats {
         }
     }
 
-    private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
-            Configuration conf) throws Exception {
+    private POStore createPOStoreForFileBasedSystemWithSubDirectories(long size, StoreFuncInterface storeFunc, Configuration conf) throws Exception {
+        File root = createTmpDirectory("outputRoot", null);
+        File dir1 = createTmpDirectory("dir1", root);
+        File dir2 = createTmpDirectory("dir2", root);
+        createTmpFile("tempFile1", size, dir1);
+        createTmpFile("tempFile2", size, dir2);
+
+        storeFunc.setStoreLocation(root.getAbsolutePath(), new Job(conf));
+        FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
+        POStore poStore = new POStore(new OperatorKey());
+        poStore.setSFile(new FileSpec(root.getAbsolutePath(), funcSpec));
+        poStore.setStoreFunc(storeFunc);
+        poStore.setUp();
+
+        return poStore;
+    }
+
+    private static File createTmpDirectory(String name, File root) throws Exception {
+        File directory = File.createTempFile(name, "", root);
+
+        if (!(directory.delete())) {
+            throw new IOException("Could not delete temp file: " + directory.getAbsolutePath());
+        }
+
+        if (!(directory.mkdir())) {
+            throw new IOException("Could not create temp directory: " + directory.getAbsolutePath());
+        }
 
-        File file = File.createTempFile("tempFile", ".tmp");
+        return directory;
+    }
+
+    private static File createTmpFile(String name, long size, File directory) throws Exception {
+        File file = directory == null ? File.createTempFile(name, ".tmp") : File.createTempFile(name, ".tmp", directory);
         file.deleteOnExit();
         RandomAccessFile f = new RandomAccessFile(file, "rw");
         f.setLength(size);
         f.close();
+        return file;
+    }
+
+    private static POStore createPOStoreForFileBasedSystem(long size, StoreFuncInterface storeFunc,
+            Configuration conf) throws Exception {
+        File file = createTmpFile("tempFile", size, null);
 
         storeFunc.setStoreLocation(file.getAbsolutePath(), new Job(conf));
         FuncSpec funcSpec = new FuncSpec(storeFunc.getClass().getCanonicalName());
@@ -236,7 +271,7 @@ public class TestMRJobStats {
     }
 
     @Test
-    public void testGetOuputSizeUsingFileBasedStorage() throws Exception {
+    public void testGetOutputSizeUsingFileBasedStorage() throws Exception {
         // By default, FileBasedOutputSizeReader is used to compute the size of output.
         Configuration conf = new Configuration();
 
@@ -249,7 +284,20 @@ public class TestMRJobStats {
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage1() throws Exception {
+    public void testGetOutputSizeUsingFileBasedStorageWithSubDirectories() throws Exception {
+        // By default, FileBasedOutputSizeReader is used to compute the size of output.
+        Configuration conf = new Configuration();
+
+        long size = 2L * 1024 * 1024 * 1024;
+        long outputSize = JobStats.getOutputSize(
+                createPOStoreForFileBasedSystemWithSubDirectories(size, new PigStorageWithStatistics(), conf), conf);
+
+        assertEquals("The returned output size is expected to be sum of file sizes in the sub-directories",
+                2 * size, outputSize);
+    }
+
+    @Test
+    public void testGetOutputSizeUsingNonFileBasedStorage1() throws Exception {
         // By default, FileBasedOutputSizeReader is used to compute the size of output.
         Configuration conf = new Configuration();
 
@@ -263,7 +311,7 @@ public class TestMRJobStats {
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage2() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage2() throws Exception {
         // Register a custom output size reader in configuration
         Configuration conf = new Configuration();
         conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
@@ -279,7 +327,7 @@ public class TestMRJobStats {
     }
 
     @Test(expected = RuntimeException.class)
-    public void testGetOuputSizeUsingNonFileBasedStorage3() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage3() throws Exception {
         // Register an invalid output size reader in configuration, and verify
         // that an exception is thrown at run-time.
         Configuration conf = new Configuration();
@@ -292,7 +340,7 @@ public class TestMRJobStats {
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage4() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage4() throws Exception {
         // Register a comma-separated list of readers in configuration, and
         // verify that the one that supports a non-file-based uri is used.
         Configuration conf = new Configuration();
@@ -310,7 +358,7 @@ public class TestMRJobStats {
     }
 
     @Test
-    public void testGetOuputSizeUsingNonFileBasedStorage5() throws Exception {
+    public void testGetOutputSizeUsingNonFileBasedStorage5() throws Exception {
         Configuration conf = new Configuration();
 
         long size = 2L * 1024 * 1024 * 1024;

Modified: pig/branches/spark/test/org/apache/pig/test/TestMacroExpansion.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMacroExpansion.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMacroExpansion.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMacroExpansion.java Wed Feb 22 09:43:41 2017
@@ -1187,6 +1187,33 @@ public class TestMacroExpansion {
         
         validateFailure(sb.toString(), expectedErr, "at");
     }
+
+    @Test
+    public void lineNumberTest3() throws Throwable {
+        StringBuilder sb = new StringBuilder();
+        sb.append("/*\n" +
+        " * extra lines to offset the line number for the macro\n" +
+        " *\n" +
+        " *\n" +
+        " */\n" +
+        "\n" +
+        "\n" +
+        "define mymacro() returns void {\n" +
+        "A = load 'x' as ( u:int, v:long, w:bytearray);\n" +
+        "B = limit A 100;\n" +
+        "C = filter_typo B by 2 > 1;\n" +
+        "D = load 'y' as (d1, d2);\n" +
+        "E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16;\n" +
+        "F = store E into 'output';\n" +
+        "};\n"  +
+        "mymacro();\n"
+        );
+
+        String expectedErr =
+            "/myscript.pig, line 11, column 0>  Syntax error, unexpected symbol at or near 'C'";
+
+        validateFailure(sb.toString(), expectedErr, "/myscript.pig, line ");
+    }
     
     //see Pig-2184
     @Test
@@ -2252,6 +2279,135 @@ public class TestMacroExpansion {
 
         verify(script, expected);
     }
+
+    // When  declare-in-macro, macro param and command-line param contain the
+    // same name, last declare wins
+    @Test
+    public void testParamOverLap1() throws Exception {
+        String macro =
+            "DEFINE mygroupby(REL, key, number) RETURNS G {\n" +
+            "    %declare number 333;\n"  +
+            "    $G = GROUP $REL by $key parallel $number;\n" +
+            "};";
+        createFile("my_macro.pig", macro);
+
+        String script =
+            "%declare number 111;\n" +
+            "IMPORT 'my_macro.pig';\n" +
+            "data = LOAD '1234.txt' USING PigStorage() AS (i: int);\n" +
+            "result = mygroupby(data, i, 222);\n" +
+            "STORE result INTO 'test.out' USING PigStorage();";
+
+        String expected =
+            "data = LOAD '1234.txt' USING PigStorage() AS i:int;\n" +
+            "result = GROUP data by (i) parallel 333;\n" +
+            "STORE result INTO 'test.out' USING PigStorage();\n";
+
+        verify(script, expected);
+    }
+
+    // When  default-in-macro, macro param and command-line param contain the
+    // same name, then default should be ignored and macro param to be taken
+    @Test
+    public void testParamOverLap2() throws Exception {
+        String macro =
+            "DEFINE mygroupby(REL, key, number) RETURNS G {\n" +
+            "    %default number 333;\n"  +
+            "    $G = GROUP $REL by $key parallel $number;\n" +
+            "};";
+        createFile("my_macro.pig", macro);
+
+        String script =
+            "%declare number 111;\n" +
+            "IMPORT 'my_macro.pig';\n" +
+            "data = LOAD '1234.txt' USING PigStorage() AS (i: int);\n" +
+            "result = mygroupby(data, i, 222);\n" +
+            "STORE result INTO 'test.out' USING PigStorage();";
+
+        String expected =
+            "data = LOAD '1234.txt' USING PigStorage() AS i:int;\n" +
+            "result = GROUP data by (i) parallel 222;\n" +
+            "STORE result INTO 'test.out' USING PigStorage();\n";
+
+        verify(script, expected);
+    }
+
+    // Overlapping of  macro param and command-line param used to be disallowed.
+    // Now, simply taking the macro param when this happens
+    @Test
+    public void testParamOverLap3() throws Exception {
+        String macro =
+            "DEFINE mygroupby(REL, key, number) RETURNS G {\n" +
+            "    $G = GROUP $REL by $key parallel $number;\n" +
+            "};";
+        createFile("my_macro.pig", macro);
+
+        String script =
+            "%default number 111;\n" +
+            "IMPORT 'my_macro.pig';\n" +
+            "data = LOAD '1234.txt' USING PigStorage() AS (i: int);\n" +
+            "result = mygroupby(data, i, 222);\n" +
+            "STORE result INTO 'test.out' USING PigStorage();";
+
+        String expected =
+            "data = LOAD '1234.txt' USING PigStorage() AS i:int;\n" +
+            "result = GROUP data by (i) parallel 222;\n" +
+            "STORE result INTO 'test.out' USING PigStorage();\n";
+
+        verify(script, expected);
+    }
+
+    // Testing inline declare and commandline param overlap.
+    // testParamOverLap1 should cover this case as well but creating a specific
+    // case since this pair used to fail with NPE
+    @Test
+    public void testParamOverLap4() throws Exception {
+        String macro =
+            "DEFINE mygroupby(REL, key) RETURNS G {\n" +
+            "    %declare number 333;\n"  +
+            "    $G = GROUP $REL by $key parallel $number;\n" +
+            "};";
+        createFile("my_macro.pig", macro);
+
+        String script =
+            "%default number 111;\n" +
+            "IMPORT 'my_macro.pig';\n" +
+            "data = LOAD '1234.txt' USING PigStorage() AS (i: int);\n" +
+            "result = mygroupby(data, i);\n" +
+            "STORE result INTO 'test.out' USING PigStorage();";
+
+        String expected =
+            "data = LOAD '1234.txt' USING PigStorage() AS i:int;\n" +
+            "result = GROUP data by (i) parallel 333;\n" +
+            "STORE result INTO 'test.out' USING PigStorage();\n";
+
+        verify(script, expected);
+    }
+
+    // default-in-macro should yield to command-line param
+    @Test
+    public void testParamOverLap5() throws Exception {
+        String macro =
+            "DEFINE mygroupby(REL, key) RETURNS G {\n" +
+            "    %default number 333;\n"  +
+            "    $G = GROUP $REL by $key parallel $number;\n" +
+            "};";
+        createFile("my_macro.pig", macro);
+
+        String script =
+            "%declare number 111;\n" +
+            "IMPORT 'my_macro.pig';\n" +
+            "data = LOAD '1234.txt' USING PigStorage() AS (i: int);\n" +
+            "result = mygroupby(data, i);\n" +
+            "STORE result INTO 'test.out' USING PigStorage();";
+
+        String expected =
+            "data = LOAD '1234.txt' USING PigStorage() AS i:int;\n" +
+            "result = GROUP data by (i) parallel 111;\n" +
+            "STORE result INTO 'test.out' USING PigStorage();\n";
+
+        verify(script, expected);
+    }
 
     //-------------------------------------------------------------------------
     

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Wed Feb 22 09:43:41 2017
@@ -883,6 +883,49 @@ public class TestMultiQuery {
     }
 
     @Test
+    public void testMultiQueryJiraPig4883() throws Exception {
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation",
+                Storage.tuple("c", "12"), Storage.tuple("d", "-12"));
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage();");
+        myPig.registerQuery("A = foreach A generate (chararray)$0 as id, (long)$1 as val;");
+        myPig.registerQuery("B = filter A by val > 0;");
+        myPig.registerQuery("B1 = group B by val;");
+        myPig.registerQuery("B1 = foreach B1 generate group as name, COUNT(B) as value;");
+        myPig.registerQuery("B1 = foreach B1 generate (chararray)name,value;");
+        myPig.registerQuery("store B1 into 'output1' using mock.Storage();");
+        myPig.registerQuery("B2 = group B by id;");
+        myPig.registerQuery("B2 = foreach B2 generate group as name, COUNT(B) as value;");
+        myPig.registerQuery("store B2 into 'output2' using mock.Storage();");
+        myPig.registerQuery("C = filter A by val < 0;");
+        myPig.registerQuery("C1 = group C by val;");
+        myPig.registerQuery("C1 = foreach C1 generate group as name, COUNT(C) as value;");
+        myPig.registerQuery("store C1 into 'output3' using mock.Storage();");
+        myPig.registerQuery("C2 = group C by id;");
+        myPig.registerQuery("C2 = foreach C2 generate group as name, COUNT(C) as value;");
+        myPig.registerQuery("store C2 into 'output4' using mock.Storage();");
+        myPig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output1");
+        String[] expectedResults = new String[]{"(12, 1)"};
+        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B1")));
+
+
+        actualResults = data.get("output2");
+        expectedResults = new String[]{"(c,1)"};
+        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B2")));
+
+        actualResults = data.get("output3");
+        expectedResults = new String[]{"(-12, 1)"};
+        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C1")));
+
+        actualResults = data.get("output4");
+        expectedResults = new String[]{"(d,1)"};
+        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C2")));
+    }
+
+    @Test
     public void testMultiQueryJiraPig4899() throws Exception {
         myPig.setBatchOn();
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java Wed Feb 22 09:43:41 2017
@@ -1558,14 +1558,7 @@ public class TestMultiQueryCompiler {
         MROperPlan mrp = null;
 
         try {
-            java.lang.reflect.Method compile = launcher.getClass()
-                    .getDeclaredMethod("compile",
-                            new Class[] { PhysicalPlan.class, PigContext.class });
-
-            compile.setAccessible(true);
-
-            mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() });
-
+            mrp = launcher.compile(pp, myPig.getPigContext());
             Assert.assertNotNull(mrp);
 
         } catch (Exception e) {

Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java Wed Feb 22 09:43:41 2017
@@ -57,6 +57,8 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.IsNullExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
 import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
@@ -127,6 +129,17 @@ public class TestNewPartitionFilterPushD
      * @throws Exception
      */
     @Test
+    public void testPartIsNullFilter() throws Exception {
+        String q = query + "b = filter a by srcid is null;" + "store b into 'out';";
+        test(q, Arrays.asList("srcid"),
+                null, "(srcid is null)");
+    }
+
+    /**
+     * test case where filter only contains condition on partition cols
+     * @throws Exception
+     */
+    @Test
     public void testOnlyPartFilter2() throws Exception {
         String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
         test(q, Arrays.asList("srcid", "mrkt"),
@@ -685,6 +698,15 @@ public class TestNewPartitionFilterPushD
         testFull(q, "((srcid < 5) or (srcid == 10))", "((f1 < 5) or (f2 == 'UK'))", false);
     }
 
+    // PIG-4940
+    @Test
+    public void testUnaryExpressions() throws Exception {
+        String q = query + "b = filter a by srcid == 10 and not browser#'type' is null;" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid"), "(srcid == 10)",
+                "(not (browser#'type' is null))", true);
+    }
+
     //// helper methods ///////
     private PartitionFilterExtractor test(String query, List<String> partitionCols,
             String expPartFilterString, String expFilterString)
@@ -849,7 +871,7 @@ public class TestNewPartitionFilterPushD
         return "(" + input + ")";
     }
 
-    private static String getTestExpression(LogicalExpression op) throws FrontendException {
+    public static String getTestExpression(LogicalExpression op) throws FrontendException {
         if(op == null) {
             return null;
         }
@@ -871,6 +893,8 @@ public class TestNewPartitionFilterPushD
                     opStr = " and ";
                 } else if (op instanceof OrExpression) {
                     opStr = " or ";
+                } else if (op instanceof NotEqualExpression) {
+                    opStr = " != ";
                 } else {
                     opStr = op.getName();
                 }
@@ -890,6 +914,9 @@ public class TestNewPartitionFilterPushD
                 int colind = ((DereferenceExpression) op).getBagColumns().get(0);
                 String column = String.valueOf(colind);
                 return alias + ".$" + column;
+            } else if (op instanceof NotExpression) {
+                String expr = getTestExpression(((NotExpression) op).getExpression());
+                return braketize("not " + expr);
             } else {
                 throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
             }

Modified: pig/branches/spark/test/org/apache/pig/test/TestNewPlanColumnPrune.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNewPlanColumnPrune.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNewPlanColumnPrune.java Wed Feb 22 09:43:41 2017
@@ -454,6 +454,34 @@ public class TestNewPlanColumnPrune {
         }
     }
 
+    @Test
+    public void testNoAddForeach() throws Exception  {
+        // PIG-5055
+        // Need to make sure that it does not add foreach
+        // that drops all the fields from B2.
+        String query = "A = load 'd.txt' as (a0:int, a1:int, a2:int);" +
+        "B = load 'd.txt' as (b0:int, b1:int, b2:int);" +
+        "B2 = FILTER B by b0 == 0;" +
+        "C = join A by (1), B2 by (1) ;" +
+        "D = FOREACH C GENERATE A::a1, A::a2;" +
+        "store D into 'empty';";
+
+        LogicalPlan newLogicalPlan = buildPlan(query);
+
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        System.err.println(newLogicalPlan);
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+            Operator o = iter.next();
+            LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
+            if (lro == null || lro.getAlias() == null) continue;
+            if (lro.getAlias().equals("B2")) {
+                assertNotNull(lro.getSchema());
+            }
+        }
+    }
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {