You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/05/04 02:11:51 UTC

svn commit: r1099299 [1/2] - in /pig/trunk: CHANGES.txt test/org/apache/pig/test/TestMultiQuery.java test/org/apache/pig/test/TestMultiQueryBasic.java test/org/apache/pig/test/Util.java

Author: rding
Date: Wed May  4 00:11:51 2011
New Revision: 1099299

URL: http://svn.apache.org/viewvc?rev=1099299&view=rev
Log:
PIG-2028: Speed up multiquery unit tests

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryBasic.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1099299&r1=1099298&r2=1099299&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May  4 00:11:51 2011
@@ -196,6 +196,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-2028: Speed up multiquery unit tests (rding)
+
 PIG-1990: support casting of complex types with empty inner schema
  to complex type with non-empty inner schema (thejas)
 

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1099299&r1=1099298&r2=1099299&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed May  4 00:11:51 2011
@@ -21,19 +21,15 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.List;
-
-import junit.framework.Assert;
+import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.PigContext;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -41,972 +37,752 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+
 @RunWith(JUnit4.class)
 public class TestMultiQuery {
 
-    private static final MiniCluster cluster = MiniCluster.buildCluster();
-
-    private PigServer myPig;
+    private static PigServer myPig;
 
     @BeforeClass
-    public static void setUpBeforeClass() throws IOException {
-        Util.copyFromLocalToCluster(cluster,
+    public static void setUpBeforeClass() throws Exception {
+        Util.copyFromLocalToLocal(
                 "test/org/apache/pig/test/data/passwd", "passwd");
-        Util.copyFromLocalToCluster(cluster,
+        Util.copyFromLocalToLocal(
                 "test/org/apache/pig/test/data/passwd2", "passwd2");
+        Properties props = new Properties();
+        props.setProperty("opt.multiquery", ""+true);
+        myPig = new PigServer(ExecType.LOCAL, props);
     }
     
     @AfterClass
-    public static void tearDownAfterClass() throws IOException {
-        Util.deleteFile(cluster, "passwd");
-        Util.deleteFile(cluster, "passwd2");
-        cluster.shutDown();
+    public static void tearDownAfterClass() throws Exception {
+        Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
+        Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
+        deleteOutputFiles();
     }
     
     @Before
     public void setUp() throws Exception {
-        cluster.setProperty("opt.multiquery", ""+true);
-        myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         deleteOutputFiles();
     }
 
     @After
     public void tearDown() throws Exception {
-        myPig = null;
+
     }
 
     @Test
-    public void testMultiQueryJiraPig1438() {
+    public void testMultiQueryJiraPig1438() throws Exception {
 
         // test case: merge multiple distinct jobs
         
         String INPUT_FILE = "abc";
         
-        try {
-    
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("1\t2\t3");
-            w.println("2\t3\t4");
-            w.println("1\t2\t3");
-            w.println("2\t3\t4");
-            w.println("1\t2\t3");
-            w.close();
-    
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-           
-            myPig.setBatchOn();
-    
-            myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
-            myPig.registerQuery("B1 = foreach A generate col1, col2;");
-            myPig.registerQuery("B2 = foreach A generate col2, col3;");
-            myPig.registerQuery("C1 = distinct B1;");
-            myPig.registerQuery("C2 = distinct B2;");
-            myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
-            myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
-            myPig.registerQuery("store D1 into '/tmp/output1';");
-            myPig.registerQuery("store D2 into '/tmp/output2';");            
-            
-            myPig.executeBatch();
-            
-            myPig.registerQuery("E = load '/tmp/output1' as (a:int, b:int);");            
-            Iterator<Tuple> iter = myPig.openIterator("E");
+        String[] inputData = {
+                "1\t2\t3",
+                "2\t3\t4",
+                "1\t2\t3",
+                "2\t3\t4",
+                "1\t2\t3"
+        };
+        
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+       
+        myPig.setBatchOn();
 
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(1,2)",
-                            "(2,3)"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
-            }
-            assertEquals(expectedResults.size(), counter);
-                        
-            myPig.registerQuery("E = load '/tmp/output2' as (a:int, b:int);");            
-            iter = myPig.openIterator("E");
+        myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
+        myPig.registerQuery("B1 = foreach A generate col1, col2;");
+        myPig.registerQuery("B2 = foreach A generate col2, col3;");
+        myPig.registerQuery("C1 = distinct B1;");
+        myPig.registerQuery("C2 = distinct B2;");
+        myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+        myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
+        myPig.registerQuery("store D1 into 'output1';");
+        myPig.registerQuery("store D2 into 'output2';");            
+        
+        myPig.executeBatch();
+        
+        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");            
+        Iterator<Tuple> iter = myPig.openIterator("E");
 
-            expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(2,3)",
-                            "(3,4)"
-                    });
-            
-            counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
-            }
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(1,2)",
+                        "(2,3)"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+        }
+        assertEquals(expectedResults.size(), counter);
+                    
+        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");            
+        iter = myPig.openIterator("E");
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(2,3)",
+                        "(3,4)"
+                });
+        
+        counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
         }
+
+        assertEquals(expectedResults.size(), counter);
     }
     
     @Test
-    public void testMultiQueryJiraPig1252() {
+    public void testMultiQueryJiraPig1252() throws Exception {
 
         // test case: Problems with secondary key optimization and multiquery
         // diamond optimization
         
         String INPUT_FILE = "abc";
         
-        try {
-    
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("1\t2\t3");
-            w.println("2\t3\t4");
-            w.println("3\t\t5");
-            w.println("5\t6\t6");
-            w.println("6\t\t7");
-            w.close();
-    
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-           
-            myPig.setBatchOn();
-    
-            myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
-            myPig.registerQuery("B = foreach A generate (chararray) col1, " +
-            		"(chararray) ((col2 is not null) ?  " +
-            		"col2 : (col3 < 6 ? col3 : '')) as splitcond;");
-            myPig.registerQuery("split B into C if splitcond !=  '', D if splitcond == '';");
-            myPig.registerQuery("E = group C by splitcond;");
-            myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-       
-            Iterator<Tuple> iter = myPig.openIterator("F");
-
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(1,2)",
-                            "(2,3)",
-                            "(3,5)",
-                            "(5,6)"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
-            }
+        String[] inputData = {
+            "1\t2\t3",
+            "2\t3\t4",
+            "3\t\t5",
+            "5\t6\t6",
+            "6\t\t7"       
+        };
+        
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+
+        myPig.setBatchOn();
+
+        myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
+        myPig.registerQuery("B = foreach A generate (chararray) col1, " +
+        		"(chararray) ((col2 is not null) ?  " +
+        		"col2 : (col3 < 6 ? col3 : '')) as splitcond;");
+        myPig.registerQuery("split B into C if splitcond !=  '', D if splitcond == '';");
+        myPig.registerQuery("E = group C by splitcond;");
+        myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
+   
+        Iterator<Tuple> iter = myPig.openIterator("F");
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(1,2)",
+                        "(2,3)",
+                        "(3,5)",
+                        "(5,6)"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
         }
+
+        assertEquals(expectedResults.size(), counter);
     }
 
     @Test
-    public void testMultiQueryJiraPig1169() {
+    public void testMultiQueryJiraPig1169() throws Exception {
 
         // test case: Problems with some top N queries
         
         String INPUT_FILE = "abc";
         
-        try {
-    
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("1\t2\t3");
-            w.println("2\t3\t4");
-            w.println("3\t4\t5");
-            w.println("5\t6\t7");
-            w.println("6\t7\t8");
-            w.close();
-    
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-           
-            myPig.setBatchOn();
-    
-            myPig.registerQuery("A = load '" + INPUT_FILE 
-                    + "' as (a:int, b, c);");
-            myPig.registerQuery("A1 = Order A by a desc parallel 3;");
-            myPig.registerQuery("A2 = limit A1 2;");
-            myPig.registerQuery("store A1 into '/tmp/input1';");
-            myPig.registerQuery("store A2 into '/tmp/input2';");
+        String[] inputData = {
+                "1\t2\t3",
+                "2\t3\t4",
+                "3\t4\t5",
+                "5\t6\t7",
+                "6\t7\t8"       
+        };
+        
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+       
+        myPig.setBatchOn();
 
-            myPig.executeBatch();
+        myPig.registerQuery("A = load '" + INPUT_FILE 
+                + "' as (a:int, b, c);");
+        myPig.registerQuery("A1 = Order A by a desc parallel 3;");
+        myPig.registerQuery("A2 = limit A1 2;");
+        myPig.registerQuery("store A1 into 'output1';");
+        myPig.registerQuery("store A2 into 'output2';");
 
-            myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);");
-            
-            Iterator<Tuple> iter = myPig.openIterator("B");
+        myPig.executeBatch();
 
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(6,7,8)",
-                            "(5,6,7)"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
-            }
+        myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
+        
+        Iterator<Tuple> iter = myPig.openIterator("B");
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(6,7,8)",
+                        "(5,6,7)"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
         }
+
+        assertEquals(expectedResults.size(), counter);
     }
   
     @Test
-    public void testMultiQueryJiraPig1171() {
+    public void testMultiQueryJiraPig1171() throws Exception {
 
         // test case: Problems with some top N queries
         
         String INPUT_FILE = "abc";
         
-        try {
-    
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("1\tapple\t3");
-            w.println("2\torange\t4");
-            w.println("3\tpersimmon\t5");
-            w.close();
-    
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-           
-            myPig.setBatchOn();
-    
-            myPig.registerQuery("A = load '" + INPUT_FILE 
-                    + "' as (a:long, b, c);");
-            myPig.registerQuery("A1 = Order A by a desc;");
-            myPig.registerQuery("A2 = limit A1 1;");
-            myPig.registerQuery("B = load '" + INPUT_FILE 
-                    + "' as (a:long, b, c);");
-            myPig.registerQuery("B1 = Order B by a desc;");
-            myPig.registerQuery("B2 = limit B1 1;");
-            
-            myPig.registerQuery("C = cross A2, B2;");
-            
-            Iterator<Tuple> iter = myPig.openIterator("C");
+        String[] inputData = {
+            "1\tapple\t3",
+            "2\torange\t4",
+            "3\tpersimmon\t5"    
+        };
+        
+        Util.createLocalInputFile(INPUT_FILE, inputData);
 
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(3L,'persimmon',5,3L,'persimmon',5)"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
-            }
+        myPig.setBatchOn();
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        myPig.registerQuery("A = load '" + INPUT_FILE 
+                + "' as (a:long, b, c);");
+        myPig.registerQuery("A1 = Order A by a desc;");
+        myPig.registerQuery("A2 = limit A1 1;");
+        myPig.registerQuery("B = load '" + INPUT_FILE 
+                + "' as (a:long, b, c);");
+        myPig.registerQuery("B1 = Order B by a desc;");
+        myPig.registerQuery("B2 = limit B1 1;");
+        
+        myPig.registerQuery("C = cross A2, B2;");
+        
+        Iterator<Tuple> iter = myPig.openIterator("C");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(3L,'persimmon',5,3L,'persimmon',5)"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
         }
+
+        assertEquals(expectedResults.size(), counter);
     }
     
     @Test
-    public void testMultiQueryJiraPig1157() {
+    public void testMultiQueryJiraPig1157() throws Exception {
 
         // test case: Sucessive replicated joins do not generate Map Reduce plan and fails due to OOM
         
         String INPUT_FILE = "abc";
-        String INPUT_FILE_1 = "xyz";
+        String INPUT_FILE_1 = "abc";
         
-        try {
-    
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("1\tapple\t3");
-            w.println("2\torange\t4");
-            w.println("3\tpersimmon\t5");
-            w.close();
-    
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE_1);
-    
-            myPig.setBatchOn();
-    
-            myPig.registerQuery("A = load '" + INPUT_FILE 
-                    + "' as (a:long, b, c);");
-            myPig.registerQuery("A1 = FOREACH A GENERATE a;");
-            myPig.registerQuery("B = GROUP A1 BY a;");
-            myPig.registerQuery("C = load '" + INPUT_FILE_1 
-                    + "' as (x:long, y);");
-            myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");  
-            myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");  
+        String[] inputData = {
+                "1\tapple\t3",
+                "2\torange\t4",
+                "3\tpersimmon\t5"    
+        };
             
-            Iterator<Tuple> iter = myPig.openIterator("E");
+        Util.createLocalInputFile(INPUT_FILE, inputData);
 
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(1L,'apple',3,1L,'apple',1L,{(1L)})",
-                            "(2L,'orange',4,2L,'orange',2L,{(2L)})",
-                            "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
-            }
+        myPig.setBatchOn();
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-                Util.deleteFile(cluster, INPUT_FILE_1);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        myPig.registerQuery("A = load '" + INPUT_FILE 
+                + "' as (a:long, b, c);");
+        myPig.registerQuery("A1 = FOREACH A GENERATE a;");
+        myPig.registerQuery("B = GROUP A1 BY a;");
+        myPig.registerQuery("C = load '" + INPUT_FILE_1 
+                + "' as (x:long, y);");
+        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");  
+        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");  
+        
+        Iterator<Tuple> iter = myPig.openIterator("E");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "(1L,'apple',3,1L,'apple',1L,{(1L)})",
+                        "(2L,'orange',4,2L,'orange',2L,{(2L)})",
+                        "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
         }
+
+        assertEquals(expectedResults.size(), counter);
     }
     
     @Test
-    public void testMultiQueryJiraPig1068() {
+    public void testMultiQueryJiraPig1068() throws Exception {
 
         // test case: COGROUP fails with 'Type mismatch in key from map: 
         // expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
 
         String INPUT_FILE = "pig-1068.txt";
 
-        try {
-
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("10\tapple\tlogin\tjar");
-            w.println("20\torange\tlogin\tbox");
-            w.println("30\tstrawberry\tquit\tbot");
-
-            w.close();
-
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
-            myPig.setBatchOn();
-
-            myPig.registerQuery("logs = load '" + INPUT_FILE 
-                    + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
-            myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
-            myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");  
-            myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
-            myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
-                    + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
-            myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
-            myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
-            myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-            
-            Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
-
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "('apple',{},{('apple','jar',1L)})",
-                            "('orange',{},{('orange','box',1L)})",
-                            "('strawberry',{(30,'strawberry','quit','bot')},{})"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                
-            }
-
-            assertEquals(expectedResults.size(), counter);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
-        }
+        String[] inputData = {
+            "10\tapple\tlogin\tjar",
+            "20\torange\tlogin\tbox",
+            "30\tstrawberry\tquit\tbot"    
+        };
+            
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+
+        myPig.setBatchOn();
+
+        myPig.registerQuery("logs = load '" + INPUT_FILE 
+                + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
+        myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
+        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");  
+        myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
+        myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
+                + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
+        myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
+        myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
+        myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
+        
+        Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "('apple',{},{('apple','jar',1L)})",
+                        "('orange',{},{('orange','box',1L)})",
+                        "('strawberry',{(30,'strawberry','quit','bot')},{})"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                
+        }
+
+        assertEquals(expectedResults.size(), counter);
     }
     
     @Test
-    public void testMultiQueryJiraPig1108() {
-        try {
-            myPig.setBatchOn();
-
-            myPig.registerQuery("a = load 'passwd' " 
-                    + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
-            myPig.registerQuery("b = group plan1 by uname;");
-            myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; " 
-                    + "generate flatten(group) as foo, tmp; };");
-            myPig.registerQuery("d = filter c BY foo is not null;");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store plan2 into '/tmp/output2';");
-             
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+    public void testMultiQueryJiraPig1108() throws Exception {
+
+        myPig.setBatchOn();
+
+        myPig.registerQuery("a = load 'passwd' " 
+                + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
+        myPig.registerQuery("b = group plan1 by uname;");
+        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; " 
+                + "generate flatten(group) as foo, tmp; };");
+        myPig.registerQuery("d = filter c BY foo is not null;");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store plan2 into 'output2';");
+         
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }    
     
     @Test
-    public void testMultiQueryJiraPig1114() {
+    public void testMultiQueryJiraPig1114() throws Exception {
 
         // test case: MultiQuery optimization throws error when merging 2 level splits
 
         String INPUT_FILE = "data.txt";
 
-        try {
-
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("10\tjar");
-            w.println("20\tbox");
-            w.println("30\tbot");
-            w.close();
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
-            myPig.setBatchOn();
-
-            myPig.registerQuery("data = load '" + INPUT_FILE
-                    + "' USING PigStorage as (id:int, name:chararray);");
-            myPig.registerQuery("ids = FOREACH data GENERATE id;");
-            myPig.registerQuery("allId = GROUP ids all;");
-            myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;");
-            myPig.registerQuery("idGroup = GROUP ids by id;");
-            myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;");
-            myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;");
-            myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;");
-            myPig.registerQuery("orderedCounts = order idCountTotal by count desc;");
-            myPig.registerQuery("STORE orderedCounts INTO '/tmp/output1';");
-
-            myPig.registerQuery("names = FOREACH data GENERATE name;");
-            myPig.registerQuery("allNames = GROUP names all;");
-            myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;");
-            myPig.registerQuery("nameGroup = GROUP names by name;");
-            myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;");
-            myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;");
-            myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;");
-            myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;");
-            myPig.registerQuery("STORE nameCountsOrdered INTO '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        String[] inputData = {
+            "10\tjar",
+            "20\tbox",
+            "30\tbot"   
+        };
+                
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+
+        myPig.setBatchOn();
+
+        myPig.registerQuery("data = load '" + INPUT_FILE
+                + "' USING PigStorage as (id:int, name:chararray);");
+        myPig.registerQuery("ids = FOREACH data GENERATE id;");
+        myPig.registerQuery("allId = GROUP ids all;");
+        myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;");
+        myPig.registerQuery("idGroup = GROUP ids by id;");
+        myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;");
+        myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;");
+        myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;");
+        myPig.registerQuery("orderedCounts = order idCountTotal by count desc;");
+        myPig.registerQuery("STORE orderedCounts INTO 'output1';");
+
+        myPig.registerQuery("names = FOREACH data GENERATE name;");
+        myPig.registerQuery("allNames = GROUP names all;");
+        myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;");
+        myPig.registerQuery("nameGroup = GROUP names by name;");
+        myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;");
+        myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;");
+        myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;");
+        myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;");
+        myPig.registerQuery("STORE nameCountsOrdered INTO 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
     }
     
     @Test
-    public void testMultiQueryJiraPig1113() {
+    public void testMultiQueryJiraPig1113() throws Exception {
 
         // test case: Diamond query optimization throws error in JOIN
 
         String INPUT_FILE_1 = "set1.txt";
         String INPUT_FILE_2 = "set2.txt";
-        try {
-
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_1));
-            w.println("login\t0\tjar");
-            w.println("login\t1\tbox");
-            w.println("quit\t0\tmany");
-            w.close();
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE_1, INPUT_FILE_1);
-
-            PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE_2));
-            w2.println("apple\tlogin\t{(login)}");
-            w2.println("orange\tlogin\t{(login)}");
-            w2.println("strawberry\tquit\t{(login)}");
-            w2.close();
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
-            
-            myPig.setBatchOn();
+        
 
-            myPig.registerQuery("set1 = load '" + INPUT_FILE_1 
-                    + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
-            myPig.registerQuery("set2 = load '" + INPUT_FILE_2
-                    + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
-            myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, " 
-                    + "(chararray) 0 as f3;");
-            myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
-                    + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");  
-            myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
-            myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-          
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "('quit','0','many','strawberry','quit','0')",
-                            "('login','0','jar','apple','login','0')",
-                            "('login','0','jar','orange','login','0')",
-                            "('login','1','box','apple','login','1')",
-                            "('login','1','box','orange','login','1')",
-                            "('login','1','box','strawberry','login','1')"
-                    });
-            
-            Iterator<Tuple> iter = myPig.openIterator("joined_sets");
-            int count = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
-            }
-            assertEquals(expectedResults.size(), count);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE_1).delete();
-            new File(INPUT_FILE_2).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE_1);
-                Util.deleteFile(cluster, INPUT_FILE_2);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        String[] inputData_1 = {
+            "login\t0\tjar",
+            "login\t1\tbox",
+            "quit\t0\tmany" 
+        };
+                
+        Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
+        
+        String[] inputData_2 = {
+            "apple\tlogin\t{(login)}",
+            "orange\tlogin\t{(login)}",
+            "strawberry\tquit\t{(login)}"  
+        };
+                
+        Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
+            
+        myPig.setBatchOn();
+
+        myPig.registerQuery("set1 = load '" + INPUT_FILE_1 
+                + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
+        myPig.registerQuery("set2 = load '" + INPUT_FILE_2
+                + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
+        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, " 
+                + "(chararray) 0 as f3;");
+        myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
+                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");  
+        myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
+        myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
+      
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "('quit','0','many','strawberry','quit','0')",
+                        "('login','0','jar','apple','login','0')",
+                        "('login','0','jar','orange','login','0')",
+                        "('login','1','box','apple','login','1')",
+                        "('login','1','box','orange','login','1')",
+                        "('login','1','box','strawberry','login','1')"
+                });
+        
+        Iterator<Tuple> iter = myPig.openIterator("joined_sets");
+        int count = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
         }
+        assertEquals(expectedResults.size(), count);
     }
  
     @Test
-    public void testMultiQueryJiraPig1060_2() {
+    public void testMultiQueryJiraPig1060_2() throws Exception {
 
         // test case: 
 
         String INPUT_FILE = "pig-1060.txt";
 
-        try {
+        String[] inputData = {
+            "apple\t2",
+            "apple\t12",
+            "orange\t3",
+            "orange\t23",
+            "strawberry\t10",
+            "strawberry\t34"  
+        };
+                    
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+
+        myPig.setBatchOn();
+
+        myPig.registerQuery("data = load '" + INPUT_FILE +
+        "' as (name:chararray, gid:int);");
+        myPig.registerQuery("f1 = filter data by gid < 5;");
+        myPig.registerQuery("g1 = group f1 by name;");
+        myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+        myPig.registerQuery("store p1 into 'output1';");
+
+        myPig.registerQuery("f2 = filter data by gid > 5;");
+        myPig.registerQuery("g2 = group f2 by name;");
+        myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+        myPig.registerQuery("store p2 into 'output2';");
+
+        myPig.registerQuery("f3 = filter f2 by gid > 10;");
+        myPig.registerQuery("g3 = group f3 by name;");
+        myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+        myPig.registerQuery("store p3 into 'output3';");
+
+        myPig.registerQuery("f4 = filter f3 by gid < 20;");
+        myPig.registerQuery("g4 = group f4 by name;");
+        myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+        myPig.registerQuery("store p4 into 'output4';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        assertEquals(4, jobs.size());
 
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("apple\t2");
-            w.println("apple\t12");
-            w.println("orange\t3");
-            w.println("orange\t23");
-            w.println("strawberry\t10");
-            w.println("strawberry\t34");
-
-            w.close();
-
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
-            myPig.setBatchOn();
-
-            myPig.registerQuery("data = load '" + INPUT_FILE +
-            "' as (name:chararray, gid:int);");
-            myPig.registerQuery("f1 = filter data by gid < 5;");
-            myPig.registerQuery("g1 = group f1 by name;");
-            myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
-            myPig.registerQuery("store p1 into '/tmp/output1';");
-
-            myPig.registerQuery("f2 = filter data by gid > 5;");
-            myPig.registerQuery("g2 = group f2 by name;");
-            myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
-            myPig.registerQuery("store p2 into '/tmp/output2';");
-
-            myPig.registerQuery("f3 = filter f2 by gid > 10;");
-            myPig.registerQuery("g3 = group f3 by name;");
-            myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
-            myPig.registerQuery("store p3 into '/tmp/output3';");
-
-            myPig.registerQuery("f4 = filter f3 by gid < 20;");
-            myPig.registerQuery("g4 = group f4 by name;");
-            myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
-            myPig.registerQuery("store p4 into '/tmp/output4';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            assertEquals(4, jobs.size());
-
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
     } 
 
     @Test
-    public void testMultiQueryJiraPig920_2() {
+    public void testMultiQueryJiraPig920_2() throws Exception {
 
         // test case: execution of a query with two diamonds
-        try {
-            myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = filter a by uid < 5;");
-            myPig.registerQuery("c = filter a by gid >= 5;");
-            myPig.registerQuery("d = filter a by uid >= 5;");
-            myPig.registerQuery("e = filter a by gid < 5;");
-            myPig.registerQuery("f = cogroup c by $0, b by $0;");
-            myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
-            myPig.registerQuery("store f1 into '/tmp/output1';");
-            myPig.registerQuery("g = cogroup d by $0, e by $0;");
-            myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
-            myPig.registerQuery("store g1 into '/tmp/output2';");
-             
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.setBatchOn();
+
+        myPig.registerQuery("a = load 'passwd' " +
+                             "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = filter a by uid < 5;");
+        myPig.registerQuery("c = filter a by gid >= 5;");
+        myPig.registerQuery("d = filter a by uid >= 5;");
+        myPig.registerQuery("e = filter a by gid < 5;");
+        myPig.registerQuery("f = cogroup c by $0, b by $0;");
+        myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+        myPig.registerQuery("store f1 into 'output1';");
+        myPig.registerQuery("g = cogroup d by $0, e by $0;");
+        myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+        myPig.registerQuery("store g1 into 'output2';");
+         
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }            
     
     @Test
-    public void testMultiQueryJiraPig920_3() {
+    public void testMultiQueryJiraPig920_3() throws Exception {
 
         // test case: execution of a simple diamond query
         
         String INPUT_FILE = "pig-920.txt";
         
-        try {
-            
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("apple\tapple\t100\t10");
-            w.println("apple\tapple\t200\t20");
-            w.println("orange\torange\t100\t10");
-            w.println("orange\torange\t300\t20");
-   
-            w.close();
-            
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-        
-            myPig.setBatchOn();
+        String[] inputData = {
+            "apple\tapple\t100\t10",
+            "apple\tapple\t200\t20",
+            "orange\torange\t100\t10",
+            "orange\torange\t300\t20"  
+        };
+                        
+        Util.createLocalInputFile(INPUT_FILE, inputData);
 
-            myPig.registerQuery("a = load '" + INPUT_FILE +
-                                "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = filter a by uid < 300;");
-            myPig.registerQuery("c = filter a by gid > 10;");
-            myPig.registerQuery("d = cogroup c by $0, b by $0;");
-            myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-                                   
-            Iterator<Tuple> iter = myPig.openIterator("e");
-
-            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "('apple',1L,2L)",
-                            "('orange',1L,1L)"
-                    });
-            
-            int counter = 0;
-            while (iter.hasNext()) {
-                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-            }
+        myPig.setBatchOn();
 
-            assertEquals(expectedResults.size(), counter);
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
-            
+        myPig.registerQuery("a = load '" + INPUT_FILE +
+                            "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = filter a by uid < 300;");
+        myPig.registerQuery("c = filter a by gid > 10;");
+        myPig.registerQuery("d = cogroup c by $0, b by $0;");
+        myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+                               
+        Iterator<Tuple> iter = myPig.openIterator("e");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { 
+                        "('apple',1L,2L)",
+                        "('orange',1L,1L)"
+                });
+        
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
+
+        assertEquals(expectedResults.size(), counter);
     }        
 
     @Test
-    public void testMultiQueryJiraPig976() {
+    public void testMultiQueryJiraPig976() throws Exception {
 
         // test case: key ('group') isn't part of foreach output
         // and keys have the same type.
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a by uid;");
-            myPig.registerQuery("c = group a by gid;");
-            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
-            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.registerQuery("a = load 'passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a by uid;");
+        myPig.registerQuery("c = group a by gid;");
+        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }
 
     @Test
-    public void testMultiQueryJiraPig976_2() {
+    public void testMultiQueryJiraPig976_2() throws Exception {
 
         // test case: key ('group') isn't part of foreach output 
         // and keys have different types
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a by uname;");
-            myPig.registerQuery("c = group a by gid;");
-            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
-            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.registerQuery("a = load 'passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a by uname;");
+        myPig.registerQuery("c = group a by gid;");
+        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }
 
     @Test
-    public void testMultiQueryJiraPig976_3() {
+    public void testMultiQueryJiraPig976_3() throws Exception {
 
         // test case: group all and key ('group') isn't part of output
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a all;");
-            myPig.registerQuery("c = group a by gid;");
-            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
-            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.registerQuery("a = load 'passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a all;");
+        myPig.registerQuery("c = group a by gid;");
+        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }
 
     @Test
-    public void testMultiQueryJiraPig976_4() {
+    public void testMultiQueryJiraPig976_4() throws Exception {
 
         // test case: group by multi-cols and key ('group') isn't part of output
-         
-        try {
-            myPig.setBatchOn();
+     
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a by uid;");
-            myPig.registerQuery("c = group a by (uname, gid);");
-            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
-            myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.registerQuery("a = load 'passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a by uid;");
+        myPig.registerQuery("c = group a by (uname, gid);");
+        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+        myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }
    
     @Test
-    public void testMultiQueryJiraPig976_5() {
+    public void testMultiQueryJiraPig976_5() throws Exception {
 
         // test case: key ('group') in multiple positions.
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a by uid;");
-            myPig.registerQuery("c = group a by (uname, gid);");
-            myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group as foo;");
-            myPig.registerQuery("d1 = foreach d generate $1 + $2;");
-            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
-            myPig.registerQuery("store d1 into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
-
-            List<ExecJob> jobs = myPig.executeBatch();
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        myPig.registerQuery("a = load 'passwd' " +
+                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a by uid;");
+        myPig.registerQuery("c = group a by (uname, gid);");
+        myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group as foo;");
+        myPig.registerQuery("d1 = foreach d generate $1 + $2;");
+        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+        myPig.registerQuery("store d1 into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }
     
     @Test
-    public void testMultiQueryJiraPig976_6() {
+    public void testMultiQueryJiraPig976_6() throws Exception {
 
         // test case: key ('group') has null values.
 
         String INPUT_FILE = "pig-976.txt";
         
-        try {
-            
-            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-            w.println("apple\tapple\t100\t10");
-            w.println("apple\tapple\t\t20");
-            w.println("orange\torange\t100\t10");
-            w.println("orange\torange\t\t20");
-            w.println("strawberry\tstrawberry\t300\t10");
-   
-            w.close();
-            
-            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-        
-            myPig.setBatchOn();
-
-            myPig.registerQuery("a = load '" + INPUT_FILE +
-                                "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = group a by uid;");
-            myPig.registerQuery("c = group a by gid;");
-            myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
-            myPig.registerQuery("e = foreach c generate COUNT(a), group;");
-            myPig.registerQuery("store d into '/tmp/output1';");
-            myPig.registerQuery("store e into '/tmp/output2';");
+        String[] inputData = {
+            "apple\tapple\t100\t10",
+            "apple\tapple\t\t20",
+            "orange\torange\t100\t10",
+            "orange\torange\t\t20",
+            "strawberry\tstrawberry\t300\t10"
+        };
+                            
+        Util.createLocalInputFile(INPUT_FILE, inputData);
+    
+        myPig.setBatchOn();
+
+        myPig.registerQuery("a = load '" + INPUT_FILE +
+                            "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = group a by uid;");
+        myPig.registerQuery("c = group a by gid;");
+        myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
+        myPig.registerQuery("e = foreach c generate COUNT(a), group;");
+        myPig.registerQuery("store d into 'output1';");
+        myPig.registerQuery("store e into 'output2';");
 
-            List<ExecJob> jobs = myPig.executeBatch();
-            assertTrue(jobs.size() == 2);
-            
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-            
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } finally {
-            new File(INPUT_FILE).delete();
-            try {
-                Util.deleteFile(cluster, INPUT_FILE);
-            } catch (IOException e) {
-                e.printStackTrace();
-                Assert.fail();
-            }
-            
+        List<ExecJob> jobs = myPig.executeBatch();
+        assertTrue(jobs.size() == 2);
+        
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
     }    
 
     @Test
-    public void testMultiQueryJiraPig983_2() {
+    public void testMultiQueryJiraPig983_2() throws Exception {
 
         System.out.println("===== multi-query Jira Pig-983_2 =====");
 
-        try {
-            myPig.setBatchOn();
+        myPig.setBatchOn();
 
-            myPig.registerQuery("a = load 'passwd' " +
-                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-            myPig.registerQuery("b = filter a by uid < 5;");
-            myPig.registerQuery("c = filter a by uid >= 5;");
-            myPig.registerQuery("d = join b by uname, c by uname;");
-            myPig.registerQuery("e = group d by b::gid;");
-            myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);");
-            myPig.registerQuery("store e1 into '/tmp/output1';");
-            myPig.registerQuery("f = group d by c::gid;");
-            myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
-            myPig.registerQuery("store f1 into '/tmp/output2';");
-             
-            List<ExecJob> jobs = myPig.executeBatch();
+        myPig.registerQuery("a = load 'passwd' " +
+                             "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b = filter a by uid < 5;");
+        myPig.registerQuery("c = filter a by uid >= 5;");
+        myPig.registerQuery("d = join b by uname, c by uname;");
+        myPig.registerQuery("e = group d by b::gid;");
+        myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);");
+        myPig.registerQuery("store e1 into 'output1';");
+        myPig.registerQuery("f = group d by c::gid;");
+        myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
+        myPig.registerQuery("store f1 into 'output2';");
+         
+        List<ExecJob> jobs = myPig.executeBatch();
 
-            assertTrue(jobs.size() == 2);
-            
-            for (ExecJob job : jobs) {
-                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail();
-        } 
+        assertTrue(jobs.size() == 2);
+        
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
     }     
 
     // --------------------------------------------------------------------------
     // Helper methods
 
-    private void deleteOutputFiles() {
-        try {
-            FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
-        } catch (IOException e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+    private static void deleteOutputFiles() {
+        Util.deleteDirectory(new File("output1"));
+        Util.deleteDirectory(new File("output2"));
+        Util.deleteDirectory(new File("output3"));
+        Util.deleteDirectory(new File("output4"));
+        Util.deleteDirectory(new File("output5"));
     }
 }