You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [12/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java Fri Feb 24 03:34:37 2017
@@ -20,34 +20,16 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import junit.framework.Assert;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
-import java.util.Iterator;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
-import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.builtin.mock.Storage;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.newplan.DependencyOrderWalker;
-import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.ReverseDependencyOrderWalker;
-import org.apache.pig.newplan.logical.expression.CastExpression;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
-import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.visitor.LineageFindRelVisitor;
@@ -60,13 +42,6 @@ import org.junit.Test;
 
 public class TestLineageFindRelVisitor {
 
-    private PigServer pig ;
-
-    @Before
-    public void setUp() throws Exception{
-        pig = new PigServer(Util.getLocalTestMode()) ;
-    }
-
     public static class SillyLoadCasterWithExtraConstructor extends Utf8StorageConverter {
         public SillyLoadCasterWithExtraConstructor(String ignored) {
             super();
@@ -94,13 +69,6 @@ public class TestLineageFindRelVisitor {
         }
     }
 
-    public static class ToTupleWithCustomLoadCaster extends org.apache.pig.builtin.TOTUPLE {
-        @Override
-        public LoadCaster getLoadCaster() throws IOException {
-            return new SillyLoadCasterWithExtraConstructor("ignored");
-        }
-    }
-
     @Test
     public void testhaveIdenticalCasters() throws Exception {
         LogicalPlan lp = new LogicalPlan();
@@ -155,169 +123,6 @@ public class TestLineageFindRelVisitor {
                            (Boolean) testMethod.invoke(lineageFindRelVisitor,
                                      casterWithExtraConstuctorSpec, casterWithExtraConstuctorSpec) );
 
-        Assert.assertEquals("Loader should be instantiated at most once.", 1, SillyLoaderWithLoadCasterWithExtraConstructor.counter);
-    }
-
-    @Test
-    public void testIdenticalColumnUDFForwardingLoadCaster() throws Exception {
-        Storage.Data data = Storage.resetData(pig);
-        data.set("input",
-                Storage.tuple(Storage.map(
-                                 "key1",new DataByteArray("aaa"),
-                                 "key2",new DataByteArray("bbb"),
-                                 "key3",new DataByteArray("ccc"))),
-                Storage.tuple(Storage.map(
-                                 "key1",new DataByteArray("zzz"),
-                                 "key2",new DataByteArray("yyy"),
-                                 "key3",new DataByteArray("xxx"))));
-        pig.setBatchOn();
-        pig.registerQuery("A = load 'input' using mock.Storage() as (m:[bytearray]);");
-        pig.registerQuery("B = foreach A GENERATE m#'key1' as key1, m#'key2' as key2; "
-                // this equal comparison creates implicit typecast to chararray
-                // which requires loadcaster
-                + "C = FILTER B by key1 == 'aaa' and key2 == 'bbb';");
-        pig.registerQuery("store C into 'output' using mock.Storage();");
-
-        pig.executeBatch();
-
-        List<Tuple> actualResults = data.get("output");
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {"('aaa', 'bbb')"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
-    }
-
-    @Test
-    public void testUDFForwardingLoadCaster() throws Exception {
-        Storage.Data data = Storage.resetData(pig);
-        data.set("input",
-                Storage.tuple(new DataByteArray("aaa")),
-                Storage.tuple(new DataByteArray("bbb")));
-        pig.setBatchOn();
-        String query = "A = load 'input' using mock.Storage() as (a1:bytearray);"
-            + "B = foreach A GENERATE TOTUPLE(a1) as tupleA;"
-            + "C = foreach B GENERATE (chararray) tupleA.a1;"  //using loadcaster
-            + "store C into 'output' using mock.Storage();";
-
-        LogicalPlan lp = Util.parse(query, pig.getPigContext());
-        Util.optimizeNewLP(lp);
-
-        CastFinder cf = new CastFinder(lp);
-        cf.visit();
-        Assert.assertEquals("There should be only one typecast expression.", 1, cf.casts.size());
-        Assert.assertEquals("Loadcaster should be coming from the Load", "mock.Storage", cf.casts.get(0).getFuncSpec().getClassName());
-
-        pig.registerQuery(query);
-        pig.executeBatch();
-
-        List<Tuple> actualResults = data.get("output");
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {"('aaa')", "('bbb')"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
-    }
-
-    @Test
-    public void testUDFgetLoadCaster() throws Exception {
-        Storage.Data data = Storage.resetData(pig);
-        data.set("input",
-                Storage.tuple(new DataByteArray("aaa")),
-                Storage.tuple(new DataByteArray("bbb")));
-        pig.setBatchOn();
-        String query = "A = load 'input' using mock.Storage() as (a1:bytearray);"
-            + "B = foreach A GENERATE org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster(a1) as tupleA;"
-            + "C = foreach B GENERATE (chararray) tupleA.a1;" //using loadcaster
-            + "store C into 'output' using mock.Storage();";
-
-        pig.registerQuery(query);
-        pig.executeBatch();
-
-        LogicalPlan lp = Util.parse(query, pig.getPigContext());
-        Util.optimizeNewLP(lp);
-
-        CastFinder cf = new CastFinder(lp);
-        cf.visit();
-        Assert.assertEquals("There should be only one typecast expression.", 1, cf.casts.size());
-        Assert.assertEquals("Loadcaster should be coming from the UDF", "org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster", cf.casts.get(0).getFuncSpec().getClassName());
-
-        List<Tuple> actualResults = data.get("output");
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {"('aaa')", "('bbb')"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
-    }
-
-    @Test
-    public void testUDFForwardingLoadCasterWithMultipleParams() throws Exception{
-        File inputfile = Util.createFile(new String[]{"123","456","789"});
-
-        pig.registerQuery("A = load '"
-                + inputfile.toString()
-                + "' using PigStorage() as (a1:bytearray);\n");
-        pig.registerQuery("B = load '"
-                + inputfile.toString()
-                + "' using PigStorage() as (b1:bytearray);\n");
-        pig.registerQuery("C = join A by a1, B by b1;\n");
-        pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as tupleD;\n");
-        pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
-        Iterator<Tuple> iter  = pig.openIterator("E");
-
-        Assert.assertEquals("123", iter.next().get(0));
-        Assert.assertEquals("456", iter.next().get(0));
-        Assert.assertEquals("789", iter.next().get(0));
-    }
-
-    @Test
-    public void testNegativeUDFForwardingLoadCasterWithMultipleParams() throws Exception {
-        File inputfile = Util.createFile(new String[]{"123","456","789"});
-
-        pig.registerQuery("A = load '"
-                + inputfile.toString()
-                + "' using PigStorage() as (a1:bytearray);\n");
-        pig.registerQuery("B = load '"
-                + inputfile.toString()
-                + "' using org.apache.pig.test.TestLineageFindRelVisitor$SillyLoaderWithLoadCasterWithExtraConstructor2() as (b1:bytearray);\n");
-        pig.registerQuery("C = join A by a1, B by b1;\n");
-        pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as tupleD;\n");
-        pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
-        try {
-            Iterator<Tuple> iter  = pig.openIterator("E");
-
-            // this should fail since above typecast cannot determine which
-            // loadcaster to use (one from PigStroage and another from
-            // SillyLoaderWithLoadCasterWithExtraConstructor2)
-            fail("Above typecast should fail since it cannot determine which loadcaster to use.");
-        } catch (IOException e) {
-            Assert.assertTrue(e.getMessage().contains("Unable to open iterator for alias E"));
-        }
-
-
-    }
-
-    /**
-     * Find all casts in the plan (Copied from TestTypeCheckingValidatorNewLP.java)
-     */
-    class CastFinder extends AllExpressionVisitor {
-        List<CastExpression> casts = new ArrayList<CastExpression>();
-
-        public CastFinder(OperatorPlan plan)
-                throws FrontendException {
-            super(plan, new DependencyOrderWalker(plan));
-        }
-
-        @Override
-        protected LogicalExpressionVisitor getVisitor(
-                LogicalExpressionPlan exprPlan) throws FrontendException {
-            return new CastExpFinder(exprPlan, new ReverseDependencyOrderWalker(exprPlan));
-        }
-
-        class CastExpFinder extends LogicalExpressionVisitor{
-            protected CastExpFinder(OperatorPlan p, PlanWalker walker)
-            throws FrontendException {
-                super(p, walker);
-            }
-
-            @Override
-            public void visit(CastExpression cExp){
-                casts.add(cExp);
-            }
-        }
+        Assert.assertEquals("Loader should be instantiated at most once.", SillyLoaderWithLoadCasterWithExtraConstructor.counter, 1);
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoad.java Fri Feb 24 03:34:37 2017
@@ -67,8 +67,6 @@ public class TestLoad {
 
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
-    private static final String WORKING_DIR = "/tmp/test" + java.util.UUID.randomUUID();
-
     @Before
     public void setUp() throws Exception {
         FileLocalizer.deleteTempFiles();
@@ -120,7 +118,7 @@ public class TestLoad {
     public void testLoadRemoteRel() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("test", WORKING_DIR + "/test");
+            checkLoadPath("test","/tmp/test");
         }
     }
 
@@ -129,7 +127,7 @@ public class TestLoad {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
             boolean noConversionExpected = true;
-            checkLoadPath(WORKING_DIR + "/test", WORKING_DIR + "/test", noConversionExpected);
+            checkLoadPath("/tmp/test","/tmp/test", noConversionExpected);
         }
     }
 
@@ -137,7 +135,7 @@ public class TestLoad {
     public void testLoadRemoteRelScheme() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("test", WORKING_DIR + "/test");
+            checkLoadPath("test","/tmp/test");
         }
     }
 
@@ -145,11 +143,11 @@ public class TestLoad {
     public void testLoadRemoteAbsScheme() throws Exception {
         pc = servers[0].getPigContext();
         boolean noConversionExpected = true;
-        checkLoadPath("hdfs:" + WORKING_DIR + "/test","hdfs:" + WORKING_DIR + "/test", noConversionExpected);
+        checkLoadPath("hdfs:/tmp/test","hdfs:/tmp/test", noConversionExpected);
 
         // check if a location 'hdfs:<abs path>' can actually be read using PigStorage
         String[] inputFileNames = new String[] {
-                WORKING_DIR + "/TestLoad-testLoadRemoteAbsSchema-input.txt"};
+                "/tmp/TestLoad-testLoadRemoteAbsSchema-input.txt"};
         testLoadingMultipleFiles(inputFileNames, "hdfs:" + inputFileNames[0]);
     }
 
@@ -164,7 +162,7 @@ public class TestLoad {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
             boolean noConversionExpected = true;
-            checkLoadPath(WORKING_DIR + "/foo/../././", WORKING_DIR + "/foo/.././.", noConversionExpected);
+            checkLoadPath("/tmp/foo/../././","/tmp/foo/.././.", noConversionExpected);
         }
     }
 
@@ -172,7 +170,7 @@ public class TestLoad {
     public void testGlobChars() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("t?s*", WORKING_DIR + "/t?s*");
+            checkLoadPath("t?s*","/tmp/t?s*");
         }
     }
 
@@ -180,7 +178,7 @@ public class TestLoad {
     public void testCommaSeparatedString() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/a,b", WORKING_DIR + "/usr/pig/a,"+ WORKING_DIR + "/b");
+            checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
         }
     }
 
@@ -188,7 +186,7 @@ public class TestLoad {
     public void testCommaSeparatedString2() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("t?s*,test", WORKING_DIR + "/t?s*,"+ WORKING_DIR + "/test");
+            checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
         }
     }
 
@@ -198,14 +196,14 @@ public class TestLoad {
         PigServer pig = servers[0];
         pc = pig.getPigContext();
         boolean noConversionExpected = true;
-        checkLoadPath("hdfs:"+ WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3",
-                "hdfs:" + WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3", noConversionExpected );
+        checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3",
+                "hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3", noConversionExpected );
 
         // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
         // read using PigStorage
         String[] inputFileNames = new String[] {
-                WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input1.txt",
-                WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input2.txt"};
+                "/tmp/TestLoad-testCommaSeparatedString3-input1.txt",
+                "/tmp/TestLoad-testCommaSeparatedString3-input2.txt"};
         String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
         inputFileNames[1];
         testLoadingMultipleFiles(inputFileNames, inputString);
@@ -216,7 +214,7 @@ public class TestLoad {
     public void testCommaSeparatedString4() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/{a,c},usr/pig/b", WORKING_DIR + "/usr/pig/{a,c}," + WORKING_DIR + "/usr/pig/b");
+            checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
         }
     }
 
@@ -224,18 +222,18 @@ public class TestLoad {
     public void testCommaSeparatedString5() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("/usr/pig/{a,c},b", "/usr/pig/{a,c}," + WORKING_DIR + "/b");
+            checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
         }
 
         // check if a location '<abs path>,<relative path>' can actually be
         // read using PigStorage
-        String loadLocationString = WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
-        "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to WORKING_DIR in checkLoadPath()
+        String loadLocationString = "/tmp/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
+        "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to /tmp in checkLoadPath()
 
         String[] inputFileNames = new String[] {
-                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input1.txt",
-                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input2.txt",
-                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
+                "/tmp/TestLoad-testCommaSeparatedStringMixed-input1.txt",
+                "/tmp/TestLoad-testCommaSeparatedStringMixed-input2.txt",
+                "/tmp/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
         pc = servers[0].getPigContext(); // test in map reduce mode
         testLoadingMultipleFiles(inputFileNames, loadLocationString);
     }
@@ -244,7 +242,7 @@ public class TestLoad {
     public void testCommaSeparatedString6() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/{a,c},/usr/pig/b", WORKING_DIR + "/usr/pig/{a,c},/usr/pig/b");
+            checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
         }
     }
 
@@ -252,7 +250,7 @@ public class TestLoad {
     public void testNonDfsLocation() throws Exception {
         String nonDfsUrl = "har:///user/foo/f.har";
         String query = "a = load '" + nonDfsUrl + "' using PigStorage('\t','-noschema');" +
-                       "store a into 'pigoutput';";
+                       "store a into 'output';";
         LogicalPlan lp = Util.buildLp(servers[1], query);
         LOLoad load = (LOLoad) lp.getSources().get(0);
         nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
@@ -310,7 +308,7 @@ public class TestLoad {
             pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b);
 
             DataStorage dfs = pc.getDfs();
-            dfs.setActiveContainer(dfs.asContainer(WORKING_DIR));
+            dfs.setActiveContainer(dfs.asContainer("/tmp"));
             Map<String, String> fileNameMap = new HashMap<String, String>();
 
             QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);

Modified: pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Fri Feb 24 03:34:37 2017
@@ -45,8 +45,12 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo';";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0.23", "hive-shims-common", "kryo"};
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
+                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
@@ -57,8 +61,12 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo' using OrcStorage;";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0.23", "hive-shims-common", "kryo"};
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
+                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLocal.java Fri Feb 24 03:34:37 2017
@@ -39,7 +39,6 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -167,8 +166,7 @@ public class TestLocal {
         public Tuple getNext() throws IOException {
             if (count < COUNT) {
 
-                   Tuple t = new DefaultTuple();
-                   t.append(Integer.toString(count++));
+                   Tuple t = TupleFactory.getInstance().newTuple(Integer.toString(count++));
                    return t;
 
             }

Modified: pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Feb 24 03:34:37 2017
@@ -18,9 +18,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -28,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
 
 import org.apache.hadoop.fs.Path;
@@ -50,8 +49,8 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.GFAny;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
@@ -71,14 +70,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestLogicalPlanBuilder {
-    PigContext pigContext = null;
+    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
     private PigServer pigServer = null;
 
     @Before
     public void setUp() throws Exception {
-        pigContext = new PigContext(Util.getLocalTestMode(), new Properties());
     	pigServer = new PigServer( pigContext );
-        pigServer.setValidateEachStatement(true);
     	pigContext.connect();
     }
 
@@ -172,35 +169,60 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail1() throws Exception {
         String query = " foreach (group (A = load 'a') by $1) generate A.'1' ;";
-        buildPlan(query);
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+            return;
+        }
+        Assert.fail("Test case should fail" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail2() throws Exception {
         String query = "foreach group (load 'a') by $1 generate $1.* ;";
-        buildPlan(query);
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+        	return;
+        }
+        Assert.fail("Test case should fail" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail3() throws Exception {
         String query = "A = generate DISTINCT foreach (load 'a');";
-        LogicalPlan lp = buildPlan(query);
-        System.out.println( lp.toString() );
+        try {
+            LogicalPlan lp = buildPlan(query);
+            System.out.println( lp.toString() );
+        } catch (AssertionFailedError e) {
+        	return;
+        }
+        Assert.fail("Test case should fail" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail4() throws Exception {
         String query = "A = generate [ORDER BY $0][$3, $4] foreach (load 'a');";
-        buildPlan(query);
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+        	return;
+        }
+        Assert.fail("Test case should fail" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail5() throws Exception {
         String query = "A = generate " + TestApplyFunc.class.getName() + "($2.*) foreach (load 'a');";
-        buildPlan(query);
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+        	return;
+        }
+        Assert.fail("Test case should fail" );
     }
 
     /**
@@ -233,9 +255,11 @@ public class TestLogicalPlanBuilder {
         List<Operator> listOp = lp.getSuccessors(root);
         Operator lo = listOp.get(0);
 
-        assertTrue(lo instanceof LOCogroup);
-        assertEquals( 16, ((LOCogroup) lo).getRequestedParallelism() );//Local mode, paraallel = 1
-
+        if (lo instanceof LOCogroup) {
+            Assert.assertEquals( 16, ((LOCogroup) lo).getRequestedParallelism() );
+        } else {
+            Assert.fail("Error: Unexpected Parse Tree output");
+    }
     }
 
     @Test
@@ -270,15 +294,14 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
-    public void testQueryFail22() throws Exception {
-        String query = "A = load 'a';" +
+    @Test
+    public void testQuery22Fail() throws Exception {
+        String query = "A = load 'a' as (a:int, b: double);" +
                        "B = group A by (*, $0);";
         try {
             buildPlan(query);
-        } catch (Exception e) {
-            assertTrue(e.getCause().getMessage().contains("Grouping attributes can either be star (*"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
         }
     }
 
@@ -304,40 +327,50 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQuery23Fail() throws Exception {
         String query = "A = load 'a' as (a: int, b:double);" +
                        "B = load 'b';" +
                        "C = cogroup A by (*, $0), B by ($0, $1);";
+        boolean exceptionThrown = false;
         try {
             buildPlan(query);
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
-            throw e;
+            exceptionThrown = true;
         }
+        Assert.assertTrue(exceptionThrown);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQuery23Fail2() throws Exception {
         String query = "A = load 'a';" +
                        "B = load 'b';" +
                        "C = cogroup A by (*, $0), B by ($0, $1);";
-        buildPlan(query);
+        boolean exceptionThrown = false;
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+            exceptionThrown = true;
+        }
+        Assert.assertTrue(exceptionThrown);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQuery23Fail3() throws Exception {
         String query = "A = load 'a' as (a: int, b:double);" +
                        "B = load 'b' as (a:int);" +
                        "C = cogroup A by *, B by *;";
+        boolean exceptionThrown = false;
         try {
             buildPlan(query);
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
                         "do not match"));
-            throw e;
+            exceptionThrown = true;
         }
+        Assert.assertTrue(exceptionThrown);
     }
 
     @Test
@@ -466,10 +499,15 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail37() throws Exception {
         String query = "A = load 'a'; asdasdas";
-        buildPlan(query);
+        try{
+            buildPlan(query);
+        }catch(AssertionFailedError e){
+            return;
+        }
+        Assert.fail( "Query should fail." );
     }
 
     @Test
@@ -490,14 +528,18 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test(expected = FrontendException.class)
-    public void testQuery39Fail() throws Exception{
+    @Test
+    public void testQueryFail39() throws Exception{
         String query = "a = load 'a' as (url, host, ranking);" +
                        "b = group a by (url,host); " +
         "c = foreach b generate flatten(group.url), SUM(a.ranking) as totalRank;" +
                        "d = filter c by totalRank > '10';" +
                        "e = foreach d generate url;";
-        buildPlan(query);
+        try {
+            buildPlan(query);//url has been flattened and hence the failure
+        } catch(AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
     }
 
     @Test
@@ -506,15 +548,20 @@ public class TestLogicalPlanBuilder {
         buildPlan( query +"a = FILTER (load 'a') BY (IsEmpty($2) AND ($3 == $2));" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail41() throws Exception {
-        buildPlan("a = load 'a';" + "b = a as (host,url);");
+        try {
+            buildPlan("a = load 'a';" + "b = a as (host,url);");
+        } catch (AssertionFailedError e) {
+            return;
+        }
         // TODO
         // the following statement was earlier present
         // eventually when we do allow assignments of the form
         // above, we should test with the line below
         // uncommented
         //buildPlan("foreach b generate host;");
+        Assert.fail( "Query should fail." );
     }
 
     @Test
@@ -535,12 +582,17 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail43() throws Exception {
         String q = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);";
-        String query = q + "c = cogroup a by (name, age), b by (height);";
-        buildPlan(query);
+        try {
+            String query = q + "c = cogroup a by (name, age), b by (height);";
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+            return;
+        }
+        Assert.fail( "Query should fail." );
     }
 
     @Test
@@ -552,13 +604,22 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail44() throws Throwable {
         PigServer pig = null;
-        pig = new PigServer(Util.getLocalTestMode());
+        try {
+            pig = new PigServer("local");
+        } catch (IOException e) {
+            Assert.assertTrue(false);  // pig server failed for some reason
+        }
         pig.registerFunction("myTr",
             new FuncSpec(GFAny.class.getName() + "('tr o 0')"));
-        pig.registerQuery("b = foreach (load 'a') generate myTr(myTr(*));");
+        try{
+            pig.registerQuery("b = foreach (load 'a') generate myTr(myTr(*));");
+        }catch(Exception e){
+            return;
+        }
+        Assert.assertTrue(false);
     }
 
     @Test
@@ -575,11 +636,15 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+	@Test
     public void testQueryFail58() throws Exception{
         String query = "a = load 'a' as (url, host, ranking);" +
         "b = group a by url; ";
-        buildPlan(query + "c = foreach b generate group.url;");
+        try {
+            buildPlan(query + "c = foreach b generate group.url;");
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
     }
 
     @Test
@@ -620,13 +685,17 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail62() throws Exception {
         String query = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);" +
         "c = cross a,b;" +
         "d = order c by name, b::name, height, a::gpa;";
-        buildPlan(query);
+        try {
+            buildPlan(query);
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
     }
 
     @Test
@@ -639,9 +708,13 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQuery63a() throws Exception {
+    public void testQueryFail63() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
-        buildPlan(query);
+        try {
+        	buildPlan(query);
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
     }
 
     @Test
@@ -655,10 +728,15 @@ public class TestLogicalPlanBuilder {
         buildPlan(query);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail64() throws Exception {
         String query = "foreach (load 'myfile' as (col1, col2 : bag{age: int})) generate col1 ;";
-        buildPlan(query);
+        try {
+        	buildPlan(query);
+        } catch (AssertionFailedError e) {
+            return;
+        }
+        Assert.fail( "query should fail" );
     }
 
     @Test
@@ -671,12 +749,16 @@ public class TestLogicalPlanBuilder {
 	}
 
     @Test
-    public void testQuery66() throws Exception {
+    public void testQueryFail65() throws Exception {
         String q = "a = load 'a' as (name, age, gpa);" +
         "b = load 'b' as (name, height);" +
-        "c = cogroup a by (name, age), b by (name, height);" +
-        "d = foreach c generate group.name, a.name, b.height as age, a.age;";
-        buildPlan( q );
+		"c = cogroup a by (name, age), b by (name, height);" +
+	    "d = foreach c generate group.name, a.name, b.height as age, a.age;";
+        try {
+        	buildPlan( q );
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
 	}
 
     @Test
@@ -708,23 +790,28 @@ public class TestLogicalPlanBuilder {
         buildPlan( q );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail67() throws Exception {
         String q = " a = load 'input1' as (name, age, gpa);" +
         " b = foreach a generate age, age * 10L, gpa/0.2f, {16, 4.0e-2, 'hello'};";
         try {
             buildPlan(q);
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Pig script failed to parse: MismatchedTokenException"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            return;
         }
+        Assert.fail( "query should fail" );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail68() throws Exception {
         String q = " a = load 'input1' as (name, age, gpa);";
-        buildPlan( q +
-          " b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
+        try {
+        	buildPlan( q +
+            " b = foreach a generate {(16 L, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12L, {()})};");
+        } catch (AssertionFailedError e) {
+           return;
+        }
+        Assert.fail( "query should fail" );
     }
 
     @Test
@@ -754,24 +841,24 @@ public class TestLogicalPlanBuilder {
         String q = "split (load 'a') into x if $0 > '7', y if $0 < '7';";
         try {
             buildPlan( q + "c = foreach y generate (bag)$1;");
-        } catch (FrontendException e) {
+        } catch (AssertionFailedError e) {
         	catchEx = true;
         }
-        assertTrue( catchEx );
+        Assert.assertTrue( catchEx );
         catchEx = false;
         try {
         	buildPlan( q + "c = foreach y generate (bag{int, float})$1;");
-        } catch (FrontendException e) {
+        } catch (AssertionFailedError e) {
         	catchEx = true;
         }
-        assertTrue( catchEx );
+        Assert.assertTrue( catchEx );
         catchEx = false;
         try {
         	buildPlan( q + "c = foreach y generate (tuple)$1;");
-        } catch (FrontendException e) {
+        } catch (AssertionFailedError e) {
         	catchEx = true;
         }
-        assertTrue( catchEx );
+        Assert.assertTrue( catchEx );
     }
 
     @Test
@@ -817,12 +904,12 @@ public class TestLogicalPlanBuilder {
     	buildPlan( q );
     }
 
-    @Test
+    @Test 
     public void testQuery80() throws Exception {
     	String q = "a = load 'input1' as (name, age, gpa);" +
         "b = filter a by age < '20';" +
         "c = group b by age;" +
-        "d = foreach c {"
+        "d = foreach c {" 
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
@@ -840,10 +927,15 @@ public class TestLogicalPlanBuilder {
     	buildPlan( q );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail81() throws Exception {
         String q = "a = load 'input1' using PigStorage() as (name, age, gpa);";
-        buildPlan(q + "split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
+        try {
+            buildPlan(q + "split a into b if name lt 'f', c if (name ge 'f' and name le 'h'), d if name gt 'h';");
+        } catch (AssertionFailedError e) {
+            return;
+        }
+        Assert.fail( "Query should fail." );
     }
 
     @Test
@@ -859,15 +951,19 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQuery82a() throws Exception {
+    public void testQueryFail82() throws Exception {
     	String q = "a = load 'myfile';" +
-        "b = group a by $0;" +
+        "b = group a by $0;" + 
         "c = foreach b {"
-            + "c1 = order $1 by *;"
-            + "c2 = $1;"
+            + "c1 = order $1 by *;" 
+            + "c2 = $1;" 
             + "generate flatten(c1), c2;"
             + "};";
+        try {
         buildPlan(q);
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Exception"));
+        }
     }
 
     @Test
@@ -875,7 +971,7 @@ public class TestLogicalPlanBuilder {
     	String q = "a = load 'input1' as (name, age, gpa);" +
         "b = filter a by age < '20';" +
         "c = group b by (name,age);" +
-        "d = foreach c {"
+        "d = foreach c {" 
             + "cf = filter b by gpa < '3.0';"
             + "cp = cf.gpa;"
             + "cd = distinct cp;"
@@ -912,7 +1008,7 @@ public class TestLogicalPlanBuilder {
         LogicalSchema actual = cogroup.getSchema();
         System.out.println( actual.toString( false ) );
 
-        assertEquals("group:tuple(name:bytearray,age:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", actual.toString(false));
+        Assert.assertTrue(  actual.toString( false ).equals( "group:tuple(name:bytearray,age:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}" ) );
 
         lp = buildPlan(query +
         		       "c = foreach b generate group.name, group.age, COUNT(a.gpa);" +
@@ -921,7 +1017,7 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach  = (LOForEach) lp.getPredecessors(store).get(0);
 
 
-        assertEquals("name:bytearray,age:bytearray,:long", foreach.getSchema().toString(false));
+        Assert.assertTrue( foreach.getSchema().toString( false ).equals("name:bytearray,age:bytearray,:long") );
     }
 
     @Test
@@ -950,7 +1046,7 @@ public class TestLogicalPlanBuilder {
 
         Schema cogroupExpectedSchema = new Schema(groupFs);
         cogroupExpectedSchema.add(bagFs);
-        assertEquals("group:tuple(name:chararray,age:int),a:bag{:tuple(name:chararray,age:int,gpa:float)}", cogroup.getSchema().toString(false));
+        Assert.assertTrue(cogroup.getSchema().toString(false).equals("group:tuple(name:chararray,age:int),a:bag{:tuple(name:chararray,age:int,gpa:float)}"));
     }
 
     @Test
@@ -966,7 +1062,7 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate) nestedPlan.getSinks().get(0);
         LOSort nestedSort = (LOSort)nestedPlan.getPredecessors(gen).get(0);
         LogicalExpressionPlan sortPlan = nestedSort.getSortColPlans().get(0);
-        assertEquals(1, sortPlan.getSinks().size());
+        Assert.assertTrue(sortPlan.getSinks().size() == 1);
     }
 
     @Test
@@ -985,21 +1081,20 @@ public class TestLogicalPlanBuilder {
 
     @Test
     public void testQuery89() throws Exception {
-        String query = "a = load 'myfile';" +
+        String query = "a = load 'myfile';" + 
                        "b = foreach a generate $0, $100;" +
                        "c = load 'myfile' as (i: int);" +
                        "d = foreach c generate $0 as zero, i;";
         buildPlan( query );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail89() throws Exception {
         String q = "c = load 'myfile' as (i: int);";
         try {
             buildPlan(q + "d = foreach c generate $0, $5;");
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Out of bound access"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Out of bound access"));
         }
     }
 
@@ -1011,20 +1106,20 @@ public class TestLogicalPlanBuilder {
         String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
                        "b = group a by (name, age);";
         //the first and second elements in group, i.e., name and age are renamed as myname and myage
-        lp = buildPlan(query +
+        lp = buildPlan(query + 
         		"c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;" +
         		"store c into 'output';");
         Operator store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertTrue(foreach.getSchema().isEqual(Utils.parseSchema("myname: chararray, age: int, mycount: long")));
+        Assert.assertTrue(foreach.getSchema().isEqual(Utils.parseSchema("myname: chararray, age: int, mycount: long")));
 
         //the schema of group is unchanged
-        lp = buildPlan( query +
+        lp = buildPlan( query + 
         		"c = foreach b generate flatten(group), COUNT(a) as mycount;" +
         		"store c into 'output';" );
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("group::name:chararray,group::age:int,mycount:long", foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("group::name:chararray,group::age:int,mycount:long"));
 
         //group is renamed as mygroup
         lp = buildPlan(query +
@@ -1032,7 +1127,7 @@ public class TestLogicalPlanBuilder {
         		"store c into 'output';");
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("mygroup:tuple(name:chararray,age:int),mycount:long", foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(name:chararray,age:int),mycount:long"));
 
         //group is renamed as mygroup and the elements are renamed as myname and myage
         lp = buildPlan(query +
@@ -1040,7 +1135,8 @@ public class TestLogicalPlanBuilder {
         	    "store c into 'output';");
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("mygroup:tuple(myname:chararray,myage:int),mycount:long",foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().toString( false ).equals("mygroup:tuple(myname:chararray,myage:int),mycount:long"));
+        /*
         //setting the schema of flattened bag that has no schema with the user defined schema
         String q = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
                    "c = load 'another_file';" +
@@ -1048,88 +1144,73 @@ public class TestLogicalPlanBuilder {
         lp = buildPlan( q + "e = foreach d generate flatten(DIFF(a, c)) as (x, y, z), COUNT(a) as mycount;" + "store e into 'output';" );
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("x:bytearray,y:bytearray,z:bytearray,mycount:long",foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
         q = query +
                   "c = load 'another_file';" +
                   "d = cogroup a by $0, c by $0;" +
-                  "e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;" +
-                  "store e into 'output';";
+                  "e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("x:int,y:float,z:bytearray,mycount:long",foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
         q = query +
             "c = load 'another_file';" +
             "d = cogroup a by $0, c by $0;" +
-            "e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;" +
-            "store e into 'output';";
+            "e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("x:bytearray,mycount:long",foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, mycount: long")));
 
         //setting the schema of flattened bag that has no schema with the user defined schema
-        q = query +
+        q = query + 
             "c = load 'another_file';" +
             "d = cogroup a by $0, c by $0;" +
-            "e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;" +
-            "store e into 'output';";
+            "e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;";
         lp = buildPlan(q);
         store = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("x:int,mycount:long",foreach.getSchema().toString(false));
+        Assert.assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, mycount: long")));
+         */
     }
 
-    /**
-     * Test for {@link org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor} visit() for inserting Cast operator into plan correctly.
-     * @throws Exception
-     */
     @Test
-    public void testQuery90a() throws Exception {
-        LogicalPlan lp = null;
-        LOForEach foreach = null;
-        Operator store = null;
-
+    public void testQueryFail90() throws Exception {
         String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
-                "b = group a by (name, age);";
-
-        // Simply should work renaming
-        lp = buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount; store c into 'output';");
-        store = lp.getSinks().get(0);
-        foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("mygroup:tuple(myname:chararray,myage:int),mycount:long",foreach.getSchema().toString(false));
+                       "b = group a by (name, age);";
 
-        // Casting should work regardless of type mismatch
-        lp = buildPlan( query + "c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount; store c into 'output';");
-        store = lp.getSinks().get(0);
-        foreach = (LOForEach)lp.getPredecessors(store).get(0);
-        assertEquals("mygroup:tuple(myname:int,myage:int),mycount:long",foreach.getSchema().toString(false));
+        try {
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Schema size mismatch"));
+        }
 
-        // Casting in final phase should work too (no succeeding operators after cast in final foreach)
-        buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
-    }
+        try {
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
+        }
 
-    @Test
-    public void testQueryFail90() throws Exception {
-        String query = "a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);" +
-                       "b = group a by (name, age);";
+        try {
+            buildPlan( query + "c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Type mismatch"));
+        }
 
         try {
             buildPlan( query + "c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
-            fail("Should have thrown error");
-        } catch (FrontendException e) {
-            assertTrue(e.getMessage().contains("Incompatible field schema"));
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Incompatible field schema"));
         }
 
         try {
             buildPlan( query + "c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
-            fail("Should have thrown error");
-        } catch (FrontendException e) {
-            assertTrue(e.getMessage().contains("Incompatible schema"));
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Incompatible schema"));
         }
     }
 
@@ -1165,7 +1246,7 @@ public class TestLogicalPlanBuilder {
     }
 
     @Test
-    public void testQuery93_a() throws Exception {
+    public void testQueryFail93() throws Exception {
         String query = "a = load 'one' as (name, age, gpa);" +
         "b = group a by name;"+
         "c = foreach b generate flatten(a);"+
@@ -1189,7 +1270,7 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQueryFail94() throws Exception {
         String query = "a = load 'one' as (name, age, gpa);" +
         "b = load 'two' as (name, age, somethingelse);"+
@@ -1199,9 +1280,8 @@ public class TestLogicalPlanBuilder {
         // test that we can refer to "a::name" field and not name
         try {
             buildPlan(query);
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Invalid field projection. Projected field [name] does not exist"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Invalid field projection. Projected field [name] does not exist"));
         }
     }
 
@@ -1216,9 +1296,9 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOCogroup cogroup = (LOCogroup) lp.getPredecessors(foreach).get(0);
         String s = cogroup.getSchema().toString(false);
-        assertEquals("group:bytearray,a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", s);
+        Assert.assertTrue( s.equals("group:bytearray,a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}"));
         s = foreach.getSchema().toString(false);
-        assertEquals("d::name:bytearray,d::age:bytearray,d::gpa:bytearray,max_age:double", s);
+        Assert.assertTrue( s.equals("d::name:bytearray,d::age:bytearray,d::gpa:bytearray,max_age:double"));
     }
 
     @Test
@@ -1239,27 +1319,27 @@ public class TestLogicalPlanBuilder {
         LogicalPlan foreachPlans = foreach.getInnerPlan();
         //        LogicalPlan flattenPlan = foreachPlans.get(1);
         //        LogicalOperator project = flattenPlan.getLeaves().get(0);
-        //        assertTrue(project instanceof LOProject);
+        //        Assert.assertTrue(project instanceof LOProject);
         //        LogicalOperator sort = flattenPlan.getPredecessors(project).get(0);
-        //        assertTrue(sort instanceof LOSort);
+        //        Assert.assertTrue(sort instanceof LOSort);
         //        LogicalOperator distinct = flattenPlan.getPredecessors(sort).get(0);
-        //        assertTrue(distinct instanceof LODistinct);
+        //        Assert.assertTrue(distinct instanceof LODistinct);
         //
         //        //testing the presence of the nested foreach
         //        LogicalOperator nestedForeach = flattenPlan.getPredecessors(distinct).get(0);
-        //        assertTrue(nestedForeach instanceof LOForEach);
+        //        Assert.assertTrue(nestedForeach instanceof LOForEach);
         //        LogicalPlan nestedForeachPlan = ((LOForEach)nestedForeach).getForEachPlans().get(0);
         //        LogicalOperator nestedProject = nestedForeachPlan.getRoots().get(0);
-        //        assertTrue(nestedProject instanceof LOProject);
-        //        assertTrue(((LOProject)nestedProject).getCol() == 2);
+        //        Assert.assertTrue(nestedProject instanceof LOProject);
+        //        Assert.assertTrue(((LOProject)nestedProject).getCol() == 2);
         //
         //        //testing the filter inner plan for the absence of the project connected to project
         //        LogicalOperator filter = flattenPlan.getPredecessors(nestedForeach).get(0);
-        //        assertTrue(filter instanceof LOFilter);
+        //        Assert.assertTrue(filter instanceof LOFilter);
         //        LogicalPlan comparisonPlan = ((LOFilter)filter).getComparisonPlan();
         //        LOLesserThan lessThan = (LOLesserThan)comparisonPlan.getLeaves().get(0);
         //        LOProject filterProject = (LOProject)lessThan.getLhsOperand();
-        //        assertTrue(null == comparisonPlan.getPredecessors(filterProject));
+        //        Assert.assertTrue(null == comparisonPlan.getPredecessors(filterProject));
     }
     /*
     @Test
@@ -1272,28 +1352,28 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan(query + "b = foreach a generate 1;" + store);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1L;" + store);
         op = lp.getSinks().get(0);
         Operator op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1.0;" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 1.0f;" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
 
         lp = buildPlan(query + "b = foreach a generate 'hello';" + store);
         op = lp.getSinks().get(0);
         foreach = (LOForEach)lp.getPredecessors(op);
-//        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
+//        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
     }
 
     @Test
@@ -1305,31 +1385,31 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan("b = foreach a generate (1);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1L);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate (1.0f);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello');");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', 1, 1L, 1.0f, 1.0);");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
 
         lp = buildPlan("b = foreach a generate ('hello', {(1), (1.0)});");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
 
     }
 
@@ -1342,39 +1422,39 @@ public class TestLogicalPlanBuilder {
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (2, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1L, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0f, 'hello'), (1.0, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello'), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
 
         lp = buildPlan("b = foreach a generate {(1.0, 'hello', 3.14), (1.0f, 'world')};");
         foreach = (LOForEach) lp.getLeaves().get(0);
-        assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:()}"), false, true));
+        Assert.assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:()}"), false, true));
 
     }
 
@@ -1437,7 +1517,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals("name:bytearray,age:bytearray,gpa:bytearray", s);
+        Assert.assertTrue( s.equals("name:bytearray,age:bytearray,gpa:bytearray"));
     }
 
     @Test
@@ -1448,19 +1528,19 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOGenerate gen = (LOGenerate) foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan foreachPlan = gen.getOutputPlans().get(0);
-        assertTrue(checkPlanForProjectStar(foreachPlan));
+        Assert.assertTrue(checkPlanForProjectStar(foreachPlan));
     }
 
     @Test
     public void testQuery108()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" +
+        String query = "a = load 'one' as (name, age, gpa);" + 
         "b = group a by *;" +
         "store b into 'output';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOCogroup cogroup = (LOCogroup)lp.getPredecessors(store).get(0);
         String s = cogroup.getSchema().toString(false);
-        assertEquals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}", s);
+        Assert.assertTrue(s.equals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)}"));
     }
 
     @Test
@@ -1473,21 +1553,22 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOCogroup cogroup = (LOCogroup)lp.getPredecessors(store).get(0);
         String s = cogroup.getSchema().toString(false);
-        assertEquals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)},b:bag{:tuple(first_name:bytearray,enrol_age:bytearray,high_school_gpa:bytearray)}", s);
+        Assert.assertTrue(s.equals("group:tuple(name:bytearray,age:bytearray,gpa:bytearray),a:bag{:tuple(name:bytearray,age:bytearray,gpa:bytearray)},b:bag{:tuple(first_name:bytearray,enrol_age:bytearray,high_school_gpa:bytearray)}"));
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testQuery110Fail()  throws Exception {
     	String query = "a = load 'one' as (name, age, gpa);" +
     	"b = load 'two';" + "c = cogroup a by $0, b by *;";
 
         try {
             buildPlan( query );
-        } catch(Exception e) {
-            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed " +
+        } catch(AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed " +
             		"if the input has a schema" ) );
-            throw e;
+            return;
         }
+        Assert.fail( "Test case should fail." );
     }
 
     @Test
@@ -1500,13 +1581,13 @@ public class TestLogicalPlanBuilder {
         LOSort sort = (LOSort)lp.getPredecessors(store).get(0);
 
         for(LogicalExpressionPlan sortPlan: sort.getSortColPlans() ) {
-            assertFalse(checkPlanForProjectStar(sortPlan));
+            Assert.assertTrue(checkPlanForProjectStar(sortPlan) == false);
         }
     }
 
     @Test
     public void testQuery112()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" +
+        String query = "a = load 'one' as (name, age, gpa);" + 
         "b = group a by *;" +
         "c = foreach b {a1 = order a by *; generate a1;};" +
         "store c into 'y';";
@@ -1515,7 +1596,7 @@ public class TestLogicalPlanBuilder {
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         LOGenerate gen = (LOGenerate) foreach.getInnerPlan().getSinks().get(0);
         for(LogicalExpressionPlan foreachPlan: gen.getOutputPlans()) {
-            assertTrue(checkPlanForProjectStar(foreachPlan));
+            Assert.assertTrue(checkPlanForProjectStar(foreachPlan) == true);
         }
 
         LogicalPlan foreachPlan = foreach.getInnerPlan();
@@ -1524,13 +1605,13 @@ public class TestLogicalPlanBuilder {
         // project (*) operator here is translated to a list of projection
         // operators
         for(LogicalExpressionPlan sortPlan: sort.getSortColPlans()) {
-            assertFalse(checkPlanForProjectStar(sortPlan));
+            Assert.assertTrue(checkPlanForProjectStar(sortPlan) == false);
         }
     }
 
     @Test
     public void testQuery114()  throws Exception {
-        String query = "a = load 'one' as (name, age, gpa);" +
+        String query = "a = load 'one' as (name, age, gpa);" + 
         "b = foreach a generate " + Identity.class.getName() + "(name, age);" +
         "store b into 'y';";
 
@@ -1538,7 +1619,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals(":tuple(name:bytearray,age:bytearray)", s);
+        Assert.assertTrue(s.equals(":tuple(name:bytearray,age:bytearray)"));
     }
 
     @Test
@@ -1551,7 +1632,7 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals(":tuple(name:bytearray,age:bytearray,gpa:bytearray)", s);
+        Assert.assertTrue(s.equals(":tuple(name:bytearray,age:bytearray,gpa:bytearray)"));
     }
 
     @Test
@@ -1564,19 +1645,19 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals(":tuple(:bytearray,:bytearray)", s);
+        Assert.assertTrue(s.equals(":tuple(:bytearray,:bytearray)"));
     }
 
     @Test
     public void testQuery117()  throws Exception {
-        String query = "a = load 'one';" +
+        String query = "a = load 'one';" + 
         "b = foreach a generate " + Identity.class.getName() + "(*);" +
         "store b into 'y';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertTrue(s.equals(":tuple()"));
+        Assert.assertTrue(s.equals(":tuple()"));
     }
 
     @Test
@@ -1627,7 +1708,7 @@ public class TestLogicalPlanBuilder {
     @Test
     public void testNullConsConcatSize() throws Exception {
     	String query = "a = load 'a' as (x:int, y:double, str:chararray);" +
-        "b = foreach a generate SIZE(null), CONCAT(str, null), " +
+        "b = foreach a generate SIZE(null), CONCAT(str, null), " + 
                 "CONCAT(null, str);" +
                 "store b into 'output';";
         buildPlan(query);
@@ -1689,7 +1770,7 @@ public class TestLogicalPlanBuilder {
     @Test
     public void testCast() throws Exception {
     	String query = "a = load 'one.txt' as (x,y); " +
-        "b = foreach a generate (int)$0, (double)$1;" +
+        "b = foreach a generate (int)$0, (double)$1;" + 
         "c = group b by $0;"+
         "store c into 'output';";
     	buildPlan(query);
@@ -1775,14 +1856,14 @@ public class TestLogicalPlanBuilder {
 
     @Test
     public void testTokenizeSchema()  throws Exception {
-        String query = "a = load 'one' as (f1: chararray);" +
+        String query = "a = load 'one' as (f1: chararray);" + 
         "b = foreach a generate TOKENIZE(f1);" +
         "store b into 'output';";
         LogicalPlan lp = buildPlan(query);
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}", s);
+        Assert.assertTrue( s.equals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"));
     }
 
     @Test
@@ -1794,8 +1875,8 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
         String s = foreach.getSchema().toString(false);
-        assertEquals("bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"
-                +",bag_of_tokenTuples_from_f2:bag{tuple_of_tokens:tuple(token:chararray)}", s);
+        assertEquals(s, "bag_of_tokenTuples_from_f1:bag{tuple_of_tokens:tuple(token:chararray)}"
+        		+",bag_of_tokenTuples_from_f2:bag{tuple_of_tokens:tuple(token:chararray)}");
     }
 
     @Test
@@ -1807,15 +1888,15 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        assertTrue(logOp instanceof ConstantExpression);
+        Assert.assertTrue( logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        assertEquals(DataType.TUPLE, loConst.getType());
-        assertTrue(loConst.getValue() instanceof Tuple);
-        assertEquals(TupleFactory.getInstance().newTuple(), loConst.getValue());
+        Assert.assertTrue(loConst.getType() == DataType.TUPLE);
+        Assert.assertTrue(loConst.getValue() instanceof Tuple);
+        Assert.assertTrue(loConst.getValue().equals(TupleFactory.getInstance().newTuple()));
 
         String s = foreach.getSchema().toString(false);
-        assertEquals(":tuple()", s);
+        Assert.assertTrue( s.equals(":tuple()"));
     }
 
     @Test
@@ -1827,15 +1908,15 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        assertTrue(logOp instanceof ConstantExpression);
+        Assert.assertTrue( logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        assertEquals(DataType.MAP, loConst.getType());
-        assertTrue(loConst.getValue() instanceof Map);
-        assertEquals(new HashMap<String,Object>(), loConst.getValue());
+        Assert.assertTrue(loConst.getType() == DataType.MAP);
+        Assert.assertTrue(loConst.getValue() instanceof Map);
+        Assert.assertTrue(loConst.getValue().equals(new HashMap<String,Object>()));
 
         String s = foreach.getSchema().toString(false);
-        assertTrue( s.equals(":map"));
+        Assert.assertTrue( s.equals(":map"));
     }
 
     @Test
@@ -1848,14 +1929,15 @@ public class TestLogicalPlanBuilder {
         LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
         LogicalExpressionPlan exprPlan = gen.getOutputPlans().get(0);
         Operator logOp = exprPlan.getSources().get(0);
-        assertTrue(logOp instanceof ConstantExpression);
+        Assert.assertTrue( logOp instanceof ConstantExpression);
 
         ConstantExpression loConst = (ConstantExpression)logOp;
-        assertEquals(DataType.BAG, loConst.getType());
-        assertTrue(loConst.getValue() instanceof DataBag);
-        assertEquals(BagFactory.getInstance().newDefaultBag(), loConst.getValue());
+        Assert.assertTrue(loConst.getType() == DataType.BAG);
+        Assert.assertTrue(loConst.getValue() instanceof DataBag);
+        Assert.assertTrue(loConst.getValue().equals(BagFactory.getInstance().newDefaultBag()));
 
-        assertEquals(":bag{}", foreach.getSchema().toString(false));
+        String s = foreach.getSchema().toString(false);
+        Assert.assertTrue( s.equals(":bag{}") );
     }
 
     @Test
@@ -1866,7 +1948,8 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
 
-        assertEquals(":tuple(:tuple())", foreach.getSchema().toString(false));
+        String s = foreach.getSchema().toString(false);
+        Assert.assertTrue( s.equals(":tuple(:tuple())") );
     }
 
     @Test
@@ -1877,7 +1960,8 @@ public class TestLogicalPlanBuilder {
         Operator store = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach) lp.getPredecessors(store).get(0);
 
-        assertEquals(":tuple(:map)", foreach.getSchema().toString(false));
+        String s = foreach.getSchema().toString(false);
+        Assert.assertTrue( s.equals(":tuple(:map)") );
     }
 
     @Test
@@ -1888,7 +1972,8 @@ public class TestLogicalPlanBuilder {
         Operator op = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(op).get(0);
 
-        assertEquals(":tuple(:bag{})", foreach.getSchema().toString(false));
+        String s = foreach.getSchema().toString(false);
+        Assert.assertTrue( s.equals(":tuple(:bag{})") );
     }
 
     @Test
@@ -1899,7 +1984,8 @@ public class TestLogicalPlanBuilder {
         Operator op = lp.getSinks().get(0);
         LOForEach foreach = (LOForEach)lp.getPredecessors(op).get(0);
 
-        assertEquals(":bag{:tuple()}", foreach.getSchema().toString(false));
+        String s = foreach.getSchema().toString(false);
+        Assert.assertTrue( s.equals(":bag{:tuple()}") );
     }
 
     @Test
@@ -1928,36 +2014,42 @@ public class TestLogicalPlanBuilder {
         buildPlan( query );
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testCogroupByStarFailure1() throws Exception {
+        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' as (a0:int, a1:int);" +
             " b = load '2.txt'; " +
             "c = cogroup a by *, b by *;" +
             "store c into 'output';";
             buildPlan(query);
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only" +
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only" +
             		" allowed if the input has a schema"));
-            throw e;
+            exceptionThrown = true;
         }
+        Assert.assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testCogroupByStarFailure2() throws Exception {
+        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' ;" +
             " b = load '2.txt' as (b0:int, b1:int); " +
             "c = cogroup a by *, b by *;" +
             "store c into 'output';";
             buildPlan( query );
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed if the input has a schema"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("Cogroup/Group by '*' or 'x..' (range of columns to the end) is only allowed if the input has a schema"));
+            exceptionThrown = true;
         }
+        Assert.assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testMissingSemicolon() throws Exception {
         try {
             String query = "A = load '1.txt' \n" +
@@ -1965,27 +2057,32 @@ public class TestLogicalPlanBuilder {
                            "C = union A, B;\n" +
                            "store C into 'output';";
             buildPlan( query );
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("mismatched input 'B' expecting SEMI_COLON"));
-            throw e;
+        } catch (AssertionFailedError e) {
+            Assert.assertTrue(e.getMessage().contains("mismatched input 'B' expecting SEMI_COLON"));
+           return;
         }
+        Assert.fail("An exception was expected but did not occur");
     }
 
-    @Test(expected = FrontendException.class)
+    @Test
     public void testCogroupByIncompatibleSchemaFailure() throws Exception {
+        boolean exceptionThrown = false;
         try {
             String query = " a = load '1.txt' as (a0:int, a1:int);" +
             " b = load '2.txt' as (a0:int, a1:chararray); " +
-            "c = cogroup a by (a0,a1), b by (a0,a1);";
+            "c = cogroup a by (a0,a1), b by (a0,a1);" +
+            "store c into 'output';";
             buildPlan( query );
-        } catch (Exception e) {
+        } catch (AssertionFailedError e) {
             String msg =
                 "group column no. 2 in relation no. 2 of  group statement" +
                 " has datatype chararray which is incompatible with type of" +
                 " corresponding column in earlier relation(s) in the statement";
-            assertTrue(e.getCause().getMessage().contains(msg));
-            throw e;
+            Assert.assertTrue(e.getMessage().contains(msg));
+            exceptionThrown = true;
         }
+        Assert.assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
     }
 
     @Test
@@ -1997,7 +2094,7 @@ public class TestLogicalPlanBuilder {
         LOLoad load = (LOLoad)plan.getPredecessors(op).get(0);
         // the signature is now a unique string of the format "{alias}_{scope id}-{id}" example: "a_12-0"
         String udfContextSignature = ((PigStorageWithSchema)(load).getLoadFunc()).getUDFContextSignature();
-        assertTrue(udfContextSignature, udfContextSignature.matches("a_[0-9]*-[0-9]*"));
+        Assert.assertTrue(udfContextSignature, udfContextSignature.matches("a_[0-9]*-[0-9]*"));
 
         query = " b = load '1.txt' using org.apache.pig.test.PigStorageWithSchema();" +
                 "store b into 'output';";
@@ -2005,15 +2102,20 @@ public class TestLogicalPlanBuilder {
         op = plan.getSinks().get(0);
         load = (LOLoad)plan.getPredecessors(op).get(0);
         udfContextSignature = ((PigStorageWithSchema)(load).getLoadFunc()).getUDFContextSignature();
-        assertTrue(udfContextSignature, udfContextSignature.matches("b_[0-9]*-[0-9]*"));
+        Assert.assertTrue(udfContextSignature, udfContextSignature.matches("b_[0-9]*-[0-9]*"));
     }
 
     @Test
     public void testLastAlias() throws Exception {
-        String query = "B = load '2.txt' as (b0:int, b1:int);\n" +
-                "C = ORDER B by b0;";
-        buildPlan(query);
-        assertEquals("C", pigServer.getPigContext().getLastAlias());
+        try {
+            String query = "B = load '2.txt' as (b0:int, b1:int);\n" +
+            		"C = ORDER B by b0;" ;
+            buildPlan( query );
+
+        } catch (AssertionFailedError e) {
+            // Ignore the exception
+        }
+        Assert.assertEquals("C", pigServer.getPigContext().getLastAlias());
     }
     
     @Test
@@ -2072,25 +2174,7 @@ public class TestLogicalPlanBuilder {
         assertTrue("Sink must end with output", lData.getSinks().get(0).endsWith("output"));
         assertEquals("Number of logical relational operators must be 4", lData.getNumLogicalRelationOperators(), 4);
     }
-
-    @Test
-    public void testFlattenMap() throws Exception {
-       String query = "A = LOAD 'input.txt' as (rowId:int, dataMap:map[int]);" +
-               "B = FOREACH A GENERATE rowId, FLATTEN(dataMap);";
-
-        pigServer.registerQuery(query);
-        Schema schema = pigServer.dumpSchema("B");
-
-        assertEquals(3, schema.size());
-
-        assertEquals(DataType.INTEGER, schema.getField(0).type);
-        assertEquals("rowId", schema.getField(0).alias);
-
-        assertEquals(DataType.CHARARRAY, schema.getField(1).type);
-        assertEquals("dataMap::key", schema.getField(1).alias);
-        assertEquals(DataType.INTEGER, schema.getField(2).type);
-        assertEquals("dataMap::value", schema.getField(2).alias);
-    }
+    
     /**
      * This method is not generic. Expects logical plan to have atleast
      * 1 source and returns the corresponding FuncSpec.
@@ -2132,6 +2216,19 @@ public class TestLogicalPlanBuilder {
 
     // Helper Functions
     public LogicalPlan buildPlan(String query) throws Exception {
-        return Util.buildLp(pigServer, query);
+    	try {
+            return Util.buildLp(pigServer, query);
+    	} catch(Throwable t) {
+    	    PigException pigEx = LogUtils.getPigException(t);
+    	    Throwable cause = null;
+    	    if(pigEx != null){
+    	        cause = pigEx;
+    	    }else{
+    	        cause = t.getCause();
+    	    }
+    	    String msg = cause != null ? cause.toString() : t.toString();
+    	    throw new AssertionFailedError( msg );
+    	}
     }
-}
\ No newline at end of file
+
+}