You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [18/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/org/apache/pig/test/TestPlanGeneration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPlanGeneration.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPlanGeneration.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPlanGeneration.java Fri Feb 24 08:19:42 2017
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
@@ -36,15 +39,22 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import static org.apache.pig.builtin.mock.Storage.*;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.CastExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOStore;
@@ -61,8 +71,8 @@ public class TestPlanGeneration {
     private static PigServer ps;
 
     @BeforeClass
-    public static void setUp() throws ExecException {
-        ps = new PigServer(ExecType.LOCAL);
+    public static void setUp() throws Exception {
+        ps = new PigServer(Util.getLocalTestMode());
         pc = ps.getPigContext();
         pc.connect();
     }
@@ -311,4 +321,218 @@ public class TestPlanGeneration {
         assertNotNull(((PartitionedLoader)loLoad.getLoadFunc()).getPartFilter());
         assertEquals("b", loStore.getAlias());
     }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast1() throws Exception {
+        // A cast ForEach is inserted to take care of the user schema
+        String query = "A = load 'foo' as (a, b:int);\n" +
+                "B = foreach A generate a as a0:chararray, b as b:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        // before a0 is typecasted to chararray, it should be bytearray
+        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(0).type);
+        // type of b should stay as int
+        assertEquals(DataType.INTEGER, loForEach1.getSchema().getField(1).type);
+        assertEquals("B", loForEach2.getAlias());
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast.getType()==DataType.CHARARRAY);
+        assertEquals(loForEach2.getSchema().getField(0).alias, "a0");
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast2() throws Exception {
+        // No additional cast ForEach will be inserted, but schema should match
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:chararray;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        assertEquals(loForEach.getSchema().getField(0).alias, "a0");
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast3() throws Exception {
+        // No additional cast ForEach will be inserted, but schema should match
+        String query = "A = load 'foo' as (a, b);\n" +
+                "B = foreach A generate (chararray)a as a0:int;\n" +
+                "store B into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        LOGenerate generate1 = (LOGenerate)loForEach1.getInnerPlan().getSinks().get(0);
+        CastExpression cast1 = (CastExpression)generate1.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast1.getType()==DataType.CHARARRAY);
+        //before a0 is typecasted to int, it should be chararray
+        Assert.assertEquals(DataType.CHARARRAY, loForEach1.getSchema().getField(0).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate2 = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        CastExpression cast2 = (CastExpression)generate2.getOutputPlans().get(0).getSources().get(0);
+        Assert.assertTrue(cast2.getType()==DataType.INTEGER);
+        Assert.assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast4() throws Exception {
+        // No additional cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name), nb2;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast5() throws Exception {
+        // cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name:chararray), nb2 as nb2:chararray;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        // flattened "name" field should be bytearray before typecasted to  chararray
+        Assert.assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(1).type);
+        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
+        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
+        Assert.assertTrue(generate.getOutputPlans().get(0).getSources().get(0) instanceof ProjectExpression);
+        CastExpression cast = (CastExpression)generate.getOutputPlans().get(1).getSources().get(0);
+        Assert.assertTrue(cast.getType()==DataType.CHARARRAY);
+        Assert.assertTrue(generate.getOutputPlans().get(2).getSources().get(0) instanceof ProjectExpression);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast6() throws Exception {
+        // no cast ForEach will be inserted
+        String query = "a = load 'foo' as (nb1:bag{(year,name)}, nb2);\n" +
+                "b = foreach a generate flatten(nb1) as (year, name2), nb2;\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        assertEquals(loForEach.getSchema().getField(1).alias, "name2");
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testForEachWithCast7() throws Exception {
+        // no cast ForEach will be inserted, since we don't know the size of outputs
+        // in first inner plan
+        String query = "a = load 'foo' as (nb1:bag{}, nb2:bag{});\n" +
+                "b = foreach a generate flatten(nb1), flatten(nb2) as (year, name);\n" +
+                "store b into 'output';";
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
+        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
+        Assert.assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType1() throws Exception {
+        Data data = Storage.resetData(ps);
+        data.set("input", tuple(0.1), tuple(1.2), tuple(2.3));
+
+        String query =
+            "A = load 'input' USING mock.Storage() as (a1:double);\n"
+            + "B = FOREACH A GENERATE a1 as (a2:int);\n"
+            + "store B into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(ps, query);
+        List<Tuple> list = data.get("out");
+        // Without PIG-2315, this failed with (0.1), (1.2), (2.3)
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {"(0)", "(1)", "(2)"});
+        Util.checkQueryOutputsAfterSort(list, expectedRes);
+    }
+
+    @Test
+    // See PIG-2315
+    public void testAsType2() throws Exception {
+        Data data = Storage.resetData(ps);
+        data.set("input", tuple("a"), tuple("b"), tuple("c"));
+
+        String query =
+            "A = load 'input' USING mock.Storage(); \n"
+            + "A2 = FOREACH A GENERATE 12345 as (a2:chararray); \n"
+            + "B = load 'input' USING mock.Storage(); \n"
+            + "B2 = FOREACH A GENERATE '12345' as (b2:chararray); \n"
+            + "C = union A2, B2;\n"
+            + "D = distinct C;\n"
+            + "store D into 'out' using mock.Storage;" ;
+
+        Util.registerMultiLineQuery(ps, query);
+        List<Tuple> list = data.get("out");
+        // Without PIG-2315, this produced TWO 12345.
+        // One by chararray and another by int.
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {"('12345')"});
+        Util.checkQueryOutputsAfterSort(list, expectedRes);
+    }
+
+    @Test
+    // See PIG-4933
+    public void testAsWithByteArrayCast() throws Exception {
+        Data data = Storage.resetData(ps);
+	    data.set("input_testAsWithByteArrayCast", "t1:(f1:bytearray, f2:bytearray), f3:chararray",
+				tuple(tuple(1,5), "a"),
+				tuple(tuple(2,4), "b"),
+				tuple(tuple(3,3), "c") );
+
+        String query =
+            "A = load 'input_testAsWithByteArrayCast' USING mock.Storage();\n"
+            + "B = FOREACH A GENERATE t1 as (t2:(newf1, newf2:float)), f3;"
+            + "store B into 'out' using mock.Storage;" ;
+
+        // This will call typecast of (bytearray,float) on a tuple
+        // bytearray2bytearray should be no-op.
+        // Without pig-4933 patch on POCast,
+        // this typecast was producing empty results
+
+        Util.registerMultiLineQuery(ps, query);
+        List<Tuple> list = data.get("out");
+        String[] expectedRes =
+                        new String[] {"((1,5.0),a)","((2,4.0),b)","((3,3.0),c)"};
+        for( int i=0; i < list.size(); i++ ) {
+            Assert.assertEquals(expectedRes[i], list.get(i).toString());
+        }
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Fri Feb 24 08:19:42 2017
@@ -1397,13 +1397,13 @@ public class TestPruneColumn {
     }
 
     @Test
-    public void testRelayFlattenMap() throws Exception {
+    public void testFlattenMapCantPruneKeys() throws Exception {
         pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
-                + "' as (a0, a1:map[]);");
+                + "' as (a0, a1:map[int]);");
 
         pigServer.registerQuery("B = foreach A generate flatten(a1);");
-        pigServer.registerQuery("C = foreach B generate a1#'key1';");
-
+        pigServer.registerQuery("B1 = filter B by a1::key == 'key1';");
+        pigServer.registerQuery("C = foreach B1 generate a1::value;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
         assertTrue(iter.hasNext());
@@ -1418,8 +1418,7 @@ public class TestPruneColumn {
 
         assertFalse(iter.hasNext());
 
-        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
-                "Map key required for A: $1->[key1]"}));
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestRegisterParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRegisterParser.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestRegisterParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestRegisterParser.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.impl.PigContext;
@@ -42,6 +43,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+
 
 public class TestRegisterParser {
     private PigServer pigServer;
@@ -49,7 +53,12 @@ public class TestRegisterParser {
 
     @Before
     public void setUp() throws Exception {
-	pigServer = new PigServer(ExecType.LOCAL);
+        Properties properties = PropertiesUtil.loadDefaultProperties();
+        properties.setProperty("fs.s3.impl", LocalFileSystem.class.getName());
+        properties.setProperty("fs.s3n.impl", LocalFileSystem.class.getName());
+        properties.setProperty("fs.s3a.impl", LocalFileSystem.class.getName());
+
+	pigServer = new PigServer(ExecType.LOCAL, properties);
 
 	// Generate test jar files
 	for (int i = 1; i <= 5; i++) {
@@ -107,6 +116,34 @@ public class TestRegisterParser {
 	}
     }
 
+    @Test
+    public void testResolveForVariousFileSystemSchemes() throws URISyntaxException, IOException, ParserException {
+        URI[] list = new URI[6];
+        list[0] = new URI("file://test.jar");
+        list[1] = new URI("hdfs://test.jar");
+        list[2] = new URI("s3://test.jar");
+        list[3] = new URI("s3n://test.jar");
+        list[4] = new URI("s3a://test.jar");
+        list[5] = new URI("test.jar");
+
+        RegisterResolver registerResolver = new RegisterResolver(pigServer);
+        for (URI uri : list) {
+            URI[] resolvedUris = registerResolver.resolve(uri);
+	    Assert.assertEquals(1, resolvedUris.length);
+            Assert.assertEquals(uri, resolvedUris[0]);
+        }
+    }
+
+    @Test(expected = ParserException.class)
+    public void testResolveParseException() throws URISyntaxException, IOException, ParserException {
+        new RegisterResolver(pigServer).resolve(new URI("abc://test.jar"));
+    }
+
+    @Test(expected = URISyntaxException.class)
+    public void testResolveURISyntaxException() throws URISyntaxException, IOException, ParserException {
+        new RegisterResolver(pigServer).resolve(new URI("123://test.jar"));
+    }
+
     // Throw error when a scripting language and namespace is specified for a jar
     @Test(expected = ParserException.class)
     public void testRegisterJarException1() throws IOException, ParserException {

Modified: pig/branches/spark/test/org/apache/pig/test/TestScriptUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestScriptUDF.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestScriptUDF.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestScriptUDF.java Fri Feb 24 08:19:42 2017
@@ -247,7 +247,11 @@ public class TestScriptUDF{
         Assert.assertTrue(t.get(0).toString().equals(System.getenv(input[0])));
         Assert.assertTrue(iter.hasNext());
         t = iter.next();
-        Assert.assertTrue(t.get(0).toString().equals(System.getenv(input[1])));
+        if (System.getenv(input[1]) != null) {  // JAVA_HOME is set, t.get(0) is not null
+            Assert.assertTrue(t.get(0).toString().equals(System.getenv(input[1])));
+        } else {  // JAVA_HOME is not set, t.get(0) is null
+            Assert.assertNull(t.get(0));
+        }
         Assert.assertFalse(iter.hasNext());
     }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java Fri Feb 24 08:19:42 2017
@@ -65,6 +65,7 @@ public class TestSkewedJoin {
     private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
     private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+    private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
     private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class);
     private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input";
     private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output";
@@ -173,6 +174,11 @@ public class TestSkewedJoin {
         }
         w7.close();
 
+        //Empty file
+        PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8));
+        w8.close();
+
+
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
@@ -180,6 +186,7 @@ public class TestSkewedJoin {
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8);
     }
 
     private static void deleteFiles() throws IOException {
@@ -187,6 +194,21 @@ public class TestSkewedJoin {
     }
 
     @Test
+    public void testSkewedJoinMapLeftEmpty() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        int count = 0;
+        while(iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        assertEquals(0, count);
+    }
+
+
+    @Test
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");

Modified: pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java Fri Feb 24 08:19:42 2017
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
@@ -372,4 +373,41 @@ public class TestStreamingLocal {
             Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
     }
+
+    @Test
+    // Perl script with a syntax error, See PIG-4976
+    public void testNegativeScriptSyntaxError() throws IOException {
+
+        for( int numinput : new int [] {10, 9999} ) {
+            String[] inputStrings = new String[numinput];
+            for (int i=0;i<numinput;i++) {
+                inputStrings[i] = Integer.toString(i);
+            }
+            File input = Util.createInputFile("tmp", "", inputStrings);
+            // Perl script
+            String[] script =
+                new String[] {
+                              "#!/usr/bin/perl",
+                              "syntax error",
+                             };
+            File command1 = Util.createInputFile("script", "pl", script);
+            String query =
+                    "define CMD `perl " + command1.getName() + "` output('foo')" +
+                    "ship ('" + Util.encodeEscape(command1.toString()) + "');";
+            boolean succeeded=true;
+            try {
+                pigServer.registerQuery( query );
+                pigServer.registerQuery("A = load '"
+                        + Util.generateURI(input.toString(),
+                                pigServer.getPigContext())
+                        + "' using PigStorage();");
+                pigServer.registerQuery("B = stream A through CMD;");
+                pigServer.openIterator("B");
+            } catch(Exception ex) {
+                succeeded=false;
+            }
+           Assert.assertFalse("Job with " + numinput + " lines input did not fail.", succeeded);
+        }
+    }
+
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java Fri Feb 24 08:19:42 2017
@@ -39,8 +39,10 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import junit.framework.Assert;
@@ -55,6 +57,8 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -2908,12 +2912,12 @@ public class TestTypeCheckingValidatorNe
 
         @Test
         public void testUnionLineageDifferentSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b') as (field4, field5, field6: chararray, field7 );"
-            + "c = union a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b') as (field4, field5, field6: chararray, field7 );\n"
+            + "c = union a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         private void checkWarning(String query, String warnMsg) throws FrontendException {
@@ -2955,12 +2959,12 @@ public class TestTypeCheckingValidatorNe
         public void testUnionLineageMixSchemaFail() throws Throwable {
             // different loader caster associated with each input, so can't determine
             // which one to use on union output
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = union a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = union a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3302,12 +3306,12 @@ public class TestTypeCheckingValidatorNe
 
         @Test
         public void testCrossLineageNoSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a');"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = cross a , b;"
+            String query = "a = load 'a' using PigStorage('a');\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = cross a , b;\n"
             + "d = foreach c generate $1 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3323,12 +3327,12 @@ public class TestTypeCheckingValidatorNe
 
         @Test
         public void testCrossLineageMixSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = cross a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = cross a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3357,12 +3361,12 @@ public class TestTypeCheckingValidatorNe
         public void testJoinLineageNoSchemaFail() throws Throwable {
             //this test case should change when we decide on what flattening a tuple or bag
             //with null schema results in a foreach flatten and hence a join
-            String query =  "a = load 'a' using PigStorage('a');"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();"
-            + "c = join a by $0, b by $0;"
+            String query =  "a = load 'a' using PigStorage('a');\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();\n"
+            + "c = join a by $0, b by $0;\n"
             + "d = foreach c generate $1 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3378,12 +3382,12 @@ public class TestTypeCheckingValidatorNe
         public void testJoinLineageMixSchemaFail() throws Throwable {
             //this test case should change when we decide on what flattening a tuple or bag
             //with null schema results in a foreach flatten and hence a join
-            String query =  "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();"
-            + "c = join a by field1, b by $0;"
+            String query =  "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();\n"
+            + "c = join a by field1, b by $0;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3867,12 +3871,12 @@ public class TestTypeCheckingValidatorNe
          */
         @Test
         public void testLineageMultipleLoader3() throws FrontendException {
-            String query =  "A = LOAD 'data1' USING PigStorage() AS (u, v, w);"
-            +  "B = LOAD 'data2' USING TextLoader() AS (x, y);"
-            + "C = COGROUP A BY u, B by x;"
-            +  "D = FOREACH C GENERATE (chararray)group;";
+            String query =  "A = LOAD 'data1' USING PigStorage() AS (u, v, w);\n"
+            +  "B = LOAD 'data2' USING TextLoader() AS (x, y);\n"
+            +  "C = COGROUP A BY u, B by x;\n"
+            +  "D = FOREACH C GENERATE (chararray)group;\n";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to chararray at <line 4,");
         }
 
         /**
@@ -4063,12 +4067,12 @@ public class TestTypeCheckingValidatorNe
 
         @Test
         public void testUDFNoInnerSchema() throws FrontendException {
-            String query = "a= load '1.txt';"
+            String query = "a= load '1.txt' using PigStorage(':') ;"
                 + "b = foreach a generate "+TestUDFTupleNullInnerSchema.class.getName()+"($0);"
                 + "c = foreach b generate flatten($0);"
                 + "d = foreach c generate $0 + 1;";
 
-            checkLastForeachCastLoadFunc(query, null, 0);
+            checkLastForeachCastLoadFunc(query, "PigStorage(':')");
         }
 
         //see PIG-1990
@@ -4118,4 +4122,56 @@ public class TestTypeCheckingValidatorNe
                 " corresponding column in earlier relation(s) in the statement";
             Util.checkExceptionMessage(query, "c", msg);
         }
+        //see PIG-4734
+        public static class GenericToMap extends EvalFunc<Map<String, Double>> {
+            @Override
+            public Map exec(Tuple input) throws IOException {
+                Map<String, Double> output = new HashMap<String, Double>();
+                output.put((String)input.get(0), (Double)input.get(1));
+                return output;
+            }
+        }
+        @Test
+        public void testBinCondCompatMap() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOMAP(name, gpa) as m1, "
+                + GenericToMap.class.getName() + "(name, gpa) as m2;"
+                + "c = foreach b generate (gpa>3? m1 : m2);";
+                createAndProcessLPlan(query);
+        }
+        public static class GenericToTuple extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                return input;
+            }
+        }
+        @Test
+        public void testBinCondCompatTuple() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOTUPLE(name, gpa) as t1, "
+                + GenericToTuple.class.getName() + "(name, gpa) as t2;"
+                + "c = foreach b generate (gpa>3? t1 : t2);";
+                createAndProcessLPlan(query);
+        }
+        public static class GenericToBag extends EvalFunc<DataBag> {
+            @Override
+            public DataBag exec(Tuple input) throws IOException {
+                DataBag bag = new NonSpillableDataBag(1);
+                Tuple t = new DefaultTuple();
+                t.append(input.get(0));
+                bag.add(t);
+                return bag;
+            }
+        }
+        @Test
+        public void testBinCondCompatBag() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOBAG(name) as b1, "
+                + GenericToBag.class.getName() + "(name) as b2;"
+                + "c = foreach b generate (gpa>3? b1 : b2);";
+                createAndProcessLPlan(query);
+        }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java Fri Feb 24 08:19:42 2017
@@ -96,8 +96,6 @@ public class TestUnionOnSchema  {
 
     /**
      * Test UNION ONSCHEMA on two inputs with same schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaSameSchema() throws Exception {
@@ -128,8 +126,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test UNION ONSCHEMA with operations after the union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaFilter() throws Exception {
@@ -161,8 +157,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test UNION ONSCHEMA with operations after the union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaSuccOps() throws Exception {
@@ -194,8 +188,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test UNION ONSCHEMA with cast from bytearray to another type
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaCastOnByteArray() throws Exception {
@@ -223,8 +215,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in one of the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnName() throws Exception {
@@ -266,8 +256,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in both the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception {
@@ -302,8 +290,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in both the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception {
@@ -340,8 +326,6 @@ public class TestUnionOnSchema  {
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in one of the inputs.
      *  Negative test case
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameNeg() throws Exception {
@@ -366,8 +350,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA on two inputs with same column names, but different
      * numeric types - test type promotion
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaDiffNumType() throws Exception {
@@ -396,8 +378,6 @@ public class TestUnionOnSchema  {
 
     /**
      * Test UNION ONSCHEMA on two inputs with no common columns
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNoCommonCols() throws Exception {
@@ -424,8 +404,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test UNION ONSCHEMA on two inputs , one input with additional columns
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaAdditionalColumn() throws Exception {
@@ -498,8 +476,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test UNION ONSCHEMA on 3 inputs 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchema3Inputs() throws Exception {
@@ -533,8 +509,6 @@ public class TestUnionOnSchema  {
 
     /**
      * Test UNION ONSCHEMA with bytearray type 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaByteArrayConversions() throws Exception {
@@ -572,8 +546,6 @@ public class TestUnionOnSchema  {
     
     /**
      * negative test - test error on no schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNoSchema() throws Exception {
@@ -597,8 +569,6 @@ public class TestUnionOnSchema  {
     
     /**
      * negative test - test error on null alias in one of the FieldSchema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception {
@@ -640,8 +610,6 @@ public class TestUnionOnSchema  {
 
     /**
      * test union with incompatible types in schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaIncompatibleTypes() throws Exception {
@@ -650,7 +618,15 @@ public class TestUnionOnSchema  {
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
             + "u = union onschema l1, l2;";
 
-        checkSchemaEquals(query, "x : long, y : bytearray");
+        checkSchemaEx(query, "Cannot cast from chararray to bytearray");
+
+        //without "onschema"
+        query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
+            + "u = union l1, l2;";
+
+        checkSchemaEx(query, "Cannot cast from chararray to bytearray");
 
 
         
@@ -659,8 +635,15 @@ public class TestUnionOnSchema  {
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
             + "u = union onschema l1, l2;"
         ; 
-        checkSchemaEquals(query, "x : bytearray, y : chararray");
+        checkSchemaEx(query, "Cannot cast from long to bytearray");
                
+        query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
+            + "u = union l1, l2;"
+        ;
+        checkSchemaEx(query, "Cannot cast from long to bytearray");
+
         // bag column with different internal column types
         query =
             "  l1 = load '" + INP_FILE_2NUMS 
@@ -708,8 +691,6 @@ public class TestUnionOnSchema  {
 
     /**
      * Test UNION ONSCHEMA with input relation having udfs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaInputUdfs() throws Exception {
@@ -745,8 +726,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA with udf whose default type is different from
      * final type
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaUdfTypeEvolution() throws Exception {
@@ -797,8 +776,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA with udf whose default type is different from
      * final type - where udf is not in immediate input of union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaUdfTypeEvolution2() throws Exception {
@@ -869,8 +846,6 @@ public class TestUnionOnSchema  {
     /**
      * Test UNION ONSCHEMA with input relation having column names with multiple
      * level of namespace in their names
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopeMulti() throws Exception {
@@ -916,8 +891,6 @@ public class TestUnionOnSchema  {
     
     /**
      * Test query with a union-onschema having another as input 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testTwoUnions() throws Exception {

Added: pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.filefilter.RegexFileFilter;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+public class TezMiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File TEZ_LIB_DIR = new File("build/ivy/lib/Pig");
+    private static final File TEZ_CONF_FILE = new File(CONF_DIR, "tez-site.xml");
+    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+    private static final ExecType TEZ = new TezExecType();
+
+    protected MiniMRYarnCluster m_mr = null;
+    private Configuration m_dfs_conf = null;
+    private Configuration m_mr_conf = null;
+
+    @Override
+    public ExecType getExecType() {
+        return TEZ;
+    }
+
+    @Override
+    public void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
+            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
+            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
+            String classpath = libDir.getAbsolutePath() + "/*"
+                    + File.pathSeparator + classesDir.getAbsolutePath()
+                    + File.pathSeparator + testClassesDir.getAbsolutePath();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")){
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            // Write tez-site.xml
+            Configuration tez_conf = new Configuration(false);
+            // TODO PIG-3659 - Remove this once memory management is fixed
+            tez_conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, "20");
+            tez_conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
+            tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");
+            // Set to a lower value so that tests don't get stuck for long because of 1 AM running at a time
+            tez_conf.set(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, "20");
+            // Lower the max task attempts to 2 so that negative tests fail
+            // faster. By default, tasks retry 4 times
+            tez_conf.set(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, "2");
+            tez_conf.writeXml(new FileOutputStream(TEZ_CONF_FILE));
+
+            // Copy tez jars to hdfs
+            m_fileSys.mkdirs(new Path("/tez/lib"));
+            FileFilter fileFilter = new RegexFileFilter("tez-.+\\.jar$");
+            File[] tezJars = TEZ_LIB_DIR.listFiles(fileFilter);
+            for (int i = 0; i < tezJars.length; i++) {
+                if (tezJars[i].getName().startsWith("tez-api")) {
+                    m_fileSys.copyFromLocalFile(
+                            new Path(tezJars[i].getAbsoluteFile().toString()),
+                            new Path("/tez"));
+                } else {
+                    m_fileSys.copyFromLocalFile(
+                            new Path(tezJars[i].getAbsoluteFile().toString()),
+                            new Path("/tez/lib"));
+                }
+            }
+
+            m_conf = m_mr_conf;
+            // Turn FetchOptimizer off so that we can actually test Tez
+            m_conf.set(PigConfiguration.PIG_OPT_FETCH, System.getProperty("test.opt.fetch", "false"));
+
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void shutdownMiniDfsAndMrClusters() {
+        TezSessionManager.shutdown();
+        super.shutdownMiniDfsAndMrClusters();
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+
+    private void deleteConfFiles() {
+        if(TEZ_CONF_FILE.exists()) {
+            TEZ_CONF_FILE.delete();
+        }
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    static public Launcher getLauncher() {
+        return new TezLauncher();
+    }
+}

Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Fri Feb 24 08:19:42 2017
@@ -480,6 +480,19 @@ public class Util {
         fs.delete(new Path(fileName), true);
     }
 
+    /**
+     * Deletes a dfs file from the MiniCluster DFS quietly
+     *
+     * @param miniCluster the MiniCluster where the file should be deleted
+     * @param fileName the path of the file to be deleted
+     */
+     public static void deleteQuietly(MiniGenericCluster miniCluster, String fileName) {
+         try {
+             deleteFile(miniCluster, fileName);
+         } catch (IOException ignored) {
+         }
+     }
+
     static public void deleteFile(PigContext pigContext, String fileName)
     throws IOException {
         Configuration conf = ConfigurationUtil.toConfiguration(
@@ -658,13 +671,10 @@ public class Util {
          }
      }
 
-     static private String getMkDirCommandForHadoop2_0(String fileName) {
-         if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
-             Path parentDir = new Path(fileName).getParent();
-             String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
-             return mkdirCommand;
-         }
-         return "";
+     static private String getFSMkDirCommand(String fileName) {
+         Path parentDir = new Path(fileName).getParent();
+         String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
+         return mkdirCommand;
      }
 
     /**
@@ -686,7 +696,7 @@ public class Util {
             fileNameOnCluster = fileNameOnCluster.replace('\\','/');
         }
         PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
-        String script = getMkDirCommandForHadoop2_0(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
+        String script = getFSMkDirCommand(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
         GruntParser parser = new GruntParser(new StringReader(script), ps);
         parser.setInteractive(false);
         try {
@@ -847,7 +857,23 @@ public class Util {
     }
 
     public static File createFile(String[] data) throws Exception{
-        File f = File.createTempFile("tmp", "");
+        return createFile(null,data);
+    }
+
+    public static File createFile(String filePath, String[] data) throws Exception {
+        File f;
+        if( null == filePath || filePath.isEmpty() ) {
+          f = File.createTempFile("tmp", "");
+        } else  {
+          f = new File(filePath);
+        }
+
+        if (f.getParent() != null && !(new File(f.getParent())).exists()) {
+            (new File(f.getParent())).mkdirs();
+        }
+
+        f.deleteOnExit();
+
         PrintWriter pw = new PrintWriter(f);
         for (int i=0; i<data.length; i++){
             pw.println(data[i]);
@@ -918,14 +944,7 @@ public class Util {
         MapRedUtil.checkLeafIsStore(pp, pc);
 
         MapReduceLauncher launcher = new MapReduceLauncher();
-
-        java.lang.reflect.Method compile = launcher.getClass()
-                .getDeclaredMethod("compile",
-                        new Class[] { PhysicalPlan.class, PigContext.class });
-
-        compile.setAccessible(true);
-
-        return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
+        return launcher.compile(pp,pc);
     }
 
     public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48	->	Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50	->	Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46	->	Tez vertex scope-49,
+Tez vertex scope-47	->	Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30	->	[ scope-49, scope-50]
+|   |
+|   Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+    |   |
+    |   Project[bytearray][0] - scope-15
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][1] - scope-17
+    |
+    |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-50
+|   |
+|   Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52	->	 [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+    |   |
+    |   Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-4
+    |   |
+    |   |---Project[bytearray][1] - scope-3
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+    |   |
+    |   Project[bytearray][0] - scope-8
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][1] - scope-10
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+    |   |
+    |   Project[bytearray][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |   |
+    |   Project[int][5] - scope-42
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-32
+        |   |
+        |   Project[bag][2] - scope-33
+        |   |
+        |   Project[bag][3] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{bytearray} - scope-25

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48	->	Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50	->	Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46	->	Tez vertex scope-49,
+Tez vertex scope-47	->	Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30	->	[ scope-49, scope-50]
+|   |
+|   Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+    |   |
+    |   Project[bytearray][0] - scope-15
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][1] - scope-17
+    |
+    |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-50
+|   |
+|   Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52	->	 [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+    |   |
+    |   Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-4
+    |   |
+    |   |---Project[bytearray][1] - scope-3
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+    |   |
+    |   Project[bytearray][0] - scope-8
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][1] - scope-10
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+    |   |
+    |   Project[bytearray][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |   |
+    |   Project[int][5] - scope-42
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-32
+        |   |
+        |   Project[bag][2] - scope-33
+        |   |
+        |   Project[bag][3] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{bytearray} - scope-25

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39	->	Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42	->	Tez vertex scope-40,
+Tez vertex scope-40	->	Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20	->	[ scope-41, scope-42]
+|   |
+|   Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[chararray] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47	->	 scope-42
+|   |
+|   Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44	->	 [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22	<-	 scope-42	->	 scope-41
+|   |
+|   Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[chararray] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+    |   |
+    |   Project[chararray][0] - scope-31
+    |   |
+    |   Project[int][1] - scope-33
+    |   |
+    |   Project[int][3] - scope-35
+    |
+    |---d: New For Each(true,true)[tuple] - scope-30
+        |   |
+        |   Project[bag][1] - scope-24
+        |   |
+        |   POBinCond[bag] - scope-29
+        |   |
+        |   |---Project[bag][2] - scope-25
+        |   |
+        |   |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+        |   |   |
+        |   |   |---Project[bag][2] - scope-26
+        |   |
+        |   |---Constant({(,)}) - scope-28
+        |
+        |---d: Package(Packager)[tuple]{chararray} - scope-19

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39	->	Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42	->	Tez vertex scope-40,
+Tez vertex scope-40	->	Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20	->	[ scope-41, scope-42]
+|   |
+|   Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[chararray] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47	->	 scope-42
+|   |
+|   Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44	->	 [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22	<-	 scope-42	->	 scope-41
+|   |
+|   Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[chararray] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+    |   |
+    |   Project[chararray][0] - scope-31
+    |   |
+    |   Project[int][1] - scope-33
+    |   |
+    |   Project[int][3] - scope-35
+    |
+    |---d: New For Each(true,true)[tuple] - scope-30
+        |   |
+        |   Project[bag][1] - scope-24
+        |   |
+        |   POBinCond[bag] - scope-29
+        |   |
+        |   |---Project[bag][2] - scope-25
+        |   |
+        |   |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+        |   |   |
+        |   |   |---Project[bag][2] - scope-26
+        |   |
+        |   |---Constant({(,)}) - scope-28
+        |
+        |---d: Package(Packager)[tuple]{chararray} - scope-19

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59	->	Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-44,
+Tez vertex scope-44	->	Tez vertex scope-51,
+Tez vertex group scope-58	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59	<-	 [scope-45, scope-46]	->	 scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58	<-	 [scope-45, scope-46]	->	 scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][0] - scope-36
+    |   |
+    |   Project[int][1] - scope-38
+    |   |
+    |   Project[int][3] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59	->	Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-44,
+Tez vertex scope-44	->	Tez vertex scope-51,
+Tez vertex group scope-58	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59	<-	 [scope-45, scope-46]	->	 scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58	<-	 [scope-45, scope-46]	->	 scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][0] - scope-36
+    |   |
+    |   Project[int][1] - scope-38
+    |   |
+    |   Project[int][3] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44	->	Tez vertex scope-46,
+Tez vertex scope-45	->	Tez vertex scope-46,
+Tez vertex scope-50	->	Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-46,
+Tez vertex scope-46	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48	->	 [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49	->	 [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[int] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47	<-	 [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44	->	Tez vertex scope-46,
+Tez vertex scope-45	->	Tez vertex scope-46,
+Tez vertex scope-50	->	Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-46,
+Tez vertex scope-46	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48	->	 [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49	->	 [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[int] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47	<-	 [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld Fri Feb 24 08:19:42 2017
@@ -0,0 +1,107 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-60	->	Tez vertex scope-61,Tez vertex scope-62,
+Tez vertex scope-62	->	Tez vertex scope-54,Tez vertex scope-58,
+Tez vertex scope-54	->	Tez vertex scope-58,Tez vertex scope-61,
+Tez vertex scope-58	->	Tez vertex scope-61,
+Tez vertex scope-61
+
+Tez vertex scope-60
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-38	->	[ scope-61, scope-62]
+|   |
+|   Project[int][0] - scope-39
+|
+|---b: New For Each(false,false)[bag] - scope-28
+    |   |
+    |   Cast[int] - scope-23
+    |   |
+    |   |---Project[bytearray][0] - scope-22
+    |   |
+    |   Cast[int] - scope-26
+    |   |
+    |   |---Project[bytearray][1] - scope-25
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
+Tez vertex scope-62
+# Combine plan on edge <scope-60>
+Local Rearrange[tuple]{int}(false) - scope-67	->	 scope-62
+|   |
+|   Project[int][0] - scope-66
+|
+|---Package(BloomPackager)[tuple]{int} - scope-65
+# Plan on vertex
+POValueOutputTez - scope-64	->	 [scope-54, scope-58]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-63
+Tez vertex scope-54
+# Plan on vertex
+a: Split - scope-68
+|   |
+|   d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-62	->	 scope-61
+|   |   |
+|   |   Project[int][0] - scope-35
+|   |
+|   |---a1: Filter[bag] - scope-11
+|       |   |
+|       |   Equal To[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(3) - scope-13
+|   |
+|   POValueOutputTez - scope-55	->	 [scope-58]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-36	<-	 scope-62	->	 scope-61
+|   |
+|   Project[int][0] - scope-37
+|
+|---a2: Filter[bag] - scope-17
+    |   |
+    |   Equal To[boolean] - scope-20
+    |   |
+    |   |---Project[int][0] - scope-18
+    |   |
+    |   |---Constant(4) - scope-19
+    |
+    |---POValueInputTez - scope-59	<-	 scope-54
+Tez vertex scope-61
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-53
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-52
+    |   |
+    |   Project[int][0] - scope-44
+    |   |
+    |   Project[int][1] - scope-46
+    |   |
+    |   Project[int][3] - scope-48
+    |   |
+    |   Project[int][5] - scope-50
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-43
+        |   |
+        |   Project[bag][1] - scope-40
+        |   |
+        |   Project[bag][2] - scope-41
+        |   |
+        |   Project[bag][3] - scope-42
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-33