Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [14/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/e2e/pig/tests/turing_jython.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/turing_jython.conf?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/turing_jython.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/turing_jython.conf Fri Feb 24 08:19:42 2017
@@ -50,7 +50,7 @@ d = filter b by age < 50;
 e = cogroup c by (name, age), d by (name, age) ;
 f = foreach e generate flatten(c), flatten(d);
 g = group f by registration;
-h = foreach g generate group, SUM(f.d::contributions);
+h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0;
 i = order h by $1;
 store i into '$out';
 """).bind({'in1':input1,'in2':input2, 'out':output}).runSingle() 
@@ -68,7 +68,7 @@ else:
                                          e = cogroup c by (name, age), d by (name, age) ;
                                          f = foreach e generate flatten(c), flatten(d);
                                          g = group f by registration;
-                                         h = foreach g generate group, SUM(f.d::contributions);
+                                         h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0;
                                          i = order h by $1;
                                          store i into ':OUTPATH:';
 \,
@@ -92,38 +92,12 @@ hdfs = FileSystem.get(config)
                 }
                 ]
 
-                },
-                {
-                'name' => 'Jython_Embedded',
-                'tests' => [
-                        {
-                        'num' => 1, 
-			,'pig' => q\#!/usr/bin/python
-# JYTHON COMMENT
-from org.apache.pig.scripting import Pig
-
-P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); 
-store A into ':OUTPATH:';""")
-
-Q = P.bind()
-
-result = Q.runSingle()
-
-if result.isSuccessful():
-	print "Pig job PASSED"
-
-else:
-	raise "Pig job FAILED"    
-\,
-                        'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); 
-                                                 store A into ':OUTPATH:';\,
-                        }
-                        ]
 		}, {
 
 			'name' => 'Jython_CompileBindRun'
 		       ,'tests' => [
-				{ # bind() with no parameters, runSingle
+				{
+                                # bind no parameters, runSingle
 				'num' => 1
 				,'pig' => q\#!/usr/bin/python
 # JYTHON COMMENT
@@ -150,7 +124,7 @@ else:
 				,'delimiter' => '	'
 
 				},{
-#  	9.2 	1 	bind single input parameter and no output parameters 	 
+                        # bind single input parameter
 			'num' => 2
 			,'pig' => q\#!/usr/bin/python
 
@@ -179,7 +153,7 @@ else:
 
 #                       ,'expected_out_regex' => "Pig job PASSED"
 			},{
-#  		bind parallel execution with a multiple entries
+                        # bind parallel execution with multiple entries
 			'num' => 3
 			,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -231,9 +205,7 @@ for i in [0, 1, 2]:
 \,
 
 			},{
-#  8.6 	compile pig script file with no input and no output parameters
-#12.2 	import python modules 
-#
+                # compile pig script file with no parameters
 		'num' => 4
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -243,6 +215,7 @@ pig_script = ":TMP:/script.pig"
 pigfile = open( pig_script, 'w+')
 pigfile.write("""
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+-- a comment
 store A into ':OUTPATH:';
 """)
 pigfile.close()
@@ -263,7 +236,7 @@ else:
                         ,'floatpostprocess' => 1
                         ,'delimiter' => '	'
 		},{
-#  8.7 	compile pig script file with no input and with output parameters
+                # compile pig script file with parameters
 		'num' => 5
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -300,7 +273,7 @@ else:
                         ,'delimiter' => '	'
 
 		},{
-            # 11.15   1  	 results.getResults(alias) for null results
+                    # results.getResults(alias) for null results
 		    'num' => 6
 		    ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -318,7 +291,7 @@ result = P.bind().runSingle()
                                       store EMPTY into ':OUTPATH:';\
 		},
         {
-            # bind reading from python context
+                    # bind parameters from python context
 		    'num' => 7
 		    ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -340,7 +313,7 @@ result = P.bind().runSingle()
                                       store B into ':OUTPATH:';\
 
 		},{
-            # bind multiple times
+                    # bind multiple times
 		    'num' => 8
 		    ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -367,56 +340,8 @@ for i in [1,2,3]:
                                       B= foreach A generate age + 3;
                                       store B into ':OUTPATH:.3';\,
 
-		},
-        {
-            # invoke .run() on a non-parallel pig script
-		    'num' => 9
-		    ,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-P = Pig.compile("""
-A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-store A into ':OUTPATH:';
-""")
-result = P.bind().run()
-\,
-             'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-                                      store A into ':OUTPATH:';\,
- 	    },
-        {
-#  8.6 	compile pig script file with no input and no output parameters
-#12.2 	import python modules 
-#
-		'num' => 10
-		,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-#create pig script
-pig_script = ":TMP:/script.pig"
-pigfile = open( pig_script, 'w+')
-pigfile.write("""
-A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
--- a comment
-store A into ':OUTPATH:';
-""")
-pigfile.close()
-
-#execute pig script
-
-result = Pig.compileFromFile( pig_script ).bind().runSingle()
-
-if result.isSuccessful():
-    print "Pig job PASSED"
-else:
-    raise "Pig job FAILED"    
-\,
-
-             'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-                                      store A into ':OUTPATH:';
-\
-                        ,'floatpostprocess' => 1
-                        ,'delimiter' => '	'
 	},{
+                # python script with parameters
                 'num' => 11
                 ,'pig_params' => ['-p', qq(loadfile='studenttab10k')],
                 ,'pig' => q\#!/usr/bin/python
@@ -441,6 +366,7 @@ else:
                         ,'floatpostprocess' => 1
                         ,'delimiter' => '       '
         },{
+                # python script with parameter file
                 'num' => 12
                 ,'pig_params' => ['-m', ":PARAMPATH:/params_3"],
                 ,'pig' => q\#!/usr/bin/python
@@ -465,6 +391,7 @@ else:
                         ,'floatpostprocess' => 1
                         ,'delimiter' => '       '
 	},{
+                # python script with command line arguments
                 'num' => 13
                 ,'additional_cmd_args' => ['studenttab10k']
                 ,'pig' => q\#!/usr/bin/python
@@ -495,7 +422,7 @@ else:
 	'name' => 'Jython_Diagnostics'
        ,'tests' => [
                 {
-#  11.23 	1 	explain() on a complex query 	 
+                # explain() on a complex query
 		'num' => 1
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -525,7 +452,7 @@ result = P.bind({'in1':input1, 'in2':inp
                        ,'rc'=> 0
 
 	}, {
-#11.22 	1 	illustrate() on a complex query 	 
+                # illustrate() on a complex query 	 
 		'num' => 2
 		,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez
 		,'pig' => q\#!/usr/bin/python
@@ -555,7 +482,7 @@ result = P.bind({'in1':input1, 'in2':inp
                        ,'rc'=> 0
                        ,'expected_out_regex' => "A.*name:bytearray.*age:bytearray.*gpa:bytearray"
 	}, {
-#  11.24 	1 	describe() on an alias 	 
+                # describe() on an alias 	 
 		'num' => 3
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -583,7 +510,7 @@ result = P.bind({'in1':input1, 'in2':inp
                        ,'rc'=> 0
                        ,'expected_out_regex' => "A:.*{name:.*bytearray,age:.*bytearray,gpa:.*bytearray}"
 	}, {
-#11.29 	1 	describe() on an undefined alias 
+                # describe() on an undefined alias 
 		'num' => 4
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -613,7 +540,7 @@ result = P.bind({'in1':input1, 'in2':inp
 
 
 	}, {
-#  11.27 	1 	illustrate(alias) 	 
+                # illustrate(alias) 	 
 		'num' => 5
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -643,7 +570,7 @@ result = P.bind({'in1':input1, 'in2':inp
                        ,'expected_err_regex' => "ERROR 1121"
 
 	}, {
-#  11.28 	1 	explain(alias) 
+                # explain(alias) 
 		'num' => 6
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -710,14 +637,10 @@ Pig.fs("-copyFromLocal :TMP:/iterator_ou
 		},
 	]
 	}, {
-# 12.2 import python modules 	 
-# 12.1 python comments 	 
-# 12.6 fs lists a file 	 
-
-
 	'name' => 'Jython_Misc'
        ,'tests' => [
                 {
+                # fs commands: lists a file 	 
 		'num' => 1
 		,'pig' => q\#!/usr/bin/python
 # JYTHON COMMENT
@@ -778,8 +701,8 @@ P.bind().runSingle()
                 'name' => 'Jython_Properties',
                 'tests' => [
                         {
+                        # check if property is passed to Pig
                         'num' => 1 
-                        ,'ignore' => 1 # This is a good test except that we can't verify it.
 		,'pig' => q\#!/usr/bin/python
 # JYTHON COMMENT
 from org.apache.pig.scripting import Pig
@@ -791,7 +714,7 @@ store A into ':OUTPATH:';""")
 Q = P.bind()
 
 prop = Properties()
-prop.put("mapred.job.name", "friendship")
+prop.put("pig.default.load.func", "wrong")
 result = Q.runSingle(prop)
 
 if result.isSuccessful():
@@ -799,10 +722,8 @@ if result.isSuccessful():
 else:
 	raise "Pig job FAILED"    
 \
-
-                        ,'sql' => "select name, age, gpa+0.00 from studenttab10k;" 
-                        ,'floatpostprocess' => 1
-                        ,'delimiter' => '	'
+                       ,'rc'=> 6
+                       ,'expected_err_regex' => "ERROR 1070: Could not resolve wrong using imports"
 
 	}
 	]
@@ -811,7 +732,7 @@ else:
                 'name' => 'Jython_Error',
                 'tests' => [
 	                {
-                        # run a script that returns single negative result 
+                                    # run a script that returns single negative result 
 			            'num' => 1
 			            ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
@@ -834,103 +755,18 @@ else:
 
 		                ,'rc' => 6
                         ,'expected_err_regex' => "ERROR 1121"
-			        },
-                    {
-                        # run a script that returns single negative result 
-			            'num' => 2
-			            ,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-input= ":INPATH:/singlefile/studenttab10k"
-output = ":OUTPATH:"
-
-P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""")
-
-Q = P.bind({'in':input, 'out':bad_output}) 
-
-result = Q.runSingle()
-
-if result.isSuccessful():
-    print "Pig job PASSED"
-
-else:
-    raise "Pig job FAILED"    
-\ 
-
-                        ,'rc' => 6
-                        ,'expected_err_regex' => "name 'bad_output' is not defined"
 			        },{
-                        # bind an undefined input parameter 	 
-		                'num' => 3
-		                ,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-input= ":INPATH:/singlefile/studenttab10k"
-output = ":OUTPATH:"
-
-P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""")
-
-Q = P.bind({'in':invalid_parameter, 'out':output}) 
-
-result = Q.runSingle()
-
-if result.isSuccessful():
-    print "Pig job PASSED"
-
-else:
-    raise "Pig job FAILED"    
-\
-
-                        ,'expected_err_regex' => "ERROR 1121"
-                       ,'rc'=> 6
-
-		            },
-                    {
-                        #  compileFromFile for pig script file that does not exist throws IOException 	 
+                                # compileFromFile for pig script file that does not exist throws IOException 	 
 		                'num' => 4
 		                ,'pig' => q\#!/usr/bin/python
+import os
 from org.apache.pig.scripting import Pig
 
 # intentionally don't create pig script
 
-pig_script = tmp_dir + "/script.pig"
-
-#execute pig script
-input1= ":INPATH:/singlefile/studenttab10k"
-input2= ":INPATH:/singlefile/votertab10k"
-output1= ":OUTPATH:.1"
-output2= ":OUTPATH:.2"
-
-result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run()
-
-if result.isSuccessful():
-    print "Pig job PASSED"
-
-else:
-    raise "Pig job FAILED"    
-\
-
-                       ,'expected_err_regex' => "ERROR 1121"
-                       ,'rc'=> 6
-		            },
-                    {
-                        #  compileFromFile for pig script file that does not have read permissions throws IOException 	
-		                'num' => 5
-		                ,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-#create pig script
-
 pig_script = ":TMP:/script.pig"
-pigfile = open( pig_script, 'w')
-#no read permissions and file is left open until afer compile statement
-pigfile.write("""
-A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-B = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
-store A into '$out1';
-store B into '$out2';
-""")
-pigfile.close()
+
+os.remove(pig_script)
 
 #execute pig script
 input1= ":INPATH:/singlefile/studenttab10k"
@@ -938,11 +774,9 @@ input2= ":INPATH:/singlefile/votertab10k
 output1= ":OUTPATH:.1"
 output2= ":OUTPATH:.2"
 
-result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run()
-
-pigfile.close()
+results = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run()
 
-if result.isSuccessful():
+if results[0].isSuccessful():
     print "Pig job PASSED"
 
 else:
@@ -977,13 +811,16 @@ else:
                        ,'expected_err_regex' => "ERROR 1121"
                     },
                     {
-                        # 11.10 iter.next for an alias that is undefined 
+                        # iter.next for an alias that is undefined 
 		                'num' => 7
 		                ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
 
 #create pig script
 
+out1= ":OUTPATH:.1"
+out2= ":OUTPATH:.2"
+
 P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 B= filter A by age < 50;
 store B into '$out1';
@@ -992,9 +829,10 @@ D = filter C by name matches '^fred*';
 store D into '$out2';
 """)
 
-result = P.bind().run()
+results = P.bind().run()
+iter = results[0].result("E").iterator()
 
-if result.isSuccessful():
+if results[0].isSuccessful():
     print "Pig job PASSED"
 
 else:
@@ -1010,30 +848,6 @@ else:
                 'tests' => [
                     {
                         # sql command
-		                'num' => 1
-                                ,'java_params' => ['-Dhcat.bin=:HCATBIN:']
-		                ,'pig' => q\#!/usr/bin/python
-from org.apache.pig.scripting import Pig
-
-#create pig script
-
-Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""")
-ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string,
-age int,
-gpa double)
-stored as textfile;
-""")
-
-if ret==0:
-    print "SQL command PASSED"
-
-else:
-    raise "SQL command FAILED"    
-\
-                       ,'rc' => 0
-                    },
-                    {
-                        # sql command
 		                'num' => 2
 		                ,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
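
The change above rounds SUM(f.d::contributions) to two decimal places via (float) ROUND(SUM(...) * 100) / 100.0, so the e2e output compares deterministically against the verify script instead of depending on float formatting. A minimal Jython sketch of the Pig.compile/bind/runSingle pattern these conf entries exercise (illustration only, not part of the commit; the input and output paths below are placeholders):

    #!/usr/bin/python
    # Sketch only: compile a Pig Latin script, bind parameters, run it once,
    # and round the aggregate so float output compares deterministically.
    from org.apache.pig.scripting import Pig

    P = Pig.compile("""
    a = load '$in' as (name, age, gpa);
    b = group a all;
    c = foreach b generate (float) ROUND(SUM(a.gpa) * 100) / 100.0;
    store c into '$out';
    """)

    result = P.bind({'in': '/tmp/studenttab10k', 'out': '/tmp/sum_out'}).runSingle()

    if result.isSuccessful():
        print "Pig job PASSED"
    else:
        raise "Pig job FAILED"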

Modified: pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl (original)
+++ pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl Fri Feb 24 08:19:42 2017
@@ -41,7 +41,6 @@ our @lastName = ("allen", "brown", "cars
 #	rankaacd: RANK BY a ASC , c DESC
 #	rankaaba: RANK BY a ASC , b ASC
 #	a,b,c:    values
-#	tail:     long value in order to create multiple mappers
 ############################################################################
 our @rankedTuples = (
 	"1,21,5,7,1,1,0,8,8","2,26,2,3,2,5,1,9,10","3,30,24,21,2,3,1,3,10","4,6,10,8,3,4,1,7,2",
@@ -501,22 +500,10 @@ sub getBulkCopyCmd(){
             my $randf = rand(10);
             printf HDFS "%d:%d:%d:%d:%d:%dL:%.2ff:%.2f\n", $tid, $i, $rand5, $rand100, $rand1000, $rand1000, $randf, $randf;
         }
-    }  elsif ($filetype eq "ranking") {
+    } elsif ($filetype eq "ranking") {
         for (my $i = 0; $i < $numRows; $i++) {
             my $tuple = $rankedTuples[int($i)];
-            printf HDFS "$tuple,";
-            for my $j ( 0 .. 1000000) {
-				printf HDFS "%d",$j;
-			}
-			printf HDFS "\n";
-        }
-    } elsif ($filetype eq "biggish") {
-        for (my $i = 1; $i < $numRows; $i++) {
-            printf HDFS "$i,$i,";
-            for my $j ( 0 .. 1000) {
-				printf HDFS "%d",$j;
-            }
-            printf HDFS "\n";
+            printf HDFS "$tuple\n";
         }
     } elsif ($filetype eq "utf8Student") {
         srand(3.14159 + $numRows);

Modified: pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Fri Feb 24 08:19:42 2017
@@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle
             // result, the number of StoreFunc instances is greater by 1 in
             // Hadoop-2.0.x.
             assertTrue("storer instanciation count increasing: " + Storer.count,
-                    Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
+                    Storer.count <= 5);
         }
     }
 

Modified: pig/branches/spark/test/org/apache/pig/TestMain.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestMain.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/TestMain.java (original)
+++ pig/branches/spark/test/org/apache/pig/TestMain.java Fri Feb 24 08:19:42 2017
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.BufferedWriter;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -35,6 +37,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.SourceLocation;
 import org.apache.pig.test.TestPigRunner.TestNotificationListener;
+import org.apache.pig.test.Util;
 import org.apache.pig.tools.parameters.ParameterSubstitutionException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
@@ -152,6 +155,35 @@ public class TestMain {
     }
 
 
+    @Test
+    public void testParseInputScript() throws Exception {
+        File input = Util.createInputFile("tmp", "",
+                new String[]{"{(1,1.0)}\ttestinputstring1",
+                        "{(2,2.0)}\ttestinputstring1",
+                        "{(3,3.0)}\ttestinputstring1",
+                        "{(4,4.0)}\ttestinputstring1"}
+        );
+        File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut");
+        File scriptFile = Util.createInputFile("pigScript", "",
+                new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);",
+                        "B = foreach A generate\n" +
+                                "    b,\n" +
+                                "    (bag{tuple(long)}) a.x as ax:{(x:long)};",
+                        "store B into '"+out.getAbsolutePath()+"';"}
+        );
+
+        Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null);
+        BufferedReader file = new BufferedReader(new FileReader(new File(out.getAbsolutePath()+"/part-m-00000")));
+        String line;
+        int count = 0;
+        while(( line = file.readLine()) != null) {
+            count++;
+        }
+        assertEquals(4,count);
+        Util.deleteDirectory(new File(out.getAbsolutePath()));
+        assertTrue(!new File(out.getAbsolutePath()).exists());
+    }
+
     public static class TestNotificationListener2 extends TestNotificationListener {
         protected boolean hadArgs = false;
         public TestNotificationListener2() {}

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java Fri Feb 24 08:19:42 2017
@@ -709,6 +709,19 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testGroupWithRepeatedSubRecords() throws Exception {
+      final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      testAvroStorage(true, basedir + "code/pig/group_test.pig",
+          ImmutableMap.of(
+              "INFILE",           input,
+              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
+              "OUTFILE",          createOutputName())
+        );
+      verifyResults(createOutputName(),check);
+    }
+
+    @Test
     public void testLoadDirectory() throws Exception {
       final String input = basedir + "data/avro/uncompressed/testdirectory";
       final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java Fri Feb 24 08:19:42 2017
@@ -195,7 +195,7 @@ public class TestOrcStorage {
         Reader reader = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT1)));
         assertEquals(reader.getNumberOfRows(), 2);
 
-        RecordReader rows = reader.rows(null);
+        RecordReader rows = reader.rows();
         Object row = rows.next(null);
         StructObjectInspector soi = (StructObjectInspector)reader.getObjectInspector();
         IntWritable intWritable = (IntWritable)soi.getStructFieldData(row,
@@ -291,7 +291,7 @@ public class TestOrcStorage {
         ObjectInspector oi = orcReader.getObjectInspector();
         StructObjectInspector soi = (StructObjectInspector) oi;
 
-        RecordReader reader = orcReader.rows(null);
+        RecordReader reader = orcReader.rows();
         Object row = null;
 
         while (reader.hasNext()) {
@@ -326,9 +326,9 @@ public class TestOrcStorage {
         Reader orcReaderActual = OrcFile.createReader(fs, orcFile);
         StructObjectInspector soiActual = (StructObjectInspector) orcReaderActual.getObjectInspector();
 
-        RecordReader readerExpected = orcReaderExpected.rows(null);
+        RecordReader readerExpected = orcReaderExpected.rows();
         Object expectedRow = null;
-        RecordReader readerActual = orcReaderActual.rows(null);
+        RecordReader readerActual = orcReaderActual.rows();
         Object actualRow = null;
 
         while (readerExpected.hasNext()) {

Added: pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig (added)
+++ pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig Fri Feb 24 08:19:42 2017
@@ -0,0 +1,5 @@
+in = LOAD '$INFILE' USING AvroStorage();
+grouped = GROUP in BY (value1.thing);
+flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int));
+RMF $OUTFILE;
+STORE flattened INTO '$OUTFILE' USING AvroStorage();
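
This script is driven by the testGroupWithRepeatedSubRecords case above, which substitutes its parameters through the testAvroStorage helper. A minimal Jython sketch of running the same parameterized script directly through the scripting API (illustration only, not part of the commit; the file paths are placeholders):

    #!/usr/bin/python
    # Sketch only: bind the script's $INFILE/$OUTFILE parameters and run it once.
    from org.apache.pig.scripting import Pig

    bound = Pig.compileFromFile("group_test.pig").bind(
        {'INFILE': '/tmp/recordWithRepeatedSubRecords.avro',
         'OUTFILE': '/tmp/group_test_out'})
    result = bound.runSingle()

    if result.isSuccessful():
        print "Pig job PASSED"
    else:
        raise "Pig job FAILED"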

Modified: pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java Fri Feb 24 08:19:42 2017
@@ -17,9 +17,9 @@
  */
 package org.apache.pig.data;
 
-import static junit.framework.Assert.assertEquals;
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -599,33 +599,33 @@ public class TestSchemaTuple {
         Data data = resetData(pigServer);
 
         data.set("foo1",
-            tuple(0),
-            tuple(1),
-            tuple(2),
-            tuple(3),
-            tuple(4),
-            tuple(5),
-            tuple(6),
-            tuple(7),
-            tuple(8),
-            tuple(9)
+            tuple(0, 0),
+            tuple(1, 1),
+            tuple(2, 2),
+            tuple(3, 3),
+            tuple(4, 4),
+            tuple(5, 5),
+            tuple(6, 6),
+            tuple(7, 7),
+            tuple(8, 8),
+            tuple(9, 9)
             );
 
         data.set("foo2",
-            tuple(0),
-            tuple(1),
-            tuple(2),
-            tuple(3),
-            tuple(4),
-            tuple(5),
-            tuple(6),
-            tuple(7),
-            tuple(8),
-            tuple(9)
+            tuple(0, 0),
+            tuple(1, 1),
+            tuple(2, 2),
+            tuple(3, 3),
+            tuple(4, 4),
+            tuple(5, 5),
+            tuple(6, 6),
+            tuple(7, 7),
+            tuple(8, 8),
+            tuple(9, 9)
             );
 
-        pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);");
-        pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);");
+        pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int, y:int);");
+        pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int, y:int);");
         if (preSort) {
             pigServer.registerQuery("A = ORDER A BY x ASC;");
             pigServer.registerQuery("B = ORDER B BY x ASC;");
@@ -638,20 +638,24 @@ public class TestSchemaTuple {
             if (!out.hasNext()) {
                 throw new Exception("Output should have had more elements! Failed on element: " + i);
             }
-            assertEquals(tuple(i, i), out.next());
+            assertEquals(tuple(i, i, i, i), out.next());
         }
         assertFalse(out.hasNext());
 
-        pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();");
+        pigServer.registerQuery("STORE D INTO 'bar1' USING mock.Storage();");
+        pigServer.registerQuery("E = JOIN A by (x, y),  B by (x, y) using '"+joinType+"';");
+        pigServer.registerQuery("F = ORDER E BY $0 ASC;");
+        pigServer.registerQuery("STORE F INTO 'bar2' USING mock.Storage();");
 
-        List<Tuple> tuples = data.get("bar");
+        List<Tuple> bar1 = data.get("bar1");
+        List<Tuple> bar2 = data.get("bar2");
 
-        if (tuples.size() != 10) {
-            throw new Exception("Output does not have enough elements! List: " + tuples);
-        }
+        assertEquals("Output does not have enough elements! List: " + bar1, 10, bar1.size());
+        assertEquals("Output does not have enough elements! List: " + bar2, 10, bar2.size());
 
         for (int i = 0; i < 10; i++) {
-            assertEquals(tuple(i, i), tuples.get(i));
+            assertEquals(tuple(i, i, i, i), bar1.get(i));
+            assertEquals(tuple(i, i, i, i), bar2.get(i));
         }
 
     }

Added: pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java (added)
+++ pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniGenericCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import static org.apache.pig.builtin.mock.Storage.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveUDTF {
+    private static PigServer pigServer = null;
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
+    @BeforeClass
+    public static void oneTimeSetup() throws ExecException {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testHiveUDTFOnBagInput() throws IOException {
+        Data data = resetData(pigServer);
+
+        Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c")));
+
+        data.set("TestHiveUDTF", tuple);
+
+        pigServer.registerQuery("define posexplode HiveUDTF('posexplode');");
+        pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});");
+        pigServer.registerQuery("B = foreach A generate posexplode(a0);");
+
+        Iterator<Tuple> result = pigServer.openIterator("B");
+        List<Tuple> out = Lists.newArrayList(result);
+
+        assertEquals(2, out.size());
+        assertTrue("Result doesn't contain the HiveUDTF output",
+                out.contains(tuple(bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c")))));
+        assertTrue("Result doesn't contain an empty bag",
+                out.contains(tuple(bag())));
+    }
+
+    @Test
+    public void testHiveUDTFOnBagInputWithTwoProjection() throws IOException {
+        Data data = resetData(pigServer);
+
+        Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c")));
+
+        data.set("TestHiveUDTF", tuple);
+
+        pigServer.registerQuery("define posexplode HiveUDTF('posexplode');");
+        pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});");
+        pigServer.registerQuery("B = foreach A generate a0, posexplode(a0);");
+
+        Iterator<Tuple> result = pigServer.openIterator("B");
+        List<Tuple> out = Lists.newArrayList(result);
+
+        assertEquals(2, out.size());
+        assertTrue("Result doesn't contain the HiveUDTF output",
+                out.contains(tuple(bag(tuple("a"), tuple("b"), tuple("c")), bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c")))));
+        assertTrue("Result doesn't contain an empty bag",
+                out.contains(tuple(null, bag())));
+    }
+
+    @Test
+    public void testHiveUDTFOnClose() throws IOException {
+        Data data = resetData(pigServer);
+
+        List<Tuple> tuples = Arrays.asList(tuple("a", 1), tuple("a", 2), tuple("a", 3));
+
+        data.set("TestHiveUDTF", tuples);
+
+        pigServer.registerQuery("define COUNT2 HiveUDTF('org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount2');");
+        pigServer.registerQuery("a = load 'TestHiveUDTF' USING mock.Storage() as (name:chararray, id:int);");
+        pigServer.registerQuery("b = foreach a generate flatten(COUNT2(name));");
+
+        Iterator<Tuple> result = pigServer.openIterator("b");
+        List<Tuple> out = Lists.newArrayList(result);
+
+        assertEquals(2, out.size());
+        assertEquals(tuple(3), out.get(0));
+        assertEquals(tuple(3), out.get(1));
+    }
+
+}

Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java Fri Feb 24 08:19:42 2017
@@ -652,4 +652,14 @@ public class TestQueryParser {
     public void testSplit2() throws Exception {
         shouldPass("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
     }
+
+    @Test
+    public void testBigDecimalParsing() throws Exception {
+        shouldPass("B = FILTER A BY $1 < 1234567890.123456789BD;");
+    }
+
+    @Test
+    public void testBigIntegerParsing() throws Exception {
+        shouldPass("B = FILTER A BY $1 < 1234567890123456789BI;");
+    }
 }
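
The two added cases cover Pig's BigDecimal (BD) and BigInteger (BI) literal suffixes in filter expressions. A minimal Jython sketch using the same BD literal in an embedded script (illustration only; the path and schema are made up, not taken from the commit):

    #!/usr/bin/python
    # Sketch only: filter a bigdecimal column with a BD literal.
    from org.apache.pig.scripting import Pig

    P = Pig.compile("""
    A = load '/tmp/accounts' as (id:chararray, balance:bigdecimal);
    B = FILTER A BY balance < 1234567890.123456789BD;
    store B into '/tmp/small_balances';
    """)

    result = P.bind().runSingle()
    if result.isSuccessful():
        print "Pig job PASSED"
    else:
        raise "Pig job FAILED"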

Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java (original)
+++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java Fri Feb 24 08:19:42 2017
@@ -19,10 +19,20 @@ package org.apache.pig.parser;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.NonFSLoadFunc;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.Util;
 import org.junit.Test;
@@ -72,43 +82,76 @@ public class TestQueryParserUtils {
         QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
         assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
-        if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
-            // webhdfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
-            assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            // har with webhfs
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
-            assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            //viewfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
-            assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            //har with viewfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
-            assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        // webhdfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+        assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+        // har with webhdfs
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+        assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
+        //viewfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+        assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
-        }
+        //har with viewfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+        assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+    }
 
 
+    @Test
+    public void testNonFSLoadFunc() throws Exception {
+        PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
+        pigServer.registerQuery("A =  load 'hbase://query/SELECT ID, NAME, DATE FROM HIRES WHERE DATE > TO_DATE(\"1990-12-21 05:55:00.000\")' using org.apache.pig.parser.TestQueryParserUtils$DummyNonFSLoader();");
+        pigServer.shutdown();
     }
 
+    /**
+     * Test class for testNonFSLoadFuncNoSetHdfsServersCall test case
+     */
+    public static class DummyNonFSLoader extends LoadFunc implements NonFSLoadFunc {
+
+        @Override
+        public void setLocation(String location, Job job) throws IOException {
+            throw new RuntimeException("Should not be called");
+        }
+
+        @Override
+        public InputFormat getInputFormat() throws IOException {
+            throw new RuntimeException("Should not be called");
+        }
+
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+            throw new RuntimeException("Should not be called");
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            throw new RuntimeException("Should not be called");
+        }
+
+        @Override
+        public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+            return location;
+        }
+    }
 }

Added: pig/branches/spark/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/MiniCluster.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/MiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/MiniCluster.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+
+/**
+ * This class builds a single instance of itself with the Singleton
+ * design pattern. While building the single instance, it sets up a
+ * mini cluster that actually consists of a mini DFS cluster and a
+ * mini MapReduce cluster on the local machine and also sets up the
+ * environment for Pig to run on top of the mini cluster.
+ */
+public class MiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml");
+
+    protected MiniMRYarnCluster m_mr = null;
+    private Configuration m_dfs_conf = null;
+    private Configuration m_mr_conf = null;
+
+    /**
+     * @deprecated use {@link org.apache.pig.test.MiniGenericCluster#buildCluster()} instead.
+     */
+    @Deprecated
+    public static MiniCluster buildCluster() {
+        System.setProperty("test.exec.type", "mr");
+        return (MiniCluster)MiniGenericCluster.buildCluster("mr");
+    }
+
+    @Override
+    public ExecType getExecType() {
+        return ExecType.MAPREDUCE;
+    }
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            final int dataNodes = 4;     // There will be 4 data nodes
+            final int taskTrackers = 4;  // There will be 4 task tracker nodes
+
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+            // Create the dir that holds hadoop-site.xml file
+            // Delete if hadoop-site.xml exists already
+            CONF_DIR.mkdirs();
+            if(CONF_FILE.exists()) {
+                CONF_FILE.delete();
+            }
+
+            // Builds and starts the mini dfs and mapreduce clusters
+            Configuration config = new Configuration();
+            config.set("yarn.scheduler.capacity.root.queues", "default");
+            config.set("yarn.scheduler.capacity.root.default.capacity", "100");
+            m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+
+            // Write the necessary config info to hadoop-site.xml
+            m_mr_conf = new Configuration(m_mr.getConfig());
+
+            m_conf = m_mr_conf;
+            m_conf.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            m_conf.unset(MRConfiguration.JOB_CACHE_FILES);
+
+            m_conf.setInt(MRConfiguration.IO_SORT_MB, 200);
+            m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m");
+
+            m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
+            m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
+            m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
+            m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+            m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+            m_conf.set("pig.jobcontrol.sleep", "100");
+            m_conf.writeXml(new FileOutputStream(CONF_FILE));
+            m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()),
+                    new Path("/pigtest/conf/hadoop-site.xml"));
+            DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf);
+
+            System.err.println("XXX: Setting " + FileSystem.FS_DEFAULT_NAME_KEY + " to: " + m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            // Set the system properties needed by Pig
+            System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
+            System.setProperty("namenode", m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        // Delete hadoop-site.xml on shutDown
+        if(CONF_FILE.exists()) {
+            CONF_FILE.delete();
+        }
+        if (m_mr != null) { m_mr.stop(); }
+        m_mr = null;
+    }
+
+    static public Launcher getLauncher() {
+        return new MapReduceLauncher();
+    }
+}

Added: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+
+public class SparkMiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+    private Configuration m_dfs_conf = null;
+    protected MiniMRYarnCluster m_mr = null;
+    private Configuration m_mr_conf = null;
+
+    private static final Log LOG = LogFactory
+            .getLog(SparkMiniCluster.class);
+    private ExecType spark = new SparkExecType();
+    SparkMiniCluster() {
+
+    }
+
+    @Override
+    public ExecType getExecType() {
+        return spark;
+    }
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    System.getProperty("java.class.path"));
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")){
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            m_conf = m_mr_conf;
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+
+    private void deleteConfFiles() {
+
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    static public Launcher getLauncher() {
+        return new SparkLauncher();
+    }
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Fri Feb 24 08:19:42 2017
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@ public class TestBZip {
 
     @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
     public static Iterable<Object[]> data() {
-        if ( HadoopShims.isHadoopYARN() ) {
-            return Arrays.asList(new Object[][] {
-                { false  },
-                { true   }
-            });
-        } else {
-            return Arrays.asList(new Object[][] {
-                { false }
-            });
-        }
+        return Arrays.asList(new Object[][] {
+            { false  },
+            { true   }
+        });
     }
 
     public TestBZip (Boolean useBzipFromHadoop) {

Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Feb 24 08:19:42 2017
@@ -130,6 +130,7 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -3206,72 +3207,47 @@ public class TestBuiltin {
     @Test
     public void testUniqueID() throws Exception {
         Util.resetStateForExecModeSwitch();
-        String inputFileName = "testUniqueID.txt";
-        Util.createInputFile(cluster, inputFileName, new String[]
-            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
         Properties copyproperties = new Properties();
         copyproperties.putAll(cluster.getProperties());
         PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
-        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+        // running with 2 mappers each taking 5 records
+        String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+        Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+        Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
         pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
-        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+        pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, UniqueID();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
-        if (!Util.isSparkExecType(cluster.getExecType())) {
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
-            assertEquals(iter.next().get(1), "0-4");
-            assertEquals(iter.next().get(1), "1-0");
-            assertEquals(iter.next().get(1), "1-1");
-            assertEquals(iter.next().get(1), "1-2");
-            assertEquals(iter.next().get(1), "1-3");
-            assertEquals(iter.next().get(1), "1-4");
-        } else{
-            //there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes)
-            //Split0:
-            //            1\n
-            //            2\n
-            //            3\n
-            //            4\n
-            //            5\n
-            //            1\n
-            //Split1:
-            //            2\n
-            //            3\n
-            //            4\n
-            //            5\n
-            //The size of Split0 is 12 not 10 because LineRecordReader#nextKeyValue will read one more line
-            //More detail see PIG-4383
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
-            assertEquals(iter.next().get(1), "0-4");
-            assertEquals(iter.next().get(1), "0-5");
-            assertEquals(iter.next().get(1), "1-0");
-            assertEquals(iter.next().get(1), "1-1");
-            assertEquals(iter.next().get(1), "1-2");
-            assertEquals(iter.next().get(1), "1-3");
-        }
-        Util.deleteFile(cluster, inputFileName);
+        assertEquals("0-0",iter.next().get(1));
+        assertEquals("0-1",iter.next().get(1));
+        assertEquals("0-2",iter.next().get(1));
+        assertEquals("0-3",iter.next().get(1));
+        assertEquals("0-4",iter.next().get(1));
+        assertEquals("1-0",iter.next().get(1));
+        assertEquals("1-1",iter.next().get(1));
+        assertEquals("1-2",iter.next().get(1));
+        assertEquals("1-3",iter.next().get(1));
+        assertEquals("1-4",iter.next().get(1));
+        Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+        Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
     }
 
     @Test
     public void testRANDOMWithJob() throws Exception {
         Util.resetStateForExecModeSwitch();
-        String inputFileName = "testRANDOM.txt";
-        Util.createInputFile(cluster, inputFileName, new String[]
-            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
-
         Properties copyproperties = new Properties();
         copyproperties.putAll(cluster.getProperties());
         PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
-        // running with two mappers
-        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+        // running with 2 mappers each taking 5 records
+        String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+        Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+        Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
         pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
-        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+        pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, RANDOM();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
         double [] mapper1 = new double[5];
@@ -3294,7 +3270,8 @@ public class TestBuiltin {
         for( int i = 0; i < 5; i++ ){
             assertNotEquals(mapper1[i], mapper2[i], 0.0001);
         }
-        Util.deleteFile(cluster, inputFileName);
+        Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+        Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
     }
 
 

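The rewritten UniqueID and RANDOM tests above no longer rely on mapred.max.split.size, whose effective split boundaries differed under Spark (see PIG-4383 in the removed comment); instead they load a directory holding two small files with pig.noSplitCombination=true, so each file becomes exactly one split and the task index in UniqueID's "<task>-<sequence>" output is deterministic. A minimal sketch of that setup, assuming the same cluster and Util helpers as the surrounding tests; the file names are illustrative:

    // Two files under one load path + no split combination => exactly two map tasks.
    PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
    String tmpDir = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
    Util.createInputFile(cluster, tmpDir + "/part1.txt", new String[] {"1\n2\n3\n4\n5"});
    Util.createInputFile(cluster, tmpDir + "/part2.txt", new String[] {"1\n2\n3\n4\n5"});
    pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");

    pigServer.registerQuery("A = load '" + tmpDir + "' as (name);");
    pigServer.registerQuery("B = foreach A generate name, UniqueID();");
    // First split emits 0-0 .. 0-4, second split emits 1-0 .. 1-4.
    Iterator<Tuple> iter = pigServer.openIterator("B");
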
Added: pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+
+import java.util.Properties;
+
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestConfigurationUtil {
+
+    @Test
+    public void testExpandForAlternativeNames() {
+        Properties properties = null;
+        properties = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500");
+        Assert.assertEquals(1,properties.size());
+        Assert.assertEquals("500",properties.get("fs.df.interval"));
+
+        properties = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600");
+        Assert.assertEquals(2,properties.size());
+        Assert.assertEquals("600",properties.get("fs.df.interval"));
+        Assert.assertEquals("600",properties.get("dfs.df.interval"));
+
+        properties = ConfigurationUtil.expandForAlternativeNames("", "");
+        Assert.assertEquals(1,properties.size());
+        Assert.assertEquals("",properties.get(""));
+
+    }
+}
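
The new test pins down ConfigurationUtil.expandForAlternativeNames: given a key/value pair it returns a Properties containing the value under the key itself and, when the key is a deprecated Hadoop name such as dfs.df.interval, under its current replacement as well. A minimal usage sketch based only on the behaviour asserted above (merging into a fresh Properties object is illustrative):

    import java.util.Properties;

    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class ExpandSketch {
        public static void main(String[] args) {
            // "dfs.df.interval" is the legacy spelling of "fs.df.interval"; expanding it
            // sets the value under both names so either key lookup picks it up.
            Properties expanded = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600");
            Properties pigProps = new Properties();
            pigProps.putAll(expanded); // now holds fs.df.interval=600 and dfs.df.interval=600
        }
    }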

Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Fri Feb 24 08:19:42 2017
@@ -30,17 +30,17 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
 public class TestCounters {
     String file = "input.txt";
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
     final int MAX = 100*1000;
     Random r = new Random();
 
@@ -59,7 +59,7 @@ public class TestCounters {
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     @Test
     public void testMapOnly() throws IOException, ExecException {
         int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
             if(t > 50) count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only");
         PigStats pigStats = job.getStatistics();
-        
+
         //counting the no. of bytes in the output file
         //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output_map_only"), true);
 
@@ -98,7 +98,7 @@ public class TestCounters {
         JobGraph jg = pigStats.getJobGraph();
         Iterator<JobStats> iter = jg.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();                    
+            JobStats js = iter.next();
 
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
                 count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
                 "output_map_only", pigServer.getPigContext()),
                 pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
-        
+            JobStats js = iter.next();
+
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
             assertEquals(0, js.getReduceInputRecords());
             assertEquals(0, js.getReduceOutputRecords());
         }
-            
+
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
@@ -183,7 +183,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
@@ -266,7 +266,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
- 
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-     
+
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
         int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
 
         ExecJob job = pigServer.store("c", "output", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
- 
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,6 +399,8 @@ public class TestCounters {
 
     @Test
     public void testMultipleMRJobs() throws IOException, ExecException {
+        Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
+                Util.isMapredExecType(cluster.getExecType()));
         int count = 0;
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
         int [] nos = new int[10];
@@ -413,38 +415,38 @@ public class TestCounters {
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) { 
+        for(int i = 0; i < 10; i++) {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
         ExecJob job = pigServer.store("d", "output");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
-        
+
         System.out.println("============================================");
         System.out.println("Test case MultipleMRJobs");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
-        MRJobStats js = (MRJobStats)jp.getSinks().get(0);
-        
+        JobStats js = (JobStats)jp.getSinks().get(0);
+
         System.out.println("Job id: " + js.getName());
         System.out.println(jp.toString());
-        
+
         System.out.println("Map input records : " + js.getMapInputRecords());
         assertEquals(MAX, js.getMapInputRecords());
         System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -453,12 +455,12 @@ public class TestCounters {
         assertEquals(count, js.getReduceInputRecords());
         System.out.println("Reduce output records : " + js.getReduceOutputRecords());
         assertEquals(count, js.getReduceOutputRecords());
-        
+
         System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
         assertEquals(filesize, js.getHdfsBytesWritten());
 
     }
-    
+
     @Test
     public void testMapOnlyMultiQueryStores() throws Exception {
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -467,8 +469,8 @@ public class TestCounters {
             pw.println(t);
         }
         pw.close();
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -479,22 +481,22 @@ public class TestCounters {
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
         assertTrue(stats.getOutputLocations().size() == 2);
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(MAX, counter);       
-    }    
-    
+
+        assertEquals(MAX, counter);
+    }
+
     @Test
     public void testMultiQueryStores() throws Exception {
         int[] nums = new int[100];
@@ -505,13 +507,13 @@ public class TestCounters {
             nums[t]++;
         }
         pw.close();
-        
+
         int groups = 0;
         for (int i : nums) {
             if (i > 0) groups++;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -525,29 +527,29 @@ public class TestCounters {
         pigServer.registerQuery("store g into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
-        
+
         assertTrue(stats.getOutputLocations().size() == 2);
-               
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(groups, counter);       
-    }    
-    
-    /*    
+
+        assertEquals(groups, counter);
+    }
+
+    /*
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
      * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
-     */ 
+     */
 //    @Test
 //    public void testLocal() throws IOException, ExecException {
 //        int count = 0;
@@ -566,7 +568,7 @@ public class TestCounters {
 //        }
 //        pw.close();
 //
-//        for(int i = 0; i < 10; i++) 
+//        for(int i = 0; i < 10; i++)
 //            if(nos[i] > 0)
 //                count ++;
 //
@@ -580,56 +582,56 @@ public class TestCounters {
 //        pigServer.registerQuery("c = group b by $0;");
 //        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
 //        PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
 //        long filesize = 0;
 //        while(is.read() != -1) filesize++;
-//        
+//
 //        is.close();
 //        out.delete();
-//        
+//
 //        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//        
+//
 //        assertEquals(10, pigStats.getRecordsWritten());
 //        assertEquals(110, pigStats.getBytesWritten());
 //
 //    }
 
     @Test
-    public void testJoinInputCounters() throws Exception {        
+    public void testJoinInputCounters() throws Exception {
         testInputCounters("join");
     }
-    
+
     @Test
-    public void testCogroupInputCounters() throws Exception {        
+    public void testCogroupInputCounters() throws Exception {
         testInputCounters("cogroup");
     }
-    
+
     @Test
-    public void testSkewedInputCounters() throws Exception {        
+    public void testSkewedInputCounters() throws Exception {
         testInputCounters("skewed");
     }
-    
+
     @Test
-    public void testSelfJoinInputCounters() throws Exception {        
+    public void testSelfJoinInputCounters() throws Exception {
         testInputCounters("self-join");
     }
-    
+
     private static boolean multiInputCreated = false;
-    
+
     private static int count = 0;
-            
-    private void testInputCounters(String keyword) throws Exception {  
+
+    private void testInputCounters(String keyword) throws Exception {
         String file1 = "multi-input1.txt";
         String file2 = "multi-input2.txt";
-        
+
         String output = keyword;
-        
+
         if (keyword.equals("self-join")) {
             file2 = file1;
             keyword = "join";
         }
-         
-        final int MAX_NUM_RECORDS = 100; 
+
+        final int MAX_NUM_RECORDS = 100;
         if (!multiInputCreated) {
             PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -637,7 +639,7 @@ public class TestCounters {
                 pw.println(t);
             }
             pw.close();
-                        
+
             PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
                 int t = r.nextInt(100);
@@ -649,8 +651,8 @@ public class TestCounters {
             pw2.close();
             multiInputCreated = true;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file1 + "';");
@@ -661,7 +663,7 @@ public class TestCounters {
             pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
         }
         ExecJob job = pigServer.store("c", output + "_output");
-        
+
         PigStats stats = job.getStatistics();
         assertTrue(stats.isSuccessful());
         List<InputStats> inputs = stats.getInputStats();
@@ -680,4 +682,46 @@ public class TestCounters {
             }
         }
     }
+
+    @Test
+    public void testSplitUnionOutputCounters() throws Exception {
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
+        for (int i = 0; i < 10; i++) {
+            pw.println(i);
+        }
+        pw.close();
+        String query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 5, c otherwise;" +
+                "d = union b, c;";
+
+        pigServer.registerQuery(query);
+
+        ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
+        PigStats stats1 = job.getStatistics();
+
+        query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
+                "e = distinct d;" +
+                "f = union b, c, e;";
+
+        pigServer.registerQuery(query);
+
+        job = pigServer.store("f", "splitunion-output-1", "PigStorage");
+        PigStats stats2 = job.getStatistics();
+
+        PigStats[] pigStats = new PigStats[]{stats1, stats2};
+        for (int i = 0; i < 2; i++) {
+            PigStats stats = pigStats[i];
+            assertTrue(stats.isSuccessful());
+            List<OutputStats> outputs = stats.getOutputStats();
+            assertEquals(1, outputs.size());
+            OutputStats output = outputs.get(0);
+            assertEquals("splitunion-output-" + i, output.getName());
+            assertEquals(10, output.getNumberRecords());
+            assertEquals(20, output.getBytes());
+        }
+    }
 }
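
The split/union test added above reads its counters through the engine-neutral PigStats/OutputStats API rather than the MR-specific MRJobStats that the rest of this file moved away from. The read path, as a minimal sketch assuming a PigServer that has just stored an alias (the alias and output path are illustrative):

    ExecJob job = pigServer.store("d", "some-output", "PigStorage");
    PigStats stats = job.getStatistics();
    for (OutputStats out : stats.getOutputStats()) {
        // One OutputStats per store location, regardless of execution engine.
        System.out.println(out.getName() + ": " + out.getNumberRecords()
                + " records, " + out.getBytes() + " bytes");
    }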