You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/05/08 22:50:06 UTC

svn commit: r1593413 [2/2] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ test/ test/org/apache/pig/test/

Modified: pig/branches/tez/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestStreaming.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestStreaming.java Thu May  8 20:50:06 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.test;
 
+import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -24,7 +26,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.PigToStream;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -41,33 +42,31 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;
-
 public class TestStreaming {
 
-    private static final MiniCluster cluster = MiniCluster.buildCluster();
-    
+    private static final MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
     private PigServer pigServer;
-    
+
     @Before
     public void setup() throws ExecException {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
     }
-    
+
     @After
     public void tearDown() {
         pigServer = null;
     }
-    
+
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     private TupleFactory tf = TupleFactory.getInstance();
 
 	private static final String simpleEchoStreamingCommand;
-    
+
 	static {
         String quote = "'";
         if (Util.WINDOWS) {
@@ -79,20 +78,20 @@ public class TestStreaming {
 
     private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException {
 		Assert.assertEquals(firstField.length, secondField.length);
-		
+
 		Tuple[] expectedResults = new Tuple[firstField.length];
 		for (int i=0; i < expectedResults.length; ++i) {
 			expectedResults[i] = tf.newTuple(2);
 			expectedResults[i].set(0, firstField[i]);
 			expectedResults[i].set(1, secondField[i]);
 		}
-		
+
 		return expectedResults;
 	}
-	
+
 	@Test
 	public void testSimpleMapSideStreaming() throws Exception {
-		File input = Util.createInputFile("tmp", "", 
+		File input = Util.createInputFile("tmp", "",
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
 				                                        "A,5", "B,5", "C,8", "A,8",
 				                                        "D,8", "A,9"});
@@ -104,14 +103,14 @@ public class TestStreaming {
         for (int i = 0; i < withTypes.length; i++) {
     		Tuple[] expectedResults = null;
             if(withTypes[i] == true) {
-                expectedResults = 
+                expectedResults =
                     setupExpectedResults(expectedFirstFields, expectedSecondFields);
             } else {
-                expectedResults = 
-                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                expectedResults =
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                          Util.toDataByteArrays(expectedSecondFields));
             }
-    
+
     		// Pig query to run
             pigServer.registerQuery("IP = load '"
                     + Util.generateURI(Util.encodeEscape(input.toString()),
@@ -127,16 +126,16 @@ public class TestStreaming {
                 pigServer.registerQuery("OP = stream S1 through `" +
     				                simpleEchoStreamingCommand + "`;");
             }
-    		
+
     		// Run the query and check the results
     		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
 	}
-    
+
 	@Test
-	public void testSimpleMapSideStreamingWithOutputSchema() 
+	public void testSimpleMapSideStreamingWithOutputSchema()
 	throws Exception {
-		File input = Util.createInputFile("tmp", "", 
+		File input = Util.createInputFile("tmp", "",
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
 				                                        "A,5", "B,5", "C,8", "A,8",
 				                                        "D,8", "A,9"});
@@ -144,16 +143,16 @@ public class TestStreaming {
 		// Expected results
 		Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
 		Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};
-		
+
 		boolean[] withTypes = {true, false};
 		for (int i = 0; i < withTypes.length; i++) {
 		    Tuple[] expectedResults = null;
 		    if(withTypes[i] == true) {
-		        expectedResults = 
+		        expectedResults =
 	                setupExpectedResults(expectedFirstFields, expectedSecondFields);
 		    } else {
-		        expectedResults = 
-                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+		        expectedResults =
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                          Util.toDataByteArrays(expectedSecondFields));
 		    }
 	        // Pig query to run
@@ -170,16 +169,16 @@ public class TestStreaming {
 	                                simpleEchoStreamingCommand + "` as (f0, f1);");
 	        }
 	        pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");
-	        
+
 	        // Run the query and check the results
 	        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
 	}
 
 	@Test
-	public void testSimpleReduceSideStreamingAfterFlatten() 
+	public void testSimpleReduceSideStreamingAfterFlatten()
 	throws Exception {
-		File input = Util.createInputFile("tmp", "", 
+		File input = Util.createInputFile("tmp", "",
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
 				                                        "A,5", "B,5", "C,8", "A,8",
 				                                        "D,8", "A,9"});
@@ -191,11 +190,11 @@ public class TestStreaming {
         for (int i = 0; i < withTypes.length; i++) {
             Tuple[] expectedResults = null;
             if(withTypes[i] == true) {
-                expectedResults = 
+                expectedResults =
                     setupExpectedResults(expectedFirstFields, expectedSecondFields);
             } else {
-                expectedResults = 
-                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                expectedResults =
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                          Util.toDataByteArrays(expectedSecondFields));
             }
 
@@ -217,7 +216,7 @@ public class TestStreaming {
                 pigServer.registerQuery("OP = stream S1 through `" +
                                     simpleEchoStreamingCommand + "`;");
             }
-    		
+
     		// Run the query and check the results
     		Util.checkQueryOutputsAfterSort(pigServer.openIterator("OP"), expectedResults);
         }
@@ -225,7 +224,7 @@ public class TestStreaming {
 
     @Test
 	public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
-		File input = Util.createInputFile("tmp", "", 
+		File input = Util.createInputFile("tmp", "",
 				                          new String[] {"A,1,2,3", "B,2,4,5",
 				                                        "C,3,1,2", "D,2,5,2",
 				                                        "A,5,5,1", "B,5,7,4",
@@ -234,7 +233,7 @@ public class TestStreaming {
 		                                 );
 
 		// Expected results
-		String[] expectedFirstFields = 
+		String[] expectedFirstFields =
 			new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
 		Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
 		Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
@@ -268,7 +267,7 @@ public class TestStreaming {
                                 simpleEchoStreamingCommand + "`;");
 		pigServer.registerQuery("OP = stream S3 through `" +
 				                simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);");
-		
+
 		// Run the query and check the results
 		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
 	}
@@ -276,14 +275,14 @@ public class TestStreaming {
     @Test
     public void testInputShipSpecs() throws Exception {
 
-        File input = Util.createInputFile("tmp", "", 
-                                          new String[] {"A,1", "B,2", "C,3", 
-                                                        "D,2", "A,5", "B,5", 
-                                                        "C,8", "A,8", "D,8", 
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1", "B,2", "C,3",
+                                                        "D,2", "A,5", "B,5",
+                                                        "C,8", "A,8", "D,8",
                                                         "A,9"});
 
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -295,29 +294,29 @@ public class TestStreaming {
                          };
         File command1 = Util.createInputFile("script", "pl", script);
         File command2 = Util.createInputFile("script", "pl", script);
-        
+
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults =
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
-        
+
         pigServer.registerQuery(
                 "define CMD1 `perl " + command1.getName() + " foo` " +
                 "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
                 "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                 "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
-                "stderr();"); 
+                "stderr();");
         pigServer.registerQuery(
                 "define CMD2 `perl " + command2.getName() + " bar` " +
                 "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
                 "input('bar' using " + PigStreaming.class.getName() + "(',')) " +
-                "output(stdout using " + PigStreaming.class.getName() + "(',')) " +        
-                "stderr();"); 
+                "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
+                "stderr();");
         pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
                         pigServer.getPigContext())
@@ -326,17 +325,17 @@ public class TestStreaming {
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
         		                "through CMD1;");
         pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
-        
+
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());            
+            outputs.add(iter.next());
         }
 
         // Run the query and check the results
@@ -345,14 +344,14 @@ public class TestStreaming {
 
     @Test
     public void testInputShipSpecsWithUDFDefine() throws Exception {
-        File input = Util.createInputFile("tmp", "", 
-                                          new String[] {"A,1", "B,2", "C,3", 
-                                                        "D,2", "A,5", "B,5", 
-                                                        "C,8", "A,8", "D,8", 
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1", "B,2", "C,3",
+                                                        "D,2", "A,5", "B,5",
+                                                        "C,8", "A,8", "D,8",
                                                         "A,9"});
 
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -364,13 +363,13 @@ public class TestStreaming {
                          };
         File command1 = Util.createInputFile("script", "pl", script);
         File command2 = Util.createInputFile("script", "pl", script);
-        
+
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
         Tuple[] expectedResults =
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
@@ -381,31 +380,31 @@ public class TestStreaming {
                 "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
                 "input('foo' using PS )" +
                 "output(stdout using PS ) " +
-                "stderr();"); 
+                "stderr();");
         pigServer.registerQuery(
                 "define CMD2 `perl " + command2.getName() + " bar` " +
                 "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
                 "input('bar' using PS ) " +
-                "output(stdout using PS ) " +        
-                "stderr();"); 
-        pigServer.registerQuery("IP = load '" 
+                "output(stdout using PS ) " +
+                "stderr();");
+        pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
                         pigServer.getPigContext()) + "' using PigStorage(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
         		                "through CMD1;");
         pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
-        
+
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());    
+            outputs.add(iter.next());
         }
 
         // Run the query and check the results
@@ -413,15 +412,15 @@ public class TestStreaming {
     }
 
     @Test
-    public void testInputCacheSpecs() throws Exception {        
-        File input = Util.createInputFile("tmp", "", 
-                                          new String[] {"A,1", "B,2", "C,3", 
-                                                        "D,2", "A,5", "B,5", 
-                                                        "C,8", "A,8", "D,8", 
+    public void testInputCacheSpecs() throws Exception {
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1", "B,2", "C,3",
+                                                        "D,2", "A,5", "B,5",
+                                                        "C,8", "A,8", "D,8",
                                                         "A,9"});
 
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -434,17 +433,17 @@ public class TestStreaming {
         // Copy the scripts to HDFS
         File command1 = Util.createInputFile("script", "pl", script);
         File command2 = Util.createInputFile("script", "pl", script);
-        String c1 = FileLocalizer.hadoopify(command1.toString(), 
+        String c1 = FileLocalizer.hadoopify(command1.toString(),
                                             pigServer.getPigContext());
-        String c2 = FileLocalizer.hadoopify(command2.toString(), 
+        String c2 = FileLocalizer.hadoopify(command2.toString(),
                                             pigServer.getPigContext());
-        
+
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
-        Tuple[] expectedResults = 
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
@@ -452,12 +451,12 @@ public class TestStreaming {
                 "define CMD1 `perl script1.pl foo` " +
                 "cache ('" + c1 + "#script1.pl') " +
                 "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
-                "stderr();"); 
+                "stderr();");
         pigServer.registerQuery(
                 "define CMD2 `perl script2.pl bar` " +
                 "cache ('" + c2 + "#script2.pl') " +
                 "input('bar' using " + PigStreaming.class.getName() + "(',')) " +
-                "stderr();"); 
+                "stderr();");
         pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
                         pigServer.getPigContext()) + "' using "
@@ -465,34 +464,34 @@ public class TestStreaming {
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
         pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                                 "through CMD1;");
-        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");                
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
 
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());            
+            outputs.add(iter.next());
         }
-        
+
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
 
     @Test
 	public void testOutputShipSpecs() throws Exception {
-	    File input = Util.createInputFile("tmp", "", 
-	                                      new String[] {"A,1", "B,2", "C,3", 
-	                                                    "D,2", "A,5", "B,5", 
-	                                                    "C,8", "A,8", "D,8", 
+	    File input = Util.createInputFile("tmp", "",
+	                                      new String[] {"A,1", "B,2", "C,3",
+	                                                    "D,2", "A,5", "B,5",
+	                                                    "C,8", "A,8", "D,8",
 	                                                    "A,9"});
 
-	    // Perl script 
-	    String[] script = 
+	    // Perl script
+	    String[] script =
 	        new String[] {
 	                      "#!/usr/bin/perl",
                           "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
@@ -506,11 +505,11 @@ public class TestStreaming {
 	    File command = Util.createInputFile("script", "pl", script);
 
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "A", "A", "A", "A", "A"};
         Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
-        Tuple[] expectedResults = 
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
@@ -519,40 +518,40 @@ public class TestStreaming {
                 "ship ('" + Util.encodeEscape(command.toString()) + "') " +
         		"output('foo' using " + PigStreaming.class.getName() + "(','), " +
         		"'bar' using " + PigStreaming.class.getName() + "(',')) " +
-        		"stderr();"); 
-        pigServer.registerQuery("IP = load '" 
+        		"stderr();");
+        pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
-                        pigServer.getPigContext()) + "' using " 
+                        pigServer.getPigContext()) + "' using "
                 + PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
-        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");                
-        
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());         
+            outputs.add(iter.next());
         }
-        
+
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
 
     @Test
 	public void testOutputShipSpecsWithUDFDefine() throws Exception {
-	    File input = Util.createInputFile("tmp", "", 
-	                                      new String[] {"A,1", "B,2", "C,3", 
-	                                                    "D,2", "A,5", "B,5", 
-	                                                    "C,8", "A,8", "D,8", 
+	    File input = Util.createInputFile("tmp", "",
+	                                      new String[] {"A,1", "B,2", "C,3",
+	                                                    "D,2", "A,5", "B,5",
+	                                                    "C,8", "A,8", "D,8",
 	                                                    "A,9"});
 
-	    // Perl script 
-	    String[] script = 
+	    // Perl script
+	    String[] script =
 	        new String[] {
 	                      "#!/usr/bin/perl",
                           "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
@@ -566,11 +565,11 @@ public class TestStreaming {
 	    File command = Util.createInputFile("script", "pl", script);
 
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "A", "A", "A", "A", "A"};
         Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
-        Tuple[] expectedResults = 
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
 
         // Pig query to run
@@ -582,39 +581,39 @@ public class TestStreaming {
                 "ship ('" + Util.encodeEscape(command.toString()) + "') " +
         		"output('foo' using PS, " +
         		"'bar' using PS) " +
-        		"stderr();"); 
-        pigServer.registerQuery("IP = load '" 
+        		"stderr();");
+        pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
                         pigServer.getPigContext()) + "' using PigStorage(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
-        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");                
-        
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());          
+            outputs.add(iter.next());
         }
-        
+
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
     }
-    
+
     @Test
     public void testInputOutputSpecs() throws Exception {
-        File input = Util.createInputFile("tmp", "", 
-                                          new String[] {"A,1", "B,2", "C,3", 
-                                                        "D,2", "A,5", "B,5", 
-                                                        "C,8", "A,8", "D,8", 
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1", "B,2", "C,3",
+                                                        "D,2", "A,5", "B,5",
+                                                        "C,8", "A,8", "D,8",
                                                         "A,9"});
 
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -630,11 +629,11 @@ public class TestStreaming {
         File command = Util.createInputFile("script", "pl", script);
 
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
-        Tuple[] expectedResults = 
-                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                      Util.toDataByteArrays(expectedSecondFields));
         // Pig query to run
         pigServer.registerQuery(
@@ -643,79 +642,79 @@ public class TestStreaming {
                 "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                 "output('bar', " +
                 "'foobar' using " + PigStreaming.class.getName() + "(',')) " +
-                "stderr();"); 
-        pigServer.registerQuery("IP = load '" 
+                "stderr();");
+        pigServer.registerQuery("IP = load '"
                 + Util.generateURI(Util.encodeEscape(input.toString()),
-                        pigServer.getPigContext()) + "' using " 
+                        pigServer.getPigContext()) + "' using "
                 + PigStorage.class.getName() + "(',');");
         pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
-        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");                
-        
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
         String output = "/pig/out";
         pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
-        
+
         pigServer.registerQuery("A = load '" + output + "/foobar" + "' using PigStorage(',');");
         Iterator<Tuple> iter = pigServer.openIterator("A");
-        
+
         List<Tuple> outputs = new ArrayList<Tuple>();
         while (iter.hasNext()) {
-            outputs.add(iter.next());            
+            outputs.add(iter.next());
         }
-        
+
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-        
+
         // Cleanup
         pigServer.deleteFile(output);
     }
 
     @Test
-    public void testSimpleMapSideStreamingWithUnixPipes() 
+    public void testSimpleMapSideStreamingWithUnixPipes()
     throws Exception {
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                                           new String[] {"A,1", "B,2", "C,3", "D,2",
                                                         "A,5", "B,5", "C,8", "A,8",
                                                         "D,8", "A,9"});
 
         // Expected results
-        String[] expectedFirstFields = 
+        String[] expectedFirstFields =
             new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
         Integer[] expectedSecondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
         boolean[] withTypes = {true, false};
         for (int i = 0; i < withTypes.length; i++) {
             Tuple[] expectedResults = null;
             if(withTypes[i] == true) {
-                expectedResults = 
+                expectedResults =
                     setupExpectedResults(expectedFirstFields, expectedSecondFields);
             } else {
-                expectedResults = 
-                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                expectedResults =
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                          Util.toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
-            pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand + 
+            pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
                                     " | " + simpleEchoStreamingCommand + "`;");
-            pigServer.registerQuery("IP = load '" 
+            pigServer.registerQuery("IP = load '"
                     + Util.generateURI(Util.encodeEscape(input.toString()),
-                            pigServer.getPigContext()) + "' using " 
+                            pigServer.getPigContext()) + "' using "
                     + PigStorage.class.getName() + "(',');");
             if(withTypes[i] == true) {
                 pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);");
             } else {
-                pigServer.registerQuery("OP = stream IP through CMD;");                
+                pigServer.registerQuery("OP = stream IP through CMD;");
             }
-            
+
             // Run the query and check the results
             Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
     }
 
     @Test
-    public void testNegativeLoadStoreOptimization() 
+    public void testNegativeLoadStoreOptimization()
     throws Exception {
-        File input = Util.createInputFile("tmp", "", 
+        File input = Util.createInputFile("tmp", "",
                                           new String[] {"A,1", "B,2", "C,3", "D,2",
                                                         "A,5", "B,5", "C,8", "A,8",
                                                         "D,8", "A,9"});
@@ -727,20 +726,20 @@ public class TestStreaming {
         for (int i = 0; i < withTypes.length; i++) {
             Tuple[] expectedResults = null;
             if(withTypes[i] == true) {
-                expectedResults = 
+                expectedResults =
                     setupExpectedResults(expectedFirstFields, expectedSecondFields);
             } else {
-                expectedResults = 
-                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), 
+                expectedResults =
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                          Util.toDataByteArrays(expectedSecondFields));
             }
 
             // Pig query to run
-            pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + 
+            pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
                                     "` input(stdin using " + PigStreamDump.class.getName() + ");");
-            pigServer.registerQuery("IP = load '" 
+            pigServer.registerQuery("IP = load '"
                     + Util.generateURI(Util.encodeEscape(input.toString()),
-                            pigServer.getPigContext()) + "' using " 
+                            pigServer.getPigContext()) + "' using "
                     + PigStorage.class.getName() + "(',');");
             pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
             if(withTypes[i] == true) {
@@ -748,18 +747,18 @@ public class TestStreaming {
                                     simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
             } else {
                 pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
-                                    simpleEchoStreamingCommand + "`;");                
+                                    simpleEchoStreamingCommand + "`;");
             }
-            
+
             // Run the query and check the results
             Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
     }
-    
+
     @Test
     public void testNegativeMultipleInput() throws IOException {
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -770,14 +769,14 @@ public class TestStreaming {
                           "}",
                          };
         File command1 = Util.createInputFile("script", "pl", script);
-    	String query = 
+    	String query =
                 "define CMD1 `perl " + command1.getName() + " foo` " +
                 "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
                 "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                 "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
                 "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
-                "stderr();"; 
-    	
+                "stderr();";
+
     	try {
     		pigServer.registerQuery( query );
     	} catch(FrontendException ex) {
@@ -786,16 +785,16 @@ public class TestStreaming {
     		Assert.assertTrue( ex.getMessage().contains( expectedMsg ) );
     		return;
     	}
-    	
+
     	Assert.fail( "Testcase is supposed to fail." );
     }
-    
+
     @Test
     public void testStreamingStderrLogsShouldNotBePersistedByDefault() throws Exception {
 
         Util.createInputFile(cluster, "mydummyinput.txt", new String[] { "dummy"});
 
-        PigServer pig = new PigServer(ExecType.MAPREDUCE,cluster.getProperties());
+        PigServer pig = new PigServer(cluster.getExecType(),cluster.getProperties());
         pig.setBatchOn();
 
         pig.registerQuery("define mycmd `echo dummy` ;");
@@ -887,6 +886,6 @@ public class TestStreaming {
         public byte[] serialize(Tuple t) throws IOException {
             return (TupleFormat.format(t) + recordDelimiter).getBytes();
         }
-        
+
     }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestUDF.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestUDF.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestUDF.java Thu May  8 20:50:06 2014
@@ -55,11 +55,11 @@ public class TestUDF {
 
     static File TempScriptFile = null;
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @Before
     public void setUp() throws Exception {
-        FileLocalizer.setInitialized(false);
+        Util.resetStateForExecModeSwitch();
         TempScriptFile = File.createTempFile("temp_jira_851", ".pig");
         FileWriter writer = new FileWriter(TempScriptFile);
         for (String line : ScriptStatement) {
@@ -82,6 +82,7 @@ public class TestUDF {
         Iterator<Tuple> iterator = pig.openIterator("B");
         while (iterator.hasNext()) {
             Tuple tuple = iterator.next();
+            @SuppressWarnings("unchecked")
             Map<Object, Object> result = (Map<Object, Object>) tuple.get(0);
             assertEquals(result, MyUDFReturnMap.map);
         }
@@ -92,7 +93,7 @@ public class TestUDF {
         Util.createInputFile(cluster, "a.txt", new String[] { "dummy",
                 "dummy" });
         FileLocalizer.deleteTempFiles();
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+        PigServer pig = new PigServer(cluster.getExecType(), cluster
                 .getProperties());
         pig.registerQuery("A = LOAD 'a.txt';");
         pig.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.utils.MyUDFReturnMap();");
@@ -100,6 +101,7 @@ public class TestUDF {
         Iterator<Tuple> iterator = pig.openIterator("B");
         while (iterator.hasNext()) {
             Tuple tuple = iterator.next();
+            @SuppressWarnings("unchecked")
             Map<Object, Object> result = (Map<Object, Object>) tuple.get(0);
             assertEquals(result, MyUDFReturnMap.map);
         }
@@ -107,7 +109,7 @@ public class TestUDF {
 
     @Test
     public void testUDFMultiLevelOutputSchema() throws Exception {
-        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
         pig.registerQuery("A = LOAD 'a.txt';");
         pig.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.utils.MultiLevelDerivedUDF1();");
         pig.registerQuery("C = FOREACH A GENERATE org.apache.pig.test.utils.MultiLevelDerivedUDF2();");
@@ -211,7 +213,7 @@ public class TestUDF {
         pig.dumpSchema("c");
         pig.dumpSchema("d");
     }
-    
+
     @Test
     public void testEvalFuncGetVarArgToFunc() throws Exception {
         String input = "udf_test_jira_3444.txt";
@@ -254,6 +256,7 @@ public class TestUDF {
             return schemaString;
         }
 
+        @Override
         public Schema outputSchema(Schema input) {
             return schema;
         }
@@ -287,7 +290,7 @@ public class TestUDF {
             return l;
         }
     }
-    
+
     public static class UdfWithFuncSpecWithVarArgs extends EvalFunc<Integer> {
         public UdfWithFuncSpecWithVarArgs() {}
 
@@ -302,7 +305,7 @@ public class TestUDF {
             }
             return res;
         }
-        
+
         @Override
         public SchemaType getSchemaType() {
             return SchemaType.VARARG;

Modified: pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestUDFContext.java Thu May  8 20:50:06 2014
@@ -17,7 +17,9 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.util.Iterator;
@@ -29,24 +31,10 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.UDFContext;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestUDFContext {
-    
-    static MiniCluster cluster = null;
-    
-    @Before
-    public void setUp() throws Exception {
-        cluster = MiniCluster.buildCluster();
-    }
 
-    @AfterClass
-    public static void oneTimeTearDown() throws Exception {
-        cluster.shutDown();
-    }
-    
     @Test
     public void testUDFContext() throws Exception {
         File a = Util.createLocalInputFile("a.txt", new String[] { "dumb" });
@@ -67,7 +55,7 @@ public class TestUDFContext {
             writer.write(line + "\n");
         }
         writer.close();
-        
+
         pig.registerScript(tmpFile.getAbsolutePath());
         Iterator<Tuple> iterator = pig.openIterator("D");
         while (iterator.hasNext()) {
@@ -81,26 +69,26 @@ public class TestUDFContext {
         	assertEquals(tuple.get(3).toString(), "five");
         }
     }
-    
-    
+
+
     /**
-     * Test that UDFContext is reset each time the plan is regenerated 
+     * Test that UDFContext is reset each time the plan is regenerated
      * @throws Exception
      */
     @Test
     public void testUDFContextReset() throws Exception {
         PigServer pig = new PigServer(ExecType.LOCAL);
         pig.registerQuery(" l = load 'file' as (a :int, b : int, c : int);");
-        pig.registerQuery(" f = foreach l generate a, b;");        
+        pig.registerQuery(" f = foreach l generate a, b;");
         pig.explain("f", System.out);
         Properties props = UDFContext.getUDFContext().getUDFProperties(PigStorage.class);
 
         // required fields property should be set because f results does not
         // require the third column c, so properties should not be null
         assertTrue(
-                "properties in udf context for load should not be empty: "+props, 
+                "properties in udf context for load should not be empty: "+props,
                 props.keySet().size()>0);
-        
+
         // the new statement for alias f below will require all columns,
         // so this time required fields property for loader should not be set
         pig.registerQuery(" f = foreach l generate a, b, c;");
@@ -108,10 +96,10 @@ public class TestUDFContext {
         props = UDFContext.getUDFContext().getUDFProperties(PigStorage.class);
 
         assertTrue(
-                "properties in udf context for load should be empty: "+props, 
+                "properties in udf context for load should be empty: "+props,
                 props.keySet().size() == 0);
 
-        
+
     }
-    
+
 }

Modified: pig/branches/tez/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/Util.java?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/Util.java Thu May  8 20:50:06 2014
@@ -45,8 +45,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -105,6 +103,8 @@ import org.apache.pig.newplan.logical.vi
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.QueryParserDriver;
 import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.junit.Assert;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -486,10 +486,8 @@ public class Util {
     */
     static public void checkQueryOutputs(Iterator<Tuple> actualResults,
                                     Tuple[] expectedResults) {
-        for (Tuple expected : expectedResults) {
-            Tuple actual = actualResults.next();
-            Assert.assertEquals(expected.toString(), actual.toString());
-        }
+        checkQueryOutputs(actualResults, Arrays.asList(expectedResults));
+
     }
 
     /**
@@ -501,8 +499,13 @@ public class Util {
      */
      static public void checkQueryOutputs(Iterator<Tuple> actualResults,
                                      List<Tuple> expectedResults) {
-
-         checkQueryOutputs(actualResults,expectedResults.toArray(new Tuple[expectedResults.size()]));
+         int count = 0;
+         for (Tuple expected : expectedResults) {
+             Tuple actual = actualResults.next();
+             count++;
+             Assert.assertEquals(expected.toString(), actual.toString());
+         }
+         Assert.assertEquals(expectedResults.size(), count);
      }
 
     /**
@@ -1288,7 +1291,7 @@ public class Util {
     }
 
     /**
-     * 
+     *
      * @param expected
      *            Exception class that is expected to be thrown
      * @param found
@@ -1301,4 +1304,18 @@ public class Util {
         assertEquals(expected, found.getClass());
         assertEquals(found.getMessage(), message);
     }
+
+    /**
+     * Called to reset ThreadLocal or static states that PigServer depends on
+     * when a test suite has testcases switching between LOCAL and MAPREDUCE/TEZ
+     * execution modes
+     */
+    public static void resetStateForExecModeSwitch() {
+        FileLocalizer.setInitialized(false);
+        // TODO: once we have Tez local mode, we can get rid of this. For now,
+        // if we run this test suite in Tez mode and there are some tests
+        // in LOCAL mode, we need to set ScriptState to
+        // null to force ScriptState to be re-initialized every time.
+        ScriptState.start(null);
+    }
 }

Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1593413&r1=1593412&r2=1593413&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Thu May  8 20:50:06 2014
@@ -9,13 +9,28 @@
 **/TestCompressedFiles.java
 **/TestCustomPartitioner.java
 **/TestEvalPipeline.java
+**/TestFilterUDF.java
+**/TestFinish.java
+**/TestForEachNestedPlan.java
+**/TestLoad.java
+**/TestLocalRearrange.java
+**/TestMapReduce.java
+**/TestMapReduce2.java
 **/TestNestedForeach.java
 **/TestPigContext.java
 **/TestPigServer.java
 **/TestPigStorage.java
-**/TestSecondarySortTez.java
+**/TestSample.java
+**/TestSchema.java
+**/TestScriptLanguageJavaScript.java
+**/TestScriptUDF.java
 **/TestSkewedJoin.java
 **/TestSplitStore.java
+**/TestStoreOld.java
+**/TestStreaming.java
+**/TestUDF.java
+**/TestUDFContext.java
+**/TestSecondarySortTez.java
 **/TestTezCompiler.java
 **/TestTezJobControlCompiler.java
-**/TestTezLauncher.java
+**/TestTezLauncher.java
\ No newline at end of file