You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [16/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/ba...

Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Fri May 30 19:07:23 2014
@@ -31,49 +31,65 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
-import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.parser.ParserException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestAccumulator {
-    private static final String INPUT_FILE = "AccumulatorInput.txt";
+    private static final String INPUT_FILE1 = "AccumulatorInput1.txt";
     private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
     private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
     private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
+    private static final String INPUT_DIR = "build/test/data";
 
-    private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static PigServer pigServer;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+        properties.setProperty("pig.accumulative.batchsize", "2");
+        properties.setProperty("pig.exec.nocombiner", "true");
+        // Reducing the number of retry attempts to speed up test completion
+        properties.setProperty("mapred.map.max.attempts","1");
+        properties.setProperty("mapred.reduce.max.attempts","1");
+        createFiles();
+    }
 
-    public TestAccumulator() throws ExecException, IOException{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        // pigServer = new PigServer(ExecType.LOCAL);
-        pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
-        pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner", "true");
-        // reducing the number of retry attempts to speed up test completion
-        pigServer.getPigContext().getProperties().setProperty("mapred.map.max.attempts","1");
-        pigServer.getPigContext().getProperties().setProperty("mapred.reduce.max.attempts","1");
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        deleteFiles();
+        cluster.shutDown();
     }
 
     @Before
     public void setUp() throws Exception {
-        pigServer.getPigContext().getProperties().remove("opt.accumulator");
-        createFiles();
+        Util.resetStateForExecModeSwitch();
+        // Drop stale configuration from previous test run
+        properties.remove(PigConfiguration.OPT_ACCUMULATOR);
+        pigServer = new PigServer(cluster.getExecType(), properties);
     }
 
-    @AfterClass
-    public static void oneTimeTearDown() throws Exception {
-        cluster.shutDown();
+    @After
+    public void tearDown() throws Exception {
+        pigServer.shutdown();
+        pigServer = null;
     }
 
-    private void createFiles() throws IOException {
-        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+    private static void createFiles() throws IOException {
+        new File(INPUT_DIR).mkdir();
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
 
         w.println("100\tapple");
         w.println("200\torange");
@@ -84,9 +100,9 @@ public class TestAccumulator {
         w.println("400\tapple");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE2));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
 
         w.println("100\t");
         w.println("100\t");
@@ -95,9 +111,9 @@ public class TestAccumulator {
         w.println("300\tstrawberry");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE3));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3));
 
         w.println("100\t1.0");
         w.println("100\t2.0");
@@ -112,9 +128,9 @@ public class TestAccumulator {
         w.println("400\t");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE4));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4));
 
         w.println("100\thttp://ibm.com,ibm");
         w.println("100\thttp://ibm.com,ibm");
@@ -122,25 +138,17 @@ public class TestAccumulator {
         w.println("300\thttp://sun.com,sun");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4);
     }
 
-    @After
-    public void tearDown() throws Exception {
-        new File(INPUT_FILE).delete();
-        Util.deleteFile(cluster, INPUT_FILE);
-        new File(INPUT_FILE2).delete();
-        Util.deleteFile(cluster, INPUT_FILE2);
-        new File(INPUT_FILE3).delete();
-        Util.deleteFile(cluster, INPUT_FILE3);
-        new File(INPUT_FILE4).delete();
-        Util.deleteFile(cluster, INPUT_FILE4);
+    private static void deleteFiles() {
+        Util.deleteDirectory(new File(INPUT_DIR));
     }
 
     @Test
     public void testAccumBasic() throws IOException{
         // test group by
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A);");
 
@@ -150,7 +158,6 @@ public class TestAccumulator {
         expected.put(300, 3);
         expected.put(400, 1);
 
-
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
         while(iter.hasNext()) {
@@ -175,8 +182,8 @@ public class TestAccumulator {
         }
 
         // test cogroup
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
-        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("C = cogroup A by id, B by id;");
         pigServer.registerQuery("D = foreach C generate group,  " +
                 "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");
@@ -187,7 +194,6 @@ public class TestAccumulator {
         expected2.put(300, "3,3");
         expected2.put(400, "1,1");
 
-
         iter = pigServer.openIterator("D");
 
         while(iter.hasNext()) {
@@ -198,7 +204,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithNegative() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  -org.apache.pig.test.utils.AccumulatorBagCount(A);");
 
@@ -219,7 +225,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithAdd() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");
 
@@ -261,7 +267,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithMinus() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group, " +
                 " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");
@@ -283,7 +289,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithMod() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  " +
                 "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");
@@ -305,7 +311,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithDivide() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  " +
                 "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");
@@ -327,7 +333,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithAnd() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  " +
                 "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
@@ -350,7 +356,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithOr() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  " +
                 "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
@@ -373,7 +379,7 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithRegexp() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B generate group,  " +
                 "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");
@@ -417,7 +423,7 @@ public class TestAccumulator {
     @Test
     public void testAccumWithIsEmpty() throws IOException{
         pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1");
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
         pigServer.registerQuery("C = cogroup A by id outer, B by id outer;");
         pigServer.registerQuery("D = foreach C generate group," +
@@ -442,9 +448,10 @@ public class TestAccumulator {
 
     @Test
     public void testAccumWithDistinct() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
         pigServer.registerQuery("B = group A by id;");
-        pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
+        pigServer.registerQuery("C = foreach B { D = distinct A;" +
+                "generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
 
         HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
         expected.put(100, 2);
@@ -454,18 +461,22 @@ public class TestAccumulator {
 
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
+        int count = 0;
         while(iter.hasNext()) {
+            count++;
             Tuple t = iter.next();
             assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
         }
+        assertEquals(4, count);
     }
 
     @Test
     public void testAccumWithSort() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
         pigServer.registerQuery("B = foreach A generate id, f, id as t;");
         pigServer.registerQuery("C = group B by id;");
-        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
+        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f;" +
+                "generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
 
         HashMap<Integer, String> expected = new HashMap<Integer, String>();
         expected.put(100, "(apple)(apple)");
@@ -475,13 +486,17 @@ public class TestAccumulator {
 
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
+        int count = 0;
         while(iter.hasNext()) {
+            count++;
             Tuple t = iter.next();
             assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
         }
+        assertEquals(4, count);
     }
 
-    public void testAccumWithBuildinAvg() throws IOException {
+    @Test
+    public void testAccumWithBuiltinAvg() throws IOException {
       HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
       expected.put(100, 3.0);
       expected.put(200, 2.1);
@@ -490,31 +505,31 @@ public class TestAccumulator {
       // Test all the averages for correct behaviour with null values
       String[] types = { "double", "float", "int", "long" };
       for (int i = 0; i < types.length; i++) {
-        if (i > 1) { // adjust decimal error for non real types
-          expected.put(200, 2.0);
-          expected.put(300, 3.0);
-        }
-        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
-            + types[i] + ");");
-        pigServer.registerQuery("C = group A by id;");
-        pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
-        Iterator<Tuple> iter = pigServer.openIterator("D");
-
-        while (iter.hasNext()) {
-          Tuple t = iter.next();
-          Double v = expected.get((Integer) t.get(0));
-          if (v != null) {
-            assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
-                0.0001);
-          } else {
-            assertNull(t.get(1));
+          if (i > 1) { // adjust decimal error for non real types
+              expected.put(200, 2.0);
+              expected.put(300, 3.0);
+          }
+          pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
+                  + types[i] + ");");
+          pigServer.registerQuery("C = group A by id;");
+          pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
+          Iterator<Tuple> iter = pigServer.openIterator("D");
+
+          while (iter.hasNext()) {
+              Tuple t = iter.next();
+              Double v = expected.get((Integer) t.get(0));
+              if (v != null) {
+                  assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
+                          0.0001);
+              } else {
+                  assertNull(t.get(1));
+              }
           }
-        }
       }
     }
 
     @Test
-    public void testAccumWithBuildin() throws IOException{
+    public void testAccumWithBuiltin() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
         pigServer.registerQuery("C = group A by id;");
         // moving AVG accumulator test to separate test case
@@ -532,19 +547,19 @@ public class TestAccumulator {
             Tuple t = iter.next();
             Double[] v = expected.get((Integer)t.get(0));
             for(int i=0; i<v.length; i++) {
-              if (v[i] != null) {
-                assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
-                    .doubleValue(), 0.0001);
-              } else {
-                assertNull(t.get(i + 1));
-              }
+                if (v[i] != null) {
+                    assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
+                            .doubleValue(), 0.0001);
+                } else {
+                    assertNull(t.get(i + 1));
+                }
             }
         }
     }
 
     @Test
-    public void testAccumWithMultiBuildin() throws IOException{
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, c:chararray);");
+    public void testAccumWithMultiBuiltin() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");
         pigServer.registerQuery("C = group A by 1;");
         pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);");
 
@@ -564,7 +579,7 @@ public class TestAccumulator {
         pigServer.registerQuery("C = group A by id;");
         pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
 
-        Iterator<Tuple> iter = pigServer.openIterator("D");
+        pigServer.openIterator("D");
     }
 
     /**
@@ -615,24 +630,25 @@ public class TestAccumulator {
 
     @Test
     public void testAccumulatorOff() throws IOException{
-        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
+        pigServer.getPigContext().getProperties().setProperty(
+                PigConfiguration.OPT_ACCUMULATOR, "false");
 
         pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
-        pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
+        pigServer.registerQuery("C = foreach B generate group," +
+                "org.apache.pig.test.utils.AccumulativeSumBag(A);");
 
         checkAccumulatorOff("C");
-        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+        pigServer.getPigContext().getProperties().setProperty(
+                PigConfiguration.OPT_ACCUMULATOR, "true");
 
     }
 
     private void checkAccumulatorOff(String alias) {
         try {
             Iterator<Tuple> iter = pigServer.openIterator(alias);
-            int c = 0;
             while(iter.hasNext()) {
                 iter.next();
-                c++;
             }
             fail("Accumulator should be off.");
         }catch(Exception e) {
@@ -644,7 +660,8 @@ public class TestAccumulator {
     public void testAccumWithMap() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
         pigServer.registerQuery("B = group A by (id, url);");
-        pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
+        pigServer.registerQuery("C = foreach B generate COUNT(A)," +
+                "org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
 
         HashMap<Integer, String> expected = new HashMap<Integer, String>();
         expected.put(2, "http://ibm.com");
@@ -679,7 +696,8 @@ public class TestAccumulator {
         pigServer.registerQuery("A = load 'data1' as (x:int, y:int);");
         pigServer.registerQuery("B = load 'data2' as (x:int, z:int);");
         pigServer.registerQuery("C = cogroup A by x, B by x;");
-        pigServer.registerQuery("D = foreach C generate group, SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");
+        pigServer.registerQuery("D = foreach C generate group," +
+                "SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");
 
         HashMap<Integer, Long> expected = new HashMap<Integer, Long>();
         expected.put(1, 21l);
@@ -704,7 +722,7 @@ public class TestAccumulator {
     @Test
     public void testAccumAfterNestedOp() throws IOException, ParserException{
         // test group by
-        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
         pigServer.registerQuery("B = group A by id;");
         pigServer.registerQuery("C = foreach B " +
                         "{ o = order A by id; " +

Modified: pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Fri May 30 19:07:23 2014
@@ -23,26 +23,35 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.Random;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestAlgebraicEval {
+    private static PigServer pig;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
 
     private int LOOP_COUNT = 1024;
-
-    private PigServer pig;
+    private Boolean[] nullFlags = new Boolean[]{ false, true};
 
     @Before
     public void setUp() throws Exception {
-        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pig = new PigServer(cluster.getExecType(), properties);
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
     }
 
     @AfterClass
@@ -50,9 +59,6 @@ public class TestAlgebraicEval {
         cluster.shutDown();
     }
 
-    Boolean[] nullFlags = new Boolean[]{ false, true};
-
-    static MiniCluster cluster = MiniCluster.buildCluster();
     @Test
     public void testGroupCountWithMultipleFields() throws Throwable {
         File tmpFile = File.createTempFile("test", "txt");

Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Fri May 30 19:07:23 2014
@@ -33,15 +33,12 @@ import java.util.Iterator;
 import java.util.Properties;
 import java.util.Map.Entry;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataType;
@@ -50,34 +47,39 @@ import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-@RunWith(JUnit4.class)
 public class TestBZip {
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
    /**
     * Tests the end-to-end writing and reading of a BZip file.
     */
     @Test
     public void testBzipInPig() throws Exception {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-       
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         File in = File.createTempFile("junit", ".bz2");
         in.deleteOnExit();
-        
+
         File out = File.createTempFile("junit", ".bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
-               
-        CBZip2OutputStream cos = 
+
+        CBZip2OutputStream cos =
             new CBZip2OutputStream(new FileOutputStream(in));
         for (int i = 1; i < 100; i++) {
             StringBuffer sb = new StringBuffer();
@@ -86,7 +88,7 @@ public class TestBZip {
             cos.write(bytes);
         }
         cos.close();
-                       
+
         pig.registerQuery("AA = load '"
                 + Util.generateURI(Util.encodeEscape(in.getAbsolutePath()), pig.getPigContext())
                 + "';");
@@ -94,49 +96,50 @@ public class TestBZip {
         pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "';");
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FSDataInputStream is = fs.open(new Path(clusterOutput +
-                "/part-r-00000.bz2"));
+        FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
+                Util.getSuccessMarkerPathFilter());
+        FSDataInputStream is = fs.open(outputFiles[0].getPath());
         CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-        
+
         // Just a sanity check, to make sure it was a bzip file; we
         // will do the value verification later
         assertEquals(100, cis.read(new byte[100]));
         cis.close();
-        
+
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
-        
+
         Iterator<Tuple> i = pig.openIterator("B");
         HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
         while (i.hasNext()) {
             Integer val = DataType.toInteger(i.next().get(0));
-            map.put(val, val);            
+            map.put(val, val);
         }
-        
+
         assertEquals(new Integer(99), new Integer(map.keySet().size()));
-        
+
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-        
+
         in.delete();
         Util.deleteFile(cluster, clusterOutput);
     }
-    
+
    /**
     * Tests the end-to-end writing and reading of a BZip file using absolute path with a trailing /.
     */
     @Test
     public void testBzipInPig2() throws Exception {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-       
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         File in = File.createTempFile("junit", ".bz2");
         in.deleteOnExit();
-        
+
         File out = File.createTempFile("junit", ".bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
-               
-        CBZip2OutputStream cos = 
+
+        CBZip2OutputStream cos =
             new CBZip2OutputStream(new FileOutputStream(in));
         for (int i = 1; i < 100; i++) {
             StringBuffer sb = new StringBuffer();
@@ -145,7 +148,7 @@ public class TestBZip {
             cos.write(bytes);
         }
         cos.close();
-                       
+
         pig.registerQuery("AA = load '"
                 + Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
                 + "';");
@@ -153,30 +156,31 @@ public class TestBZip {
         pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "/';");
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FSDataInputStream is = fs.open(new Path(clusterOutput +
-                "/part-r-00000.bz2"));
+        FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
+                Util.getSuccessMarkerPathFilter());
+        FSDataInputStream is = fs.open(outputFiles[0].getPath());
         CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-        
+
         // Just a sanity check, to make sure it was a bzip file; we
         // will do the value verification later
         assertEquals(100, cis.read(new byte[100]));
         cis.close();
-        
+
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
-        
+
         Iterator<Tuple> i = pig.openIterator("B");
         HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
         while (i.hasNext()) {
             Integer val = DataType.toInteger(i.next().get(0));
-            map.put(val, val);            
+            map.put(val, val);
         }
-        
+
         assertEquals(new Integer(99), new Integer(map.keySet().size()));
-        
+
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-        
+
         in.delete();
         out.delete();
     }
@@ -190,15 +194,15 @@ public class TestBZip {
                 "7\t8", // '\n' case
                 "9\t10\r" // '\r\n' at the end of file
         };
-        
+
         // bzip compressed input
         File in = File.createTempFile("junit", ".bz2");
         String compressedInputFileName = in.getAbsolutePath();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
         in.deleteOnExit();
-        
+
         try {
-            CBZip2OutputStream cos = 
+            CBZip2OutputStream cos =
                 new CBZip2OutputStream(new FileOutputStream(in));
             for (int i = 0; i < inputData.length; i++) {
                 StringBuffer sb = new StringBuffer();
@@ -207,31 +211,30 @@ public class TestBZip {
                 cos.write(bytes);
             }
             cos.close();
-            
+
             Util.copyFromLocalToCluster(cluster, compressedInputFileName,
-            		clusterCompressedFilePath);
-            
+                    clusterCompressedFilePath);
+
             // pig script to read compressed input
-            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                    .getProperties());
-            
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
+
             // pig script to read compressed input
             String script ="a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
             pig.registerQuery(script);
-            
+
             pig.registerQuery("store a into 'intermediate.bz';");
             pig.registerQuery("b = load 'intermediate.bz';");
             Iterator<Tuple> it2 = pig.openIterator("b");
-			while (it2.hasNext()) {
-				it2.next();
-			}
+            while (it2.hasNext()) {
+                it2.next();
+            }
         } finally {
             in.delete();
             Util.deleteFile(cluster, "intermediate.bz");
             Util.deleteFile(cluster, "final.bz");
         }
     }
-    /** 
+    /**
      * Tests that '\n', '\r' and '\r\n' are treated as record delims when using
      * bzip just like they are when using uncompressed text
      */
@@ -243,7 +246,7 @@ public class TestBZip {
                 "7\t8", // '\n' case
                 "9\t10\r" // '\r\n' at the end of file
         };
-        
+
         // bzip compressed input
         File in = File.createTempFile("junit", ".bz2");
         String compressedInputFileName = in.getAbsolutePath();
@@ -252,9 +255,9 @@ public class TestBZip {
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
         Util.createInputFile(cluster, unCompressedInputFileName, inputData);
-        
+
         try {
-            CBZip2OutputStream cos = 
+            CBZip2OutputStream cos =
                 new CBZip2OutputStream(new FileOutputStream(in));
             for (int i = 0; i < inputData.length; i++) {
                 StringBuffer sb = new StringBuffer();
@@ -263,58 +266,56 @@ public class TestBZip {
                 cos.write(bytes);
             }
             cos.close();
-            
+
             Util.copyFromLocalToCluster(cluster, compressedInputFileName,
                     clusterCompressedFilePath);
-            
+
             // pig script to read uncompressed input
             String script = "a = load '" + unCompressedInputFileName +"';";
-            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                    .getProperties());
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
             pig.registerQuery(script);
             Iterator<Tuple> it1 = pig.openIterator("a");
-            
+
             // pig script to read compressed input
             script = "a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
             pig.registerQuery(script);
             Iterator<Tuple> it2 = pig.openIterator("a");
-            
+
             while(it1.hasNext()) {
                 Tuple t1 = it1.next();
                 Tuple t2 = it2.next();
-                Assert.assertEquals(t1, t2);
+                assertEquals(t1, t2);
             }
-            
-            Assert.assertFalse(it2.hasNext());
-        
+
+            assertFalse(it2.hasNext());
+
         } finally {
             in.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
             Util.deleteFile(cluster, clusterCompressedFilePath);
         }
-        
+
     }
-    
+
     /**
      * Tests the end-to-end writing and reading of an empty BZip file.
      */
      @Test
      public void testEmptyBzipInPig() throws Exception {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
- 
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         File in = File.createTempFile("junit", ".tmp");
         in.deleteOnExit();
 
         File out = File.createTempFile("junit", ".bz2");
         out.delete();
         String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
-        
+
         FileOutputStream fos = new FileOutputStream(in);
         fos.write("55\n".getBytes());
         fos.close();
         System.out.println(in.getAbsolutePath());
-        
+
         pig.registerQuery("AA = load '"
                 + Util.generateURI(Util.encodeEscape(in.getAbsolutePath()), pig.getPigContext())
                 + "';");
@@ -322,21 +323,22 @@ public class TestBZip {
         pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutputFilePath) + "';");
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FSDataInputStream is = fs.open(new Path(clusterOutputFilePath +
-                "/part-r-00000.bz2"));
+        FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutputFilePath),
+                Util.getSuccessMarkerPathFilter());
+        FSDataInputStream is = fs.open(outputFiles[0].getPath());
         CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
-        
+
         // Just a sanity check, to make sure it was a bzip file; we
         // will do the value verification later
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
-        
+
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
         pig.openIterator("B");
-        
+
         in.delete();
         Util.deleteFile(cluster, clusterOutputFilePath);
-        
+
     }
 
     /**
@@ -357,7 +359,7 @@ public class TestBZip {
         cis.close();
         tmp.delete();
     }
-    
+
     /**
      * Tests the case where a bzip block ends exactly at the end of the {@link InputSplit}
      * with the block header ending a few bits into the last byte of current
@@ -371,11 +373,11 @@ public class TestBZip {
         // test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
         // In this test we will load test/org/apache/pig/test/data/bzipdir1.bz2 to also
         // test that the BZip2TextInputFormat can read subdirs recursively
-        String inputFileName = 
+        String inputFileName =
             "test/org/apache/pig/test/data/bzipdir1.bz2";
         Long expectedCount = 74999L; // number of lines in above file
-        // the first block in the above file exactly ends a few bits into the 
-        // byte at position 136500 
+        // the first block in the above file exactly ends a few bits into the
+        // byte at position 136500
         int splitSize = 136500;
         try {
             Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -385,21 +387,21 @@ public class TestBZip {
             Util.deleteFile(cluster, inputFileName);
         }
     }
-    
+
     /**
-     *  Tests the case where a bzip block ends exactly at the end of the input 
+     *  Tests the case where a bzip block ends exactly at the end of the input
      *  split (byte aligned with the last byte) and the last byte is a carriage
      *  return.
      */
     @Test
     public void testBlockHeaderEndingWithCR() throws IOException {
-        String inputFileName = 
+        String inputFileName =
             "test/org/apache/pig/test/data/blockEndingInCR.txt.bz2";
         // number of lines in above file (the value is 1 more than bzcat | wc -l
         // since there is a '\r' which is also treated as a record delim
-        Long expectedCount = 82094L; 
-        // the first block in the above file exactly ends at the byte at 
-        // position 136498 and the last byte is a carriage return ('\r') 
+        Long expectedCount = 82094L;
+        // the first block in the above file exactly ends at the byte at
+        // position 136498 and the last byte is a carriage return ('\r')
         try {
             int splitSize = 136498;
             Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -408,21 +410,21 @@ public class TestBZip {
             Util.deleteFile(cluster, inputFileName);
         }
     }
-    
+
     /**
      * Tests the case where a bzip block ends exactly at the end of the input
      * split and has more data which results in overcounting (record duplication)
      * in Pig 0.6
-     * 
+     *
      */
     @Test
     public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
-       
-        String inputFileName = 
+
+        String inputFileName =
             "test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
         Long expectedCount = 1041046L; // number of lines in above file
-        // the first block in the above file exactly ends a few bits into the 
-        // byte at position 136500 
+        // the first block in the above file exactly ends a few bits into the
+        // byte at position 136500
         int splitSize = 136500;
         try {
             Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
@@ -431,29 +433,28 @@ public class TestBZip {
             Util.deleteFile(cluster, inputFileName);
         }
     }
-    
-    private void testCount(String inputFileName, Long expectedCount, 
+
+    private void testCount(String inputFileName, Long expectedCount,
             int splitSize, String loadFuncSpec) throws IOException {
         String outputFile = "/tmp/bz-output";
         // simple load-store script to verify that the bzip input is getting
         // split
         String scriptToTestSplitting = "a = load '" +inputFileName + "' using " +
         loadFuncSpec + "; store a into '" + outputFile + "';";
-        
+
         String script = "a = load '" + inputFileName + "';" +
-        		"b = group a all;" +
-        		"c = foreach b generate COUNT_STAR(a);";
+                "b = group a all;" +
+                "c = foreach b generate COUNT_STAR(a);";
         Properties props = new Properties();
-        for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
             props.put(entry.getKey(), entry.getValue());
         }
         props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
-        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
-        PigServer pig = new PigServer(pigContext);
+        PigServer pig = new PigServer(cluster.getExecType(), props);
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
         fs.delete(new Path(outputFile), true);
         Util.registerMultiLineQuery(pig, scriptToTestSplitting);
-        
+
         // verify that > 1 maps were launched due to splitting of the bzip input
         FileStatus[] files = fs.listStatus(new Path(outputFile));
         int numPartFiles = 0;
@@ -463,40 +464,41 @@ public class TestBZip {
             }
         }
         assertEquals(true, numPartFiles > 0);
-        
+
         // verify record count to verify we read bzip data correctly
         Util.registerMultiLineQuery(pig, script);
         Iterator<Tuple> it = pig.openIterator("c");
         Long result = (Long) it.next().get(0);
         assertEquals(expectedCount, result);
-        
+
     }
-    
+
     @Test
     public void testBzipStoreInMultiQuery() throws Exception {
         String[] inputData = new String[] {
                 "1\t2\r3\t4"
         };
-        
+
         String inputFileName = "input.txt";
         Util.createInputFile(cluster, inputFileName, inputData);
-        
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         pig.setBatchOn();
         pig.registerQuery("a = load '" +  inputFileName + "';");
         pig.registerQuery("store a into 'output.bz2';");
         pig.registerQuery("store a into 'output';");
         pig.executeBatch();
-        
+
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FileStatus stat = fs.getFileStatus(new Path("output/part-m-00000"));        
-        assertTrue(stat.getLen() > 0);     
-        
-        stat = fs.getFileStatus(new Path("output.bz2/part-m-00000.bz2"));
-        assertTrue(stat.getLen() > 0);     
+        FileStatus[] outputFiles = fs.listStatus(new Path("output"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
+
+        outputFiles = fs.listStatus(new Path("output.bz2"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
     }
 
     @Test
@@ -504,34 +506,35 @@ public class TestBZip {
         String[] inputData = new String[] {
                 "1\t2\r3\t4"
         };
-        
+
         String inputFileName = "input2.txt";
         Util.createInputFile(cluster, inputFileName, inputData);
-        
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
+
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
         PigContext pigContext = pig.getPigContext();
         pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
         pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
-        
+
         pig.setBatchOn();
         pig.registerQuery("a = load '" +  inputFileName + "';");
         pig.registerQuery("store a into 'output2.bz2';");
         pig.registerQuery("store a into 'output2';");
         pig.executeBatch();
-        
+
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FileStatus stat = fs.getFileStatus(new Path("output2/part-m-00000.bz2"));        
-        assertTrue(stat.getLen() > 0);     
-        
-        stat = fs.getFileStatus(new Path("output2.bz2/part-m-00000.bz2"));
-        assertTrue(stat.getLen() > 0);     
+        FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
+
+        outputFiles = fs.listStatus(new Path("output2.bz2"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
     }
-    
-    /** 
+
+    /**
      * Tests that Pig throws an Exception when the input files to be loaded are actually
-     * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part 
+     * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part
      * of the input data.
      */
     @Test (expected=IOException.class)
@@ -550,12 +553,12 @@ public class TestBZip {
                 "1\tb",
                 "2\tbb"
         };
-       
+
         // bzip compressed input file1
         File in1 = File.createTempFile("junit", ".bz2");
         String compressedInputFileName1 = in1.getAbsolutePath();
         in1.deleteOnExit();
-        
+
         // file2
         File in2 = File.createTempFile("junit", ".bz2");
         String compressedInputFileName2 = in2.getAbsolutePath();
@@ -563,9 +566,9 @@ public class TestBZip {
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
         Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
-        
+
         try {
-            CBZip2OutputStream cos = 
+            CBZip2OutputStream cos =
                 new CBZip2OutputStream(new FileOutputStream(in1));
             for (int i = 0; i < inputData1.length; i++) {
                 StringBuffer sb = new StringBuffer();
@@ -574,8 +577,8 @@ public class TestBZip {
                 cos.write(bytes);
             }
             cos.close();
-            
-            CBZip2OutputStream cos2 = 
+
+            CBZip2OutputStream cos2 =
                 new CBZip2OutputStream(new FileOutputStream(in2));
             for (int i = 0; i < inputData2.length; i++) {
                 StringBuffer sb = new StringBuffer();
@@ -589,56 +592,55 @@ public class TestBZip {
             catInto(compressedInputFileName2, compressedInputFileName1);
             Util.copyFromLocalToCluster(cluster, compressedInputFileName1,
                     compressedInputFileName1);
-            
+
             // pig script to read uncompressed input
             String script = "a = load '" + Util.encodeEscape(unCompressedInputFileName) +"';";
-            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                    .getProperties());
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
             pig.registerQuery(script);
             Iterator<Tuple> it1 = pig.openIterator("a");
-            
+
             // pig script to read compressed concatenated input
             script = "a = load '" + Util.encodeEscape(compressedInputFileName1) +"';";
             pig.registerQuery(script);
             Iterator<Tuple> it2 = pig.openIterator("a");
-            
+
             while(it1.hasNext()) {
                 Tuple t1 = it1.next();
                 Tuple t2 = it2.next();
-                Assert.assertEquals(t1, t2);
+                assertEquals(t1, t2);
             }
-            
-            Assert.assertFalse(it2.hasNext());
-        
+
+            assertFalse(it2.hasNext());
+
         } finally {
             in1.delete();
             in2.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
         }
-        
+
     }
-    
+
     /*
      * Concatenate the contents of src file to the contents of dest file
      */
     private void catInto(String src, String dest) throws IOException {
-    	BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
-    	BufferedReader in = new BufferedReader(new FileReader(src));
-    	String str;
-    	while ((str = in.readLine()) != null) {
-    		out.write(str);
-    	}
-    	in.close();
-    	out.close();
+        BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
+        BufferedReader in = new BufferedReader(new FileReader(src));
+        String str;
+        while ((str = in.readLine()) != null) {
+            out.write(str);
+        }
+        in.close();
+        out.close();
     }
-    
+
     // See PIG-1714
     @Test
     public void testBzipStoreInMultiQuery3() throws Exception {
         String[] inputData = new String[] {
                 "1\t2\r3\t4"
         };
-        
+
         String inputFileName = "input3.txt";
         Util.createInputFile(cluster, inputFileName, inputData);
 
@@ -649,25 +651,27 @@ public class TestBZip {
                 "a = load '" + inputFileName + "';\n" +
                 "store a into 'output3.bz2';\n" +
                 "store a into 'output3';";
-        
+
         String inputScriptName = "script3.txt";
         PrintWriter pw = new PrintWriter(new FileWriter(inputScriptName));
         pw.println(inputScript);
         pw.close();
-        
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
-                .getProperties());
-        
+
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         FileInputStream fis = new FileInputStream(inputScriptName);
         pig.registerScript(fis);
-        
+
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
                 pig.getPigContext().getProperties()));
-        FileStatus stat = fs.getFileStatus(new Path("output3/part-m-00000.bz2"));        
-        assertTrue(stat.getLen() > 0);     
-        
-        stat = fs.getFileStatus(new Path("output3.bz2/part-m-00000.bz2"));
-        assertTrue(stat.getLen() > 0);     
+        FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
+
+        outputFiles = fs.listStatus(new Path("output3.bz2"),
+                Util.getSuccessMarkerPathFilter());
+        assertTrue(outputFiles[0].getLen() > 0);
     }
- 
+
 }
+

Modified: pig/trunk/test/org/apache/pig/test/TestBatchAliases.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBatchAliases.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBatchAliases.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBatchAliases.java Fri May 30 19:07:23 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -37,7 +38,7 @@ public class TestBatchAliases {
 
     @Before
     public void setUp() throws Exception {
-        System.setProperty("opt.multiquery", ""+true);
+        System.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.LOCAL, new Properties());
         deleteOutputFiles();
     }

Modified: pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBestFitCast.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBestFitCast.java Fri May 30 19:07:23 2014
@@ -17,16 +17,15 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -40,17 +39,20 @@ import org.apache.pig.impl.util.LogUtils
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBestFitCast {
-    private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    String inputFile, inputFile2;
-    int LOOP_SIZE = 20;
+    private static PigServer pigServer;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
+    private String inputFile, inputFile2;
+    private int LOOP_SIZE = 20;
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), properties);
         inputFile = "TestBestFitCast-input.txt";
         String[] input = new String[LOOP_SIZE];
         long l = 0;
@@ -73,6 +75,12 @@ public class TestBestFitCast {
         Util.deleteFile(cluster, inputFile2);
     }
 
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
@@ -233,7 +241,7 @@ public class TestBestFitCast {
         try {
             pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x:float, y);");
             pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName() + "(x,y);");
-            Iterator<Tuple> iter = pigServer.openIterator("B");
+            pigServer.openIterator("B");
         } catch (Exception e) {
             exceptionCaused = true;
             PigException pe = LogUtils.getPigException(e);
@@ -319,7 +327,7 @@ public class TestBestFitCast {
             pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:int);");
             pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
                     + "(x,y, y);");
-            Iterator<Tuple> iter = pigServer.openIterator("B");
+            pigServer.openIterator("B");
         } catch (Exception e) {
             exceptionCaused = true;
             PigException pe = LogUtils.getPigException(e);
@@ -341,7 +349,7 @@ public class TestBestFitCast {
             pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:long);");
             pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
                     + "(x,y, y);");
-            Iterator<Tuple> iter = pigServer.openIterator("B");
+            pigServer.openIterator("B");
         } catch (Exception e) {
             exceptionCaused = true;
             PigException pe = LogUtils.getPigException(e);
@@ -363,7 +371,7 @@ public class TestBestFitCast {
             pigServer.registerQuery("A = LOAD '" + inputFile + "' as (x, y:double);");
             pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
                     + "(x,y, y);");
-            Iterator<Tuple> iter = pigServer.openIterator("B");
+            pigServer.openIterator("B");
         } catch (Exception e) {
             exceptionCaused = true;
             PigException pe = LogUtils.getPigException(e);
@@ -476,7 +484,7 @@ public class TestBestFitCast {
             pigServer.registerQuery("A = LOAD '" + inputFile2 + "' as (x:float, y, z:int);");
             pigServer.registerQuery("B = FOREACH A generate x, " + UDF3.class.getName()
                     + "(x,y, y);");
-            Iterator<Tuple> iter = pigServer.openIterator("B");
+            pigServer.openIterator("B");
         } catch (Exception e) {
             exceptionCaused = true;
             PigException pe = LogUtils.getPigException(e);

Modified: pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBinaryExpressionOps.java Fri May 30 19:07:23 2014
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,11 +19,10 @@
 package org.apache.pig.test;
 
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
@@ -32,12 +30,11 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBinaryExpressionOps {
-
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
     private static final String INPUT_1 = "input1";
     private static final String INPUT_2 = "input2";
-    
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         String[] inputData1 = new String[] {
@@ -46,34 +43,39 @@ public class TestBinaryExpressionOps {
         String[] inputData2 = new String[] {
                 "id1\t2", "id2\t2"
         };
-        Util.createInputFile(cluster, INPUT_1, inputData1);        
+        Util.createInputFile(cluster, INPUT_1, inputData1);
         Util.createInputFile(cluster, INPUT_2, inputData2);
     }
 
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
         cluster.shutDown();
     }
-    
+
     @Test
     public void testArithmeticOperators() throws Exception {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        
+        PigServer pig = new PigServer(cluster.getExecType(), properties);
+
         pig.registerQuery("A = LOAD '" + INPUT_1 + "' AS (id:chararray, val:long);");
         pig.registerQuery("B = LOAD '" + INPUT_2 + "' AS (id:chararray, val:long);");
         pig.registerQuery("C = COGROUP A BY id, B BY id;");
         pig.registerQuery("D = FOREACH C GENERATE group, SUM(B.val), SUM(A.val), "
-                + "(SUM(A.val) - SUM(B.val)), (SUM(A.val) + SUM(B.val)), " 
-                + "(SUM(A.val) * SUM(B.val)), (SUM(A.val) / SUM(B.val)), " 
+                + "(SUM(A.val) - SUM(B.val)), (SUM(A.val) + SUM(B.val)), "
+                + "(SUM(A.val) * SUM(B.val)), (SUM(A.val) / SUM(B.val)), "
                 + "(SUM(A.val) % SUM(B.val)), (SUM(A.val) < 0 ? SUM(A.val) : SUM(B.val));");
-               
-        String[] expectedResults = new String[] {"(id1,2,,,,,,,)", "(id2,2,10,8,12,20,5,0,2)"};
-        Iterator<Tuple> iter = pig.openIterator("D");             
-        int counter = 0;
-        while (iter.hasNext()) { 
-            assertEquals(expectedResults[counter++], iter.next().toString());      
-        }
-        assertEquals(expectedResults.length, counter);
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "('id1',2L,null,null,null,null,null,null,null)",
+                        "('id2',2L,10L,8L,12L,20L,5L,0L,2L)" });
+        Iterator<Tuple> iter = pig.openIterator("D");
+        Util.checkQueryOutputsAfterSort(iter, expectedResults);
     }
-    
+
 }

Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri May 30 19:07:23 2014
@@ -121,7 +121,6 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -132,17 +131,16 @@ import org.joda.time.DateTimeZone;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBuiltin {
+    private static PigServer pigServer;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
 
-    PigServer pigServer;
-
-    // This should only be used when absolutely necessary -- eg, when using ReadToEndLoader.
-    private static MiniCluster cluster = MiniCluster.buildCluster();
-
-    TupleFactory tupleFactory = TupleFactory.getInstance();
-    BagFactory bagFactory = DefaultBagFactory.getInstance();
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+    private BagFactory bagFactory = DefaultBagFactory.getInstance();
 
     // some inputs
     private static Integer[] intInput = { 3, 1, 2, 4, 5, 7, null, 6, 8, 9, 10 };
@@ -203,10 +201,7 @@ public class TestBuiltin {
 
     @Before
     public void setUp() throws Exception {
-        // re initialize FileLocalizer so that each test will run correctly
-        // without any side effect of other tests - this is needed since some
-        // tests are in mapred and some in local mode.
-        FileLocalizer.setInitialized(false);
+        Util.resetStateForExecModeSwitch();
 
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
         pigServer.setValidateEachStatement(true);
@@ -347,14 +342,17 @@ public class TestBuiltin {
         DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
     }
 
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void shutDown() {
         cluster.shutDown();
     }
 
-    /**
-     *
-     */
     private void setupEvalFuncMap() {
         for (String[] aggGroup : aggs) {
             for (String agg : aggGroup) {
@@ -2114,13 +2112,6 @@ public class TestBuiltin {
         assertEquals(0.582222509739582, (Double)ans.get(2) ,0.0005);
     }
 
-    private void checkItemsGT(Iterable<Tuple> tuples, int field, int limit) throws ExecException {
-        for (Tuple t : tuples) {
-            Long val = (Long) t.get(field);
-            assertTrue("Value "+ val + " exceeded the expected limit", val > limit);
-        }
-    }
-
     @Test
     public void testToBag() throws Exception {
         //TEST TOBAG
@@ -2599,7 +2590,7 @@ public class TestBuiltin {
 
         String input3 = "this:has:a:trailing:colon:\n";
         int arity3 = 6;
-        Util.createInputFile(cluster, "input.txt", new String[] {input2});
+        Util.createInputFile(cluster, "input.txt", new String[] {input3});
         LoadFunc p3 = new ReadToEndLoader(new PigStorage(":"), ConfigurationUtil.
             toConfiguration(cluster.getProperties()), "input.txt", 0);
         Tuple f3 = p3.getNext();
@@ -2637,10 +2628,10 @@ public class TestBuiltin {
         assertTrue(f3 == null);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testSFPig() throws Exception {
-        PigServer mrPigServer = new PigServer(ExecType.MAPREDUCE);
+        Util.resetStateForExecModeSwitch();
+        PigServer mrPigServer = new PigServer(cluster.getExecType(), properties);
         String inputStr = "amy\tbob\tcharlene\tdavid\terin\tfrank";
         Util.createInputFile(cluster, "testSFPig-input.txt", new String[]
                                                                     {inputStr});
@@ -2670,7 +2661,6 @@ public class TestBuiltin {
      * unit tests are done in TestStringUDFs
      */
     @Test
-    @SuppressWarnings("unchecked")
     public void testStringUDFs() throws Exception {
         String inputStr = "amy smith ";
         File inputFile = Util.createInputFile("tmp", "testStrUDFsIn.txt", new String[] {inputStr});

Modified: pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCharArrayToNumeric.java Fri May 30 19:07:23 2014
@@ -25,9 +25,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -40,13 +40,14 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCharArrayToNumeric {
-    private Double dummyDouble = null;
-    private Float dummyFloat = null;
-    private Long dummyLong = null;
-    private Integer dummyInteger = null;
+    private static PigServer pig;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
+
     private Double MaxDouble = Double.MIN_VALUE;
     private Double MinDouble = Double.MIN_VALUE;
     private Float MaxFloat = Float.MAX_VALUE;
@@ -56,12 +57,15 @@ public class TestCharArrayToNumeric {
     private Integer MaxInteger = Integer.MAX_VALUE;
     private Integer MinInteger = Integer.MIN_VALUE;
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    PigServer pig;
-
     @Before
     public void setUp() throws Exception {
-        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pig = new PigServer(cluster.getExecType(), properties);
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
     }
 
     @AfterClass

Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Fri May 30 19:07:23 2014
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
@@ -41,27 +40,40 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCombiner {
+    private static MiniGenericCluster cluster;
+    private static Properties properties;
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
 
+    @Before
+    public void setUp() throws Exception {
+        Util.resetStateForExecModeSwitch();
+    }
+
     @Test
     public void testSuccessiveUserFuncs1() throws Exception {
         String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
                 "c = group a by c2; " +
                 "f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
                 "store f into 'out';";
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         PigContext pc = pigServer.getPigContext();
         assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                 .isEmpty()));
+        pigServer.shutdown();
     }
 
     @Test
@@ -72,41 +84,27 @@ public class TestCombiner {
                 "f = foreach c generate COUNT(" + dummyUDF + "" +
                 "(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
                 "store f into 'out';";
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         PigContext pc = pigServer.getPigContext();
         assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                 .isEmpty()));
+        pigServer.shutdown();
     }
 
     @Test
     public void testOnCluster() throws Exception {
         // run the test on cluster
-        String inputFileName = runTest(new PigServer(
-                ExecType.MAPREDUCE, cluster.getProperties()));
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+        String inputFileName = runTest(pigServer);
         Util.deleteFile(cluster, inputFileName);
-
-    }
-
-    /*
-     * (non-Javadoc)
-     * @see junit.framework.TestCase#setUp()
-     */
-    @Before
-    public void setUp() throws Exception {
-        // cause a re initialization of FileLocalizer's
-        // internal state before each test run
-        // A previous test might have been in a different
-        // mode than the test which is about to run. To
-        // ensure each test runs correctly in it's exectype
-        // mode, let's re initialize.
-        FileLocalizer.setInitialized(false);
+        pigServer.shutdown();
     }
 
     @Test
     public void testLocal() throws Exception {
         // run the test locally
         FileLocalizer.deleteTempFiles();
-        runTest(new PigServer(ExecType.LOCAL, new Properties()));
+        runTest(new PigServer("local"));
         FileLocalizer.deleteTempFiles();
     }
 
@@ -133,7 +131,7 @@ public class TestCombiner {
         File inputFile = File.createTempFile("test", "txt");
         inputFile.deleteOnExit();
         String inputFileName = inputFile.getAbsolutePath();
-        if (pig.getPigContext().getExecType() == ExecType.LOCAL) {
+        if (pig.getPigContext().getExecType().isLocal()) {
             PrintStream ps = new PrintStream(new FileOutputStream(inputFile));
             for (String line : inputLines) {
                 ps.println(line);
@@ -171,9 +169,11 @@ public class TestCombiner {
             }
         }
         Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
-        Properties props = cluster.getProperties();
-        props.setProperty("io.sort.mb", "1");
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
+        String oldValue = properties.getProperty("io.sort.mb");
+        properties.setProperty("io.sort.mb", "1");
+
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+        pigServer.getPigContext().getProperties().setProperty("mapred.child.java.opts", "-Xmx1024m");
         pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
         pigServer.registerQuery("b = group a all;");
         pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
@@ -198,6 +198,13 @@ public class TestCombiner {
 
         assertFalse(it.hasNext());
         Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
+        // Reset io.sort.mb to the original value before exit
+        if (oldValue == null) {
+            properties.remove("io.sort.mb");
+        } else {
+            properties.setProperty("io.sort.mb", oldValue);
+        }
+        pigServer.shutdown();
     }
 
     @Test
@@ -213,7 +220,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -237,14 +244,14 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 assertEquals(expected[i++], field);
             }
         }
         Util.deleteFile(cluster, "distinctAggs1Input.txt");
-
+        pigServer.shutdown();
     }
 
     @Test
@@ -260,7 +267,7 @@ public class TestCombiner {
         };
 
         Util.createInputFile(cluster, "testGroupElements.txt", input);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
         pigServer.registerQuery("b = group a by (str, num1);");
 
@@ -305,7 +312,7 @@ public class TestCombiner {
         Util.checkQueryOutputsAfterSort(it, expectedRes);
 
         Util.deleteFile(cluster, "distinctAggs1Input.txt");
-
+        pigServer.shutdown();
     }
 
     @Test
@@ -321,7 +328,7 @@ public class TestCombiner {
         };
 
         Util.createInputFile(cluster, "testGroupLimit.txt", input);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         pigServer.registerQuery("a = load 'testGroupLimit.txt'  using PigStorage(' ') " +
                 "as (str:chararray, num1:int) ;");
         pigServer.registerQuery("b = group a by str;");
@@ -341,7 +348,7 @@ public class TestCombiner {
 
         Iterator<Tuple> it = pigServer.openIterator("d");
         Util.checkQueryOutputsAfterSort(it, expectedRes);
-
+        pigServer.shutdown();
     }
 
     private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected)
@@ -370,7 +377,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -391,7 +398,7 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 if (i == 1) {
@@ -403,7 +410,7 @@ public class TestCombiner {
             }
         }
         Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
-
+        pigServer.shutdown();
     }
 
     @Test
@@ -421,7 +428,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
         pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -442,7 +449,7 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 if (i == 1) {
@@ -454,7 +461,7 @@ public class TestCombiner {
             }
         }
         Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
-
+        pigServer.shutdown();
     }
 
     @Test
@@ -480,7 +487,7 @@ public class TestCombiner {
         try {
             Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
 
-            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            PigServer pigServer = new PigServer(cluster.getExecType(), properties);
             pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
             pigServer.registerQuery("b = group a by name;");
             pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");
@@ -494,6 +501,7 @@ public class TestCombiner {
             Iterator<Tuple> it = pigServer.openIterator("c");
             Util.checkQueryOutputsAfterSortRecursive(it, expected,
                     "group:chararray,age:long,b:{t:(name:chararray,age:int,gpa:double)}");
+            pigServer.shutdown();
         } finally {
             Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
         }
@@ -501,6 +509,7 @@ public class TestCombiner {
 
     public static class JiraPig1030 extends EvalFunc<DataBag> {
 
+        @Override
         public DataBag exec(Tuple input) throws IOException {
             return new DefaultDataBag();
         }
@@ -524,7 +533,7 @@ public class TestCombiner {
 
         try {
             Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
-            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            PigServer pigServer = new PigServer(cluster.getExecType(), properties);
             pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
             pigServer.registerQuery("b = group a all;");
             pigServer.registerQuery("c = foreach b  {" +
@@ -536,6 +545,7 @@ public class TestCombiner {
             PrintStream ps = new PrintStream(baos);
             pigServer.explain("c", ps);
             assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+            pigServer.shutdown();
         } finally {
             Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
         }

Modified: pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCompressedFiles.java Fri May 30 19:07:23 2014
@@ -24,10 +24,10 @@ import static org.junit.Assert.assertTru
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.Random;
 import java.util.zip.GZIPOutputStream;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.DIFF;
 import org.apache.pig.data.BagFactory;
@@ -37,13 +37,16 @@ import org.apache.pig.test.utils.TestHel
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCompressedFiles {
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    private static PigServer pig;
+    private static Properties properties;
+    private static MiniGenericCluster cluster;
 
-    File datFile;
-    File gzFile;
+    private File datFile;
+    private File gzFile;
 
     @Before
     public void setUp() throws Exception {
@@ -74,6 +77,12 @@ public class TestCompressedFiles {
         gzFile.delete();
     }
 
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+    }
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
@@ -81,7 +90,7 @@ public class TestCompressedFiles {
 
     @Test
     public void testCompressed1() throws Throwable {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("A = foreach (cogroup (load '"
                 + Util.generateURI(gzFile.toString(), pig.getPigContext())
                 + "') by $1, (load '"
@@ -94,7 +103,7 @@ public class TestCompressedFiles {
 
     @Test
     public void testCompressed2() throws Throwable {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("A = load '"
                 + Util.generateURI(gzFile.toString(), pig.getPigContext())
                 + "';");