You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [23/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -66,7 +66,7 @@ e: Local Rearrange[tuple]{int}(false) -
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-67
 Tez vertex scope-94
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-86
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-86
 |
 |---e: New For Each(true,true)[tuple] - scope-85
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld Wed Feb 22 09:43:41 2017
@@ -65,7 +65,7 @@ Tez vertex group scope-45	<-	 [scope-37,
 # No plan on vertex group
 Tez vertex scope-44
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-36
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-36
 |
 |---e: New For Each(true,true)[tuple] - scope-35
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-58
 Tez vertex scope-77
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-74
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-74
 |
 |---e: FRJoin[tuple] - scope-68	<-	 scope-81
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld Wed Feb 22 09:43:41 2017
@@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
 Tez vertex scope-36
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-33
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-33
 |
 |---e: FRJoin[tuple] - scope-27	<-	 scope-40
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -48,7 +48,7 @@ Local Rearrange[tuple]{int}(false) - sco
 |---POShuffledValueInputTez - scope-166	<-	 [scope-163, scope-164]
 Tez vertex scope-162
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-161
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-161
 |
 |---e: FRJoin[tuple] - scope-155	<-	 scope-165
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld Wed Feb 22 09:43:41 2017
@@ -47,7 +47,7 @@ Tez vertex group scope-123	<-	 [scope-11
 # No plan on vertex group
 Tez vertex scope-116
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-115
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-115
 |
 |---e: FRJoin[tuple] - scope-109	<-	 scope-123
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -48,7 +48,7 @@ Local Rearrange[tuple]{tuple}(false) - s
 |   |
 |   Constant(DummyVal) - scope-122
 |
-|---New For Each(true,true)[tuple] - scope-127
+|---New For Each(false,true)[tuple] - scope-127
     |   |
     |   Project[int][0] - scope-110
     |   |
@@ -104,7 +104,7 @@ Partition Rearrange[tuple]{int}(false) -
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-102
 Tez vertex scope-142
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-113
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-113
 |
 |---New For Each(true,true)[tuple] - scope-146
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld Wed Feb 22 09:43:41 2017
@@ -19,7 +19,7 @@ Local Rearrange[tuple]{tuple}(false) - s
 |   |
 |   Constant(DummyVal) - scope-68
 |
-|---New For Each(true,true)[tuple] - scope-74
+|---New For Each(false,true)[tuple] - scope-74
     |   |
     |   Project[int][0] - scope-71
     |   |
@@ -50,7 +50,7 @@ Local Rearrange[tuple]{tuple}(false) - s
 |   |
 |   Constant(DummyVal) - scope-78
 |
-|---New For Each(true,true)[tuple] - scope-84
+|---New For Each(false,true)[tuple] - scope-84
     |   |
     |   Project[int][0] - scope-81
     |   |
@@ -120,7 +120,7 @@ Partition Rearrange[tuple]{int}(false) -
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
 Tez vertex scope-57
 # Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-28
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28
 |
 |---New For Each(true,true)[tuple] - scope-61
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -86,7 +86,7 @@ POIdentityInOutTez - scope-118	<-	 scope
 |   Project[int][0] - scope-90
 Tez vertex scope-119
 # Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-92
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-92
 |
 |---New For Each(true)[tuple] - scope-122
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld Wed Feb 22 09:43:41 2017
@@ -102,7 +102,7 @@ POIdentityInOutTez - scope-45	<-	 scope-
 |   Project[int][0] - scope-17
 Tez vertex scope-46
 # Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-19
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-19
 |
 |---New For Each(true)[tuple] - scope-49
     |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -64,7 +64,7 @@ POValueOutputTez - scope-73	->	 [scope-7
             |---POValueInputTez - scope-68	<-	 scope-65
 Tez vertex scope-70
 # Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-59
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-59
 |
 |---d: Limit - scope-58
     |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld Wed Feb 22 09:43:41 2017
@@ -64,7 +64,7 @@ POValueOutputTez - scope-36	->	 [scope-3
             |---POValueInputTez - scope-31	<-	 scope-28
 Tez vertex scope-33
 # Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-22
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-22
 |
 |---d: Limit - scope-21
     |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -15,7 +15,7 @@ Tez vertex scope-45
 |   |
 |   a2: Split - scope-68
 |   |   |
-|   |   a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17
+|   |   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17
 |   |   |
 |   |   POValueOutputTez - scope-60	->	 [scope-57]
 |   |
@@ -74,7 +74,7 @@ Tez vertex scope-57
 # Plan on vertex
 1-3: Split - scope-67
 |   |
-|   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-38
+|   d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-38
 |   |
 |   |---d: Filter[bag] - scope-34
 |       |   |
@@ -84,7 +84,7 @@ Tez vertex scope-57
 |       |   |
 |       |   |---Constant(500) - scope-36
 |   |
-|   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-44
+|   e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-44
 |   |
 |   |---e: Filter[bag] - scope-39
 |       |   |

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Wed Feb 22 09:43:41 2017
@@ -4,7 +4,7 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-45	->	Tez vertex group scope-70,Tez vertex group scope-70,Tez vertex group scope-71,Tez vertex group scope-71,
+Tez vertex scope-45	->	Tez vertex group scope-70,Tez vertex group scope-71,
 Tez vertex scope-56	->	Tez vertex group scope-70,Tez vertex group scope-71,
 Tez vertex group scope-70
 Tez vertex group scope-71
@@ -15,11 +15,11 @@ Tez vertex scope-45
 |   |
 |   a2: Split - scope-68
 |   |   |
-|   |   a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17
+|   |   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17
 |   |   |
 |   |   1-3: Split - scope-72
 |   |   |   |
-|   |   |   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-77	->	 scope-38
+|   |   |   d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-77	->	 scope-38
 |   |   |   |
 |   |   |   |---d: Filter[bag] - scope-73
 |   |   |       |   |
@@ -29,7 +29,7 @@ Tez vertex scope-45
 |   |   |       |   |
 |   |   |       |   |---Constant(500) - scope-75
 |   |   |   |
-|   |   |   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-83	->	 scope-44
+|   |   |   e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-83	->	 scope-44
 |   |   |   |
 |   |   |   |---e: Filter[bag] - scope-78
 |   |   |       |   |
@@ -53,7 +53,7 @@ Tez vertex scope-45
 |   |
 |   1-3: Split - scope-84
 |   |   |
-|   |   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-89	->	 scope-38
+|   |   d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-89	->	 scope-38
 |   |   |
 |   |   |---d: Filter[bag] - scope-85
 |   |       |   |
@@ -63,7 +63,7 @@ Tez vertex scope-45
 |   |       |   |
 |   |       |   |---Constant(500) - scope-87
 |   |   |
-|   |   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-95	->	 scope-44
+|   |   e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-95	->	 scope-44
 |   |   |
 |   |   |---e: Filter[bag] - scope-90
 |   |       |   |
@@ -98,7 +98,7 @@ Tez vertex scope-56
 # Plan on vertex
 1-3: Split - scope-96
 |   |
-|   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-101	->	 scope-38
+|   d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-101	->	 scope-38
 |   |
 |   |---d: Filter[bag] - scope-97
 |       |   |
@@ -108,7 +108,7 @@ Tez vertex scope-56
 |       |   |
 |       |   |---Constant(500) - scope-99
 |   |
-|   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-107	->	 scope-44
+|   e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-107	->	 scope-44
 |   |
 |   |---e: Filter[bag] - scope-102
 |       |   |

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Wed Feb 22 09:43:41 2017
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -47,6 +48,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -62,12 +64,23 @@ public class TestTezAutoParallelism {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    private static final PathFilter PART_FILE_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            if (path.getName().startsWith("part")) {
+                return true;
+            }
+            return false;
+        }
+    };
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
         //Test spilling to disk as tests here have multiple splits
         properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10");
+        properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
         createFiles();
     }
 
@@ -84,6 +97,11 @@ public class TestTezAutoParallelism {
 
     @After
     public void tearDown() throws Exception {
+        removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
+        removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
+        removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+        removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
         pigServer.shutdown();
         pigServer = null;
     }
@@ -131,32 +149,53 @@ public class TestTezAutoParallelism {
     @Test
     public void testGroupBy() throws IOException{
         // parallelism is 3 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
+        fs.delete(new Path("output1"), true);
+    }
+
+    @Test
+    public void testBytesPerReducer() throws IOException{
+
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
+        try {
+            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = group A by name;");
+            pigServer.store("B", "output1");
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
+            assertEquals(files.length, 10);
+            String log = writer.toString();
+            assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
+            assertTrue(log.contains("For vertex - scope-14: parallelism=10"));
+        } finally {
+            Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class);
+            Util.deleteFile(cluster, "output1");
+        }
     }
 
     @Test
     public void testOrderbyDecreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -164,86 +203,54 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testOrderbyIncreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, increase to 4
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
         pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;");
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
     public void testSkewedJoinDecreaseParallelism() throws IOException{
         // skewed join parallelism is 4 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testSkewedJoinIncreaseParallelism() throws IOException{
         // skewed join parallelism is 3 originally, increase to 5
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -251,23 +258,15 @@ public class TestTezAutoParallelism {
     public void testSkewedFullJoinIncreaseParallelism() throws IOException{
         // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency,
         // which prevent it changing parallelism
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
         pigServer.store("C", "output6");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -275,9 +274,9 @@ public class TestTezAutoParallelism {
     public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
         // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency,
         // which prevent it changing parallelism
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
@@ -287,19 +286,29 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
         pigServer.store("G", "output7");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
+    public void testSkewedJoinRightInputAutoParallelism() throws IOException{
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
+        setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = FILTER B by name == 'Noah';");
+        pigServer.registerQuery("B1 = group B by name;");
+        pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';");
+        pigServer.store("C", "output8");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER);
+        assertEquals(5, files.length);
+    }
+
+    @Test
     public void testFlattenParallelism() throws IOException{
         String outputDir = "/tmp/testFlattenParallelism";
         String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
@@ -386,9 +395,9 @@ public class TestTezAutoParallelism {
         // When there is a combiner operation involved user specified parallelism is overriden
         Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
-            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-            pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
-            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+            setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
             pigServer.setBatchOn();
             pigServer.registerScript(new ByteArrayInputStream(script.getBytes()));
             pigServer.executeBatch();
@@ -416,4 +425,12 @@ public class TestTezAutoParallelism {
             Util.deleteFile(cluster, outputDir);
         }
     }
+
+    private void setProperty(String property, String value) {
+        pigServer.getPigContext().getProperties().setProperty(property, value);
+    }
+
+    private void removeProperty(String property) {
+        pigServer.getPigContext().getProperties().remove(property);
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Feb 22 09:43:41 2017
@@ -20,13 +20,21 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Properties;
+import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -35,7 +43,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
@@ -66,11 +76,14 @@ public class TestTezCompiler {
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
+        resetFileLocalizer();
         pc = new PigContext(new TezLocalExecType(), new Properties());
+        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
     }
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
+        resetFileLocalizer();
     }
 
     @Before
@@ -79,6 +92,7 @@ public class TestTezCompiler {
         pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
         pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
         pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+        pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY);
         pigServer = new PigServer(pc);
     }
 
@@ -88,13 +102,20 @@ public class TestTezCompiler {
         TezPlanContainer.resetScope();
     }
 
+    private static void resetFileLocalizer() {
+        FileLocalizer.deleteTempFiles();
+        FileLocalizer.setInitialized(false);
+        // Set random seed to generate deterministic temporary paths
+        FileLocalizer.setR(new Random(1331L));
+    }
+
     @Test
     public void testStoreLoad() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
-                "store a into 'file:///tmp/output';" +
-                "b = load 'file:///tmp/output' as (x:int, y:int);" +
-                "store b into 'file:///tmp/output1';";
+                "store a into 'file:///tmp/pigoutput';" +
+                "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
+                "store b into 'file:///tmp/pigoutput1';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld");
     }
@@ -103,25 +124,85 @@ public class TestTezCompiler {
     public void testStoreLoadMultiple() throws Exception {
         String query =
                 "a = load 'file:///tmp/input';" +
-                "store a into 'file:///tmp/output/Dir1';" +
-                "a = load 'file:///tmp/output/Dir1';" +
-                "store a into 'file:///tmp/output/Dir2' using BinStorage();" +
-                "a = load 'file:///tmp/output/Dir1';" +
-                "store a into 'file:///tmp/output/Dir3';" +
-                "a = load 'file:///tmp/output/Dir2' using BinStorage();" +
-                "store a into 'file:///tmp/output/Dir4';" +
-                "a = load 'file:///tmp/output/Dir3';" +
-                "b = load 'file:///tmp/output/Dir2' using BinStorage();" +
-                "c = load 'file:///tmp/output/Dir1';" +
+                "store a into 'file:///tmp/pigoutput/Dir1';" +
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "store a into 'file:///tmp/pigoutput/Dir3';" +
+                "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+                "store a into 'file:///tmp/pigoutput/Dir4';" +
+                "a = load 'file:///tmp/pigoutput/Dir3';" +
+                "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+                "c = load 'file:///tmp/pigoutput/Dir1';" +
                 "d = cogroup a by $0, b by $0, c by $0;" +
-                "store d into 'file:///tmp/output/Dir5';";
+                "store d into 'file:///tmp/pigoutput/Dir5';";
 
-        // To get around difference in ordering of operators in plan due to JDK7 and JDK8
-        if (System.getProperties().getProperty("java.version").startsWith("1.8")) {
-            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
-        } else {
-            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld");
-        }
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
+    }
+
+    @Test
+    public void testStoreLoadJoinMultiple() throws Exception {
+        // Case where different store load statements are used in a single join
+        String query =
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "b = filter a by $0 == 1;" +
+                "c = filter a by $0 == 2;" +
+                "store b into 'file:///tmp/pigoutput/Dir2';" +
+                "store c into 'file:///tmp/pigoutput/Dir3';" +
+                "d = load 'file:///tmp/pigoutput/Dir2';" +
+                "e = load 'file:///tmp/pigoutput/Dir3';" +
+                "f = join d by $0, e by $0;" +
+                "store f into 'file:///tmp/pigoutput/Dir5';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld");
+
+        resetScope();
+        query =
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "b = distinct a;" +
+                "c = group a by $0;" +
+                "store b into 'file:///tmp/pigoutput/Dir2';" +
+                "store c into 'file:///tmp/pigoutput/Dir3';" +
+                "d = load 'file:///tmp/pigoutput/Dir2';" +
+                "e = load 'file:///tmp/pigoutput/Dir3';" +
+                "f = load 'file:///tmp/pigoutput/Dir4';" +
+                "g = join d by $0, f by $0 using 'repl';" +
+                "h = join e by $0, f by $0 using 'repl';" +
+                "store g into 'file:///tmp/pigoutput/Dir4';" +
+                "store h into 'file:///tmp/pigoutput/Dir5';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld");
+    }
+
+    @Test
+    public void testStoreLoadSplit() throws Exception {
+        // Cases where segmenting into two DAGs is not straightforward due to Split.
+        // The Split operator is required in both the segments.
+
+        resetFileLocalizer();
+        // Split operator as root vertex
+        String query =
+                "a = load 'file:///tmp/input';" +
+                "a1 = filter a by $0 == 5;" +
+                "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+                "b = load 'file:///tmp/pigoutput/Dir1';" +
+                "c = join a by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir2';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld");
+
+        // Split operator as intermediate vertex
+        query =
+                "a = load 'file:///tmp/input';" +
+                "a = distinct a;" +
+                "store a into 'file:///tmp/pigoutput/Dir1';" +
+                "b = load 'file:///tmp/pigoutput/Dir1';" +
+                "c = join a by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir2';";
+
+        resetScope();
+        resetFileLocalizer();
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
     }
 
     @Test
@@ -129,7 +210,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" +
-                "store b into 'file:///tmp/output';";
+                "store b into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld");
     }
@@ -140,7 +221,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = filter a by x > 0;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld");
     }
@@ -151,7 +232,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b generate group, COUNT(a.x);" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld");
     }
@@ -163,12 +244,131 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld");
     }
 
     @Test
+    public void testBloomJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x, y:int);" +
+                "b = load 'file:///tmp/input2' as (x, z:int);" +
+                "c = load 'file:///tmp/input2' as (x, w:int);" +
+                "d = join b by x, a by x, c by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z, w;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinLeftOuter() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:chararray, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:chararray, z:int);" +
+                "d = join a by x left, b by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinUnion() throws Exception {
+        // Left input from a union
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+                "b = union b, c;" +
+                "d = join a by x, b by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld");
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+        resetScope();
+        // Right input from a union
+        query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+                "b = union b, c;" +
+                "d = join b by x, a by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        // Needs shared edges and PIG-3856 to be a more optimal plan
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinSplit() throws Exception {
+        // Left input from a split
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "a1 = filter a by x == 3;" +
+                "a2 = filter a by x == 4;" +
+                "d = join a1 by x, a2 by x, b by x using 'bloom';" +
+                "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld");
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+        resetScope();
+        // Right input from a split
+        query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "a1 = filter a by x == 3;" +
+                "a2 = filter a by x == 4;" +
+                "d = join b by x, a1 by x using 'bloom';" +
+                "e = foreach d generate a1::x as x, y, z;" +
+                "store a2 into 'file:///tmp/pigoutput/a2';" +
+                "store e into 'file:///tmp/pigoutput/e';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomSelfJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = filter a by x > 10;" +
+                "e = join b by x, c by x, d by x using 'bloom';" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld");
+    }
+
+    @Test
     public void testSelfJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -176,7 +376,7 @@ public class TestTezCompiler {
                 "c = filter a by x == 10;" +
                 "d = filter a by x > 10;" +
                 "e = join b by x, c by x, d by x;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld");
     }
@@ -188,7 +388,7 @@ public class TestTezCompiler {
                 "b = filter a by x < 5;" +
                 "c = filter a by x == 10;" +
                 "d = join b by x, c by x using 'skewed';" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld");
     }
@@ -201,7 +401,7 @@ public class TestTezCompiler {
                 "c = filter a by x == 10;" +
                 "d = filter a by x > 10;" +
                 "e = join b by x, c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld");
     }
@@ -213,7 +413,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = union a, b;" +
                 "d = join b by x, c by x using 'replicated';" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld");
     }
@@ -226,7 +426,7 @@ public class TestTezCompiler {
                 "a2 = filter a by x < 2;" +
                 "b = union a1, a2;" +
                 "c = join b by x, a by x;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld");
     }
@@ -242,7 +442,7 @@ public class TestTezCompiler {
                 "a5 = foreach a4 generate a2::x as x, a3::y as y;" +
                 "b = union a1, a5;" +
                 "c = join b by x, a by x;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld");
     }
@@ -254,7 +454,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cross a, b;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld");
     }
@@ -266,7 +466,7 @@ public class TestTezCompiler {
                 "b = filter a by x < 5;" +
                 "c = filter a by x == 10;" +
                 "d = cross b, c;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld");
     }
@@ -278,7 +478,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cross b, a;" +
                 "d = foreach c generate a.x, a.y, z;" + //Scalar
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");
     }
@@ -290,7 +490,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x using 'skewed';" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld");
     }
@@ -303,7 +503,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = join a by x, b by x using 'skewed';" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld");
     }
@@ -314,7 +514,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = limit a 10;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld");
     }
@@ -325,7 +525,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = order a by x, y;" +
                 "c = limit b 10;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld");
     }
@@ -338,18 +538,31 @@ public class TestTezCompiler {
                 "g = group a all;" +
                 "h = foreach g generate COUNT(a) as sum;" +
                 "c = limit b h.sum/2;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld");
     }
 
     @Test
+    public void testLimitReplJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input' as (x:int, y:int);" +
+                "c = limit a 1;" +
+                "d = join c by x, b by x using 'replicated';" +
+                "store a into 'file:///tmp/pigoutput/a';" +
+                "store d into 'file:///tmp/pigoutput/d';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld");
+    }
+
+    @Test
     public void testDistinct() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = distinct a;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld");
     }
@@ -360,7 +573,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b { d = distinct a; generate COUNT(d); };" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld");
     }
@@ -374,7 +587,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = load 'file:///tmp/input3' as (x:int, z:int);" +
                 "d = join a by x, b by x, c by x using 'replicated';" +
-                "store d into 'file:///tmp/output/d';";
+                "store d into 'file:///tmp/pigoutput/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld");
     }
@@ -387,7 +600,7 @@ public class TestTezCompiler {
                 "b1 = foreach b generate group, COUNT(a.y);" +
                 "c = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "d = join b1 by group, c by x using 'replicated';" +
-                "store d into 'file:///tmp/output/e';";
+                "store d into 'file:///tmp/pigoutput/e';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld");
     }
@@ -397,7 +610,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = stream a through `stream.pl -n 5`;" +
-                "STORE b INTO 'file:///tmp/output';";
+                "STORE b INTO 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld");
     }
@@ -408,7 +621,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" +
                 "b = group a by $0;" +
                 "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+
-                "store c INTO 'file:///tmp/output';";
+                "store c INTO 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld");
 
@@ -422,7 +635,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = order a by x;" +
-                "STORE b INTO 'file:///tmp/output';";
+                "STORE b INTO 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
     }
@@ -433,7 +646,7 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
                 "b = filter a by x == 1;" +
                 "c = order b by x;" +
-                "STORE c INTO 'file:///tmp/output';";
+                "STORE c INTO 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
     }
@@ -444,7 +657,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" +
                 "b = order a by x;" +
-                "STORE b INTO 'file:///tmp/output';";
+                "STORE b INTO 'file:///tmp/pigoutput';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld");
         setProperty("pig.sort.readonce.loadfuncs", null);
@@ -459,7 +672,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input2' as (x:int, z:int);" +
                 "c = cogroup a by x, b by x;" +
                 "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" +
-                "store d into 'file:///tmp/output/d';";
+                "store d into 'file:///tmp/pigoutput/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld");
     }
@@ -469,9 +682,9 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "split a into b if x <= 5, c if x <= 10, d if x >10;" +
-                "store b into 'file:///tmp/output/b';" +
-                "store c into 'file:///tmp/output/c';" +
-                "store d into 'file:///tmp/output/d';";
+                "store b into 'file:///tmp/pigoutput/b';" +
+                "store c into 'file:///tmp/pigoutput/c';" +
+                "store d into 'file:///tmp/pigoutput/d';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld");
@@ -500,14 +713,14 @@ public class TestTezCompiler {
                 // Needs to be removed in Tez plan as well.
                 "f1 = limit f 1;" +
                 "f2 = union d1, f1;" +
-                "store b1 into 'file:///tmp/output/b1';" +
-                "store b2 into 'file:///tmp/output/b2';" +
-                "store c1 into 'file:///tmp/output/c1';" +
-                "store c3 into 'file:///tmp/output/c1';" +
-                "store d1 into 'file:///tmp/output/d1';" +
-                "store e1 into 'file:///tmp/output/e1';" +
-                "store f1 into 'file:///tmp/output/f1';" +
-                "store f2 into 'file:///tmp/output/f2';";
+                "store b1 into 'file:///tmp/pigoutput/b1';" +
+                "store b2 into 'file:///tmp/pigoutput/b2';" +
+                "store c1 into 'file:///tmp/pigoutput/c1';" +
+                "store c3 into 'file:///tmp/pigoutput/c1';" +
+                "store d1 into 'file:///tmp/pigoutput/d1';" +
+                "store e1 into 'file:///tmp/pigoutput/e1';" +
+                "store f1 into 'file:///tmp/pigoutput/f1';" +
+                "store f2 into 'file:///tmp/pigoutput/f2';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld");
@@ -523,8 +736,8 @@ public class TestTezCompiler {
                 "b = foreach b generate group, COUNT(a.x);" +
                 "c = group a by (x,y);" +
                 "c = foreach c generate group, COUNT(a.y);" +
-                "store b into 'file:///tmp/output/b';" +
-                "store c into 'file:///tmp/output/c';";
+                "store b into 'file:///tmp/pigoutput/b';" +
+                "store c into 'file:///tmp/pigoutput/c';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld");
@@ -540,9 +753,9 @@ public class TestTezCompiler {
                 "c = join a by x, b by x;" +
                 "d = foreach c generate $0, $1, $3;" +
                 "e = foreach c generate $0, $1, $2, $3;" +
-                "store c into 'file:///tmp/output/c';" +
-                "store d into 'file:///tmp/output/d';" +
-                "store e into 'file:///tmp/output/e';";
+                "store c into 'file:///tmp/pigoutput/c';" +
+                "store d into 'file:///tmp/pigoutput/d';" +
+                "store e into 'file:///tmp/pigoutput/e';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld");
@@ -555,13 +768,13 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
                 "b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}}
-                "store b into 'file:///tmp/output/b';" +
+                "store b into 'file:///tmp/pigoutput/b';" +
                 "c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}}
-                "store c into 'file:///tmp/output/c';" +
+                "store c into 'file:///tmp/pigoutput/c';" +
                 "d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int}
-                "store d into 'file:///tmp/output/d';" +
+                "store d into 'file:///tmp/pigoutput/d';" +
                 "e = foreach d GENERATE a::x, a::y;" +
-                "store e into 'file:///tmp/output/e';";
+                "store e into 'file:///tmp/pigoutput/e';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld");
@@ -576,8 +789,8 @@ public class TestTezCompiler {
                 "b = group a by x;" +
                 "c = foreach b generate group, COUNT(a) as cnt;" +
                 "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" +
-                "store d into 'file:///tmp/output1';" +
-                "store e into 'file:///tmp/output2';";
+                "store d into 'file:///tmp/pigoutput1';" +
+                "store e into 'file:///tmp/pigoutput2';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld");
@@ -594,7 +807,7 @@ public class TestTezCompiler {
                 "c = join a by $0, b by $0 using 'replicated';" +
                 "d = join a by $1, b by $1 using 'replicated';" +
                 "e = union c,d;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld");
@@ -613,7 +826,7 @@ public class TestTezCompiler {
                 "c = foreach c generate $0 as c1;" +
                 "d = group a by x;" +
                 "e = foreach d generate group, b.b1, c.c1;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
@@ -623,18 +836,67 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception {
+        // Replicate joins are from a split
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, y:int);" +
+                "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+                "d = union a, b;" +
+                "e = filter c by y < 2;" +
+                "f = filter c by y > 5;" +
+                "g = join d by x, e by x using 'replicated';" +
+                "h = join g by d::x, f by x using 'replicated';" +
+                "store h into 'file:///tmp/pigoutput';";
+
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld");
+
+        // Union is also from a split
+        query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x == 2;" +
+                "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+                "d = union a, b;" +
+                "e = filter c by y < 2;" +
+                "f = filter c by y > 5;" +
+                "g = join d by x, e by x using 'replicated';" +
+                "h = join g by d::x, f by x using 'replicated';" +
+                "store h into 'file:///tmp/pigoutput';";
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld");
+    }
+
+    @Test
     public void testUnionStore() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b PARALLEL 15;" +
+                "store c into 'file:///tmp/pigoutput';";
+        // Union optimization should be turned off if PARALLEL clause is specified
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test
@@ -643,14 +905,15 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
         String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
-        // Plan should not have union optimization applied
+        // Plan should not have union optimization applied as PigStorage is unsupported
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName());
@@ -658,27 +921,37 @@ public class TestTezCompiler {
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
-                "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();";
-        // Plan should not have union optimization applied
+                "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+        // Plan should not have union optimization applied as only ORC is supported
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
 
         resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
+        // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
+
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
         query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" +
                 "u1 = union onschema b, c;" +
-                "store u1 into 'file:///tmp/output/u1';" +
+                "store u1 into 'file:///tmp/pigoutput/u1';" +
                 //TODO: multiple levels of split not merged
                 "u2 = union onschema a, b, c;" +
-                "store u2 into 'file:///tmp/output/u2';" +
+                "store u2 into 'file:///tmp/pigoutput/u2';" +
                 "u3 = union onschema d, e;" +
-                "store u3 into 'file:///tmp/output/u3';" +
+                "store u3 into 'file:///tmp/pigoutput/u3';" +
                 "j1 = join d by x, a by x using 'replicated';" +
                 "j1 = foreach j1 generate d::x as x, d::y as y;" +
                 "u4 = union onschema j1, a;" +
-                "store u4 into 'file:///tmp/output/u4';";
+                "store u4 into 'file:///tmp/pigoutput/u4';";
 
         // Plan should have union optimization applied even for unsupported storefunc
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld");
@@ -696,7 +969,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = group c by x;" +
                 "e = foreach d generate group, SUM(c.y);" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld");
@@ -712,7 +985,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld");
@@ -729,7 +1002,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         //TODO: PIG-3856 Not optimized
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -743,7 +1016,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'replicated';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         // Optimized
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -762,7 +1035,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld");
@@ -779,7 +1052,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
                 "d = order c by x;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld");
@@ -795,7 +1068,7 @@ public class TestTezCompiler {
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
                 "c = union onschema a, b;" +
                 "d = limit c 1;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld");
@@ -811,9 +1084,9 @@ public class TestTezCompiler {
                 "split a into a1 if x > 100, a2 otherwise;" +
                 "c = union onschema a1, a2, b;" +
                 "split c into d if x > 500, e otherwise;" +
-                "store a2 into 'file:///tmp/output/a2';" +
-                "store d into 'file:///tmp/output/d';" +
-                "store e into 'file:///tmp/output/e';";
+                "store a2 into 'file:///tmp/pigoutput/a2';" +
+                "store d into 'file:///tmp/pigoutput/d';" +
+                "store e into 'file:///tmp/pigoutput/e';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
@@ -831,8 +1104,8 @@ public class TestTezCompiler {
                 "d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
                 "e = union onschema c, d;" +
                 "f = group e by x;" +
-                "store e into 'file:///tmp/output1';" +
-                "store f into 'file:///tmp/output2';";
+                "store e into 'file:///tmp/pigoutput1';" +
+                "store f into 'file:///tmp/pigoutput2';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
@@ -850,7 +1123,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
                 "e = union onschema c, d;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld");
@@ -872,10 +1145,10 @@ public class TestTezCompiler {
                 "c2 = foreach c generate y, x;" +
                 "c3 = union c1, c2;" +
                 "a1 = union onschema b3, c3;" +
-                "store a1 into 'file:///tmp/output1';" +
+                "store a1 into 'file:///tmp/pigoutput1';" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join a1 by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/output2';";
+                "store e into 'file:///tmp/pigoutput2';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld");
@@ -892,7 +1165,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'replicated';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld");
@@ -906,7 +1179,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'replicated';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -924,7 +1197,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join c by x, d by x using 'skewed';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld");
@@ -938,7 +1211,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = join d by x, c by x using 'skewed';" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -956,7 +1229,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = filter c by x == d.x;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld");
@@ -970,7 +1243,7 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
                 "e = filter d by x == c.x;" +
-                "store e into 'file:///tmp/output';";
+                "store e into 'file:///tmp/pigoutput';";
 
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -981,11 +1254,76 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testUnionSplitUnionStore() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input1' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" +
+                "h = union onschema d, e;" +
+                "i = union onschema f, g;" +
+                "store h into 'file:///tmp/pigoutput/1';" +
+                "store i into 'file:///tmp/pigoutput/2';";
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld");
+
+        // With a join in between
+        query =
+                "a = load 'file:///tmp/input' as (x:chararray);" +
+                "b = load 'file:///tmp/input' as (x:chararray);" +
+                "c = load 'file:///tmp/input' as (y:chararray);" +
+                "u1 = union onschema a, b;" +
+                "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
+                "d = JOIN r BY x LEFT, c BY y;" +
+                "u2 = UNION ONSCHEMA d, s;" +
+                "e = FILTER u2 BY x == '';" +
+                "f = FILTER u2 BY x == 'm';" +
+                "u3 = UNION ONSCHEMA e, f;" +
+                "store u3 into 'file:///tmp/pigoutput';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld");
+    }
+
+    @Test
+    public void testUnionSplitUnionLimitStore() throws Exception {
+        // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group
+        String query =
+                "a = load 'file:///tmp/input' as (x:chararray);" +
+                "b = load 'file:///tmp/input' as (x:chararray);" +
+                "c = load 'file:///tmp/input' as (y:chararray);" +
+                "u1 = union onschema a, b;" +
+                "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
+                "d = JOIN r BY x LEFT, c BY y;" +
+                "u2 = UNION ONSCHEMA d, s;" +
+                "e = FILTER u2 BY x == '';" +
+                "f = FILTER u2 BY x == 'm';" +
+                "u3 = UNION ONSCHEMA e, f;" +
+                "SPLIT u3 INTO t if x != '', u OTHERWISE;" +
+                "v = LIMIT t 10;" +
+                "store v into 'file:///tmp/pigoutput';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld");
+    }
+
+    @Test
     public void testRank() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
                 "b = rank a;" +
-                "store b into 'file:///tmp/output/d';";
+                "store b into 'file:///tmp/pigoutput/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld");
     }
@@ -996,7 +1334,7 @@ public class TestTezCompiler {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
                 "b = rank a by x;" +
-                "store b into 'file:///tmp/output/d';";
+                "store b into 'file:///tmp/pigoutput/d';";
 
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
     }
@@ -1052,5 +1390,32 @@ public class TestTezCompiler {
         assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
                 TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
     }
+
+    public static class TestDummyStoreFunc extends StoreFunc {
+
+        @Override
+        public OutputFormat getOutputFormat() throws IOException {
+            return null;
+        }
+
+        @Override
+        public void setStoreLocation(String location, Job job)
+                throws IOException {
+        }
+
+        @Override
+        public void prepareToWrite(RecordWriter writer) throws IOException {
+        }
+
+        @Override
+        public void putNext(Tuple t) throws IOException {
+        }
+
+        @Override
+        public Boolean supportsParallelWriteToStoreLocation() {
+            return false;
+        }
+
+    }
 }