You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/29 23:19:35 UTC

svn commit: r1727655 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/backend/hadoop/executionengine/tez/util/ src/org/apache/pig/impl/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/data/GoldenFiles/tez/ test/org/apache/pig/tez/

Author: rohini
Date: Fri Jan 29 22:19:35 2016
New Revision: 1727655

URL: http://svn.apache.org/viewvc?rev=1727655&view=rev
Log:
PIG-4690: Union with self replicate join will fail in Tez (rohini)

Added:
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jan 29 22:19:35 2016
@@ -85,6 +85,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4690: Union with self replicate join will fail in Tez (rohini)
+
 PIG-4791: PORelationToExprProject filters records instead of returning emptybag in nested foreach after union (rohini)
 
 PIG-4779: testBZ2Concatenation[pig.bzip.use.hadoop.inputformat = true] failing due to successful read (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Jan 29 22:19:35 2016
@@ -69,6 +69,7 @@ public class MultiQueryOptimizerTez exte
             }
 
             List<TezOperator> splittees = new ArrayList<TezOperator>();
+            Set<TezOperator> mergedNonPackageInputSuccessors = new HashSet<TezOperator>();
 
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
             for (TezOperator successor : successors) {
@@ -117,12 +118,18 @@ public class MultiQueryOptimizerTez exte
                 // since Tez does not handle double edge between vertexes
                 // Successor could be
                 //    - union operator (if no union optimizer changing it to vertex group which supports multiple edges)
-                //    - self replicate join
-                //    - self skewed join
-                // Self hash joins can write to same output edge and is handled by POShuffleTezLoad
-                // TODO: PIG-3876 to handle this by writing to same edge
+                //    - self replicate join, self skewed join or scalar
+                //    - POPackage (Self hash joins can write to same output edge and is handled by POShuffleTezLoad)
                 Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
+                // These successors should not be merged due to diamond shape
+                Set<TezOperator> toNotMergeSuccessors = new HashSet<TezOperator>();
+                // These successors can be merged
                 Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>();
+                // These successors (Scalar, POFRJoinTez) can be merged if they are the only input.
+                // Only in case of POPackage(POShuffleTezLoad) multiple inputs can be handled from a Split
+                Set<TezOperator> nonPackageInputSuccessors = new HashSet<TezOperator>();
+                boolean canMerge = true;
+
                 mergedSuccessors.addAll(successors);
                 for (TezOperator splittee : splittees) {
                     if (getPlan().getSuccessors(splittee) != null) {
@@ -136,18 +143,52 @@ public class MultiQueryOptimizerTez exte
                                     UnionOptimizer.isOptimizable(succSuccessor,
                                             unionSupportedStoreFuncs,
                                             unionUnsupportedStoreFuncs))) {
+                                toNotMergeSuccessors.add(succSuccessor);
+                            } else {
                                 toMergeSuccessors.add(succSuccessor);
+                                List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
+                                if (unionSuccessors != null) {
+                                    for (TezOperator unionSuccessor : unionSuccessors) {
+                                        if (TezCompilerUtil.isNonPackageInput(succSuccessor.getOperatorKey().toString(), unionSuccessor)) {
+                                            canMerge = canMerge ? nonPackageInputSuccessors.add(unionSuccessor) : false;
+                                        } else {
+                                            toMergeSuccessors.add(unionSuccessor);
+                                        }
+                                    }
+                                }
                             }
-                        } else if (successors.contains(succSuccessor)) {
-                                // Self replicate/skewed join
-                                toMergeSuccessors.add(succSuccessor);
+                        } else if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+                            // Output goes to scalar or POFRJoinTez instead of POPackage
+                            // POPackage/POShuffleTezLoad can handle multiple inputs from a Split.
+                            // But if input is sent to any other operator like
+                            // scalar, POFRJoinTez then we need to ensure it is the only one.
+                            canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+                        } else {
+                            toMergeSuccessors.add(succSuccessor);
                         }
                     }
                 }
 
-                mergedSuccessors.retainAll(toMergeSuccessors);
+                if (canMerge) {
+                    if (!nonPackageInputSuccessors.isEmpty() || !mergedNonPackageInputSuccessors.isEmpty()) {
+                        // If a non-POPackage input successor is already merged or
+                        // if there is a POPackage and non-POPackage to be merged,
+                        // then skip as it will become diamond shape
+                        // For eg: POFRJoinTez+Scalar, POFRJoinTez/Scalar+POPackage
+                        if (nonPackageInputSuccessors.removeAll(mergedSuccessors)
+                                || toMergeSuccessors.removeAll(mergedNonPackageInputSuccessors)
+                                || toMergeSuccessors.removeAll(nonPackageInputSuccessors)) {
+                            continue;
+                        }
+                    }
+                } else {
+                    continue;
+                }
+
+                mergedSuccessors.retainAll(toNotMergeSuccessors);
                 if (mergedSuccessors.isEmpty()) { // no shared edge after merge
                     splittees.add(successor);
+                    mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
                 }
             }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Jan 29 22:19:35 2016
@@ -36,14 +36,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -147,6 +145,12 @@ public class TezOperDependencyParallelis
             roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
         }
 
+        if (roundedEstimatedParallelism == 0) {
+            throw new IOException("Estimated parallelism for "
+                    + tezOper.getOperatorKey().toString()
+                    + " is 0 which is unexpected");
+        }
+
         return roundedEstimatedParallelism;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Jan 29 22:19:35 2016
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -193,6 +192,29 @@ public class TezCompilerUtil {
         } catch (VisitorException e) {
             throw new PlanException(e);
         }
+    }
+
+    public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException {
+        try {
+            List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+            for (TezInput input : inputs) {
+                if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
+                    return true;
+                }
+            }
+            List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+            for (POUserFunc userFunc : userFuncs) {
+                if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                    TezInput input = (TezInput)userFunc.getFunc();
+                    if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
     }
 
     static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java Fri Jan 29 22:19:35 2016
@@ -18,8 +18,11 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.text.MessageFormat;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,6 +35,9 @@ import org.apache.pig.impl.util.UDFConte
 
 
 public class GFCross extends EvalFunc<DataBag> {
+
+    private static final Log LOG = LogFactory.getLog(GFCross.class);
+
     private int numInputs, myNumber, numGroupsPerInput, numGroupsGoingTo;
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -70,6 +76,12 @@ public class GFCross extends EvalFunc<Da
 
             numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
             numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+
+            LOG.info(MessageFormat.format("Parallelism = {0}, numInputs = {1}, myNumber = {2},"
+                            + " numGroupsPerInput = {3}, numGroupsGoingTo = {4}",
+                            parallelism, numInputs, myNumber,
+                            numGroupsPerInput, numGroupsGoingTo));
+
         }
 
         DataBag output = mBagFactory.newDefaultBag();

Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Fri Jan 29 22:19:35 2016
@@ -811,6 +811,16 @@ b = filter a by gpa >= 3;
 c = filter a by gpa < 2;
 d = join c by name full outer, b by name PARALLEL 2;
 store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join union replicated
+            'num' => 9,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name, b by name using 'replicated';
+store c into ':OUTPATH:';\,
             }
             ] # end of tests
         },

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Fri Jan 29 22:19:35 2016
@@ -3654,6 +3654,18 @@ f = foreach e generate AVG(d.age) as avg
 y = foreach a generate age/c.avg, age/f.avg;
 store y into ':OUTPATH:';\,
                     },
+                    {
+                    # test scalar with split
+                    'num' => 5,
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = group a all;
+c = foreach b generate AVG(a.age) as avg, COUNT(a.age) as cnt;
+d = foreach c generate avg;
+e = group d by $0;
+f = foreach e generate group, c.avg, c.cnt;
+store f into ':OUTPATH:';\,
+                    },
                 ]
             },
             {

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,109 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-61	->	Tez vertex scope-65,Tez vertex scope-69,
+Tez vertex scope-65	->	Tez vertex scope-69,
+Tez vertex scope-57	->	Tez vertex scope-67,Tez vertex scope-69,
+Tez vertex scope-67	->	Tez vertex scope-69,
+Tez vertex scope-69
+
+Tez vertex scope-61
+# Plan on vertex
+b: Split - scope-72
+|   |
+|   POValueOutputTez - scope-71	->	 [scope-69]
+|   |
+|   POValueOutputTez - scope-62	->	 [scope-65]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-65
+# Plan on vertex
+c: Local Rearrange[tuple]{tuple}(false) - scope-29	->	 scope-69
+|   |
+|   Project[int][0] - scope-30
+|   |
+|   Project[int][1] - scope-31
+|
+|---c: New For Each(true,true)[tuple] - scope-28
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
+    |   |
+    |   |---Constant(2) - scope-24
+    |   |
+    |   |---Constant(0) - scope-25
+    |   |
+    |   Project[tuple][*] - scope-27
+    |
+    |---POValueInputTez - scope-66	<-	 scope-61
+Tez vertex scope-57
+# Plan on vertex
+a: Split - scope-73
+|   |
+|   POValueOutputTez - scope-70	->	 [scope-69]
+|   |
+|   POValueOutputTez - scope-58	->	 [scope-67]
+|
+|---a: New For Each(false,false)[bag] - scope-18
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |   |
+    |   Cast[int] - scope-16
+    |   |
+    |   |---Project[bytearray][1] - scope-15
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
+Tez vertex scope-67
+# Plan on vertex
+c: Local Rearrange[tuple]{tuple}(false) - scope-37	->	 scope-69
+|   |
+|   Project[int][0] - scope-38
+|   |
+|   Project[int][1] - scope-39
+|
+|---c: New For Each(true,true)[tuple] - scope-36
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-34
+    |   |
+    |   |---Constant(2) - scope-32
+    |   |
+    |   |---Constant(1) - scope-33
+    |   |
+    |   Project[tuple][*] - scope-35
+    |
+    |---POValueInputTez - scope-68	<-	 scope-57
+Tez vertex scope-69
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-56
+|
+|---d: New For Each(false,false)[bag] - scope-55
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-51
+    |   |
+    |   |---Constant(0) - scope-49
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-54
+    |   |
+    |   |---Constant(1) - scope-52
+    |
+    |---c: New For Each(true,true)[tuple] - scope-42
+        |   |
+        |   Project[bag][1] - scope-40
+        |   |
+        |   Project[bag][2] - scope-41
+        |
+        |---Package(Packager)[tuple]{tuple} - scope-23

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,93 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-51	->	Tez vertex scope-53,Tez vertex scope-55,Tez vertex scope-57,
+Tez vertex scope-57	->	Tez vertex scope-59,
+Tez vertex scope-53	->	Tez vertex scope-59,
+Tez vertex scope-55	->	Tez vertex scope-59,
+Tez vertex scope-59
+
+Tez vertex scope-51
+# Plan on vertex
+POValueOutputTez - scope-52	->	 [scope-53, scope-55, scope-57]
+|
+|---a: New For Each(false)[bag] - scope-4
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Plan on vertex
+d: Local Rearrange[tuple]{int}(false) - scope-13	->	 scope-59
+|   |
+|   Project[int][0] - scope-14
+|
+|---a: New For Each(false)[bag] - scope-8
+    |   |
+    |   Project[int][0] - scope-6
+    |
+    |---POValueInputTez - scope-58	<-	 scope-51
+Tez vertex scope-53
+# Plan on vertex
+POValueOutputTez - scope-60	->	 [scope-59]
+|
+|---b: New For Each(false)[bag] - scope-26
+    |   |
+    |   Project[int][0] - scope-24
+    |
+    |---b: Filter[bag] - scope-20
+        |   |
+        |   Equal To[boolean] - scope-23
+        |   |
+        |   |---Project[int][0] - scope-21
+        |   |
+        |   |---Constant(5) - scope-22
+        |
+        |---a: New For Each(false)[bag] - scope-17
+            |   |
+            |   Project[int][0] - scope-15
+            |
+            |---POValueInputTez - scope-54	<-	 scope-51
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-61	->	 [scope-59]
+|
+|---c: New For Each(false)[bag] - scope-39
+    |   |
+    |   Project[int][0] - scope-37
+    |
+    |---c: Filter[bag] - scope-33
+        |   |
+        |   Equal To[boolean] - scope-36
+        |   |
+        |   |---Project[int][0] - scope-34
+        |   |
+        |   |---Constant(10) - scope-35
+        |
+        |---a: New For Each(false)[bag] - scope-30
+            |   |
+            |   Project[int][0] - scope-28
+            |
+            |---POValueInputTez - scope-56	<-	 scope-51
+Tez vertex scope-59
+# Plan on vertex
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-50
+|
+|---e: New For Each(false,false,false)[bag] - scope-49
+    |   |
+    |   Project[int][0] - scope-41
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
+    |   |
+    |   |---Constant(0) - scope-43
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
+    |   |
+    |   |---Constant(0) - scope-46
+    |
+    |---d: Package(Packager)[tuple]{int} - scope-12

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-51	->	Tez vertex scope-55,Tez vertex scope-57,Tez vertex scope-59,
+Tez vertex scope-57	->	Tez vertex scope-59,
+Tez vertex scope-55	->	Tez vertex scope-59,
+Tez vertex scope-59
+
+Tez vertex scope-51
+# Plan on vertex
+a: Split - scope-62
+|   |
+|   POValueOutputTez - scope-60	->	 [scope-59]
+|   |
+|   |---b: New For Each(false)[bag] - scope-26
+|       |   |
+|       |   Project[int][0] - scope-24
+|       |
+|       |---b: Filter[bag] - scope-20
+|           |   |
+|           |   Equal To[boolean] - scope-23
+|           |   |
+|           |   |---Project[int][0] - scope-21
+|           |   |
+|           |   |---Constant(5) - scope-22
+|           |
+|           |---a: New For Each(false)[bag] - scope-17
+|               |   |
+|               |   Project[int][0] - scope-15
+|   |
+|   POValueOutputTez - scope-52	->	 [scope-55, scope-57]
+|
+|---a: New For Each(false)[bag] - scope-4
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Plan on vertex
+d: Local Rearrange[tuple]{int}(false) - scope-13	->	 scope-59
+|   |
+|   Project[int][0] - scope-14
+|
+|---a: New For Each(false)[bag] - scope-8
+    |   |
+    |   Project[int][0] - scope-6
+    |
+    |---POValueInputTez - scope-58	<-	 scope-51
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-61	->	 [scope-59]
+|
+|---c: New For Each(false)[bag] - scope-39
+    |   |
+    |   Project[int][0] - scope-37
+    |
+    |---c: Filter[bag] - scope-33
+        |   |
+        |   Equal To[boolean] - scope-36
+        |   |
+        |   |---Project[int][0] - scope-34
+        |   |
+        |   |---Constant(10) - scope-35
+        |
+        |---a: New For Each(false)[bag] - scope-30
+            |   |
+            |   Project[int][0] - scope-28
+            |
+            |---POValueInputTez - scope-56	<-	 scope-51
+Tez vertex scope-59
+# Plan on vertex
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-50
+|
+|---e: New For Each(false,false,false)[bag] - scope-49
+    |   |
+    |   Project[int][0] - scope-41
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
+    |   |
+    |   |---Constant(0) - scope-43
+    |   |
+    |   POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
+    |   |
+    |   |---Constant(0) - scope-46
+    |
+    |---d: Package(Packager)[tuple]{int} - scope-12

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,64 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-31	->	Tez vertex scope-33,Tez vertex scope-36,
+Tez vertex scope-35	->	Tez vertex group scope-42,
+Tez vertex scope-36	->	Tez vertex group scope-42,
+Tez vertex group scope-42	->	Tez vertex scope-33,
+Tez vertex scope-33
+
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-32	->	 [scope-33, scope-36]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-35
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-43	->	 scope-33
+|   |
+|   Project[int][0] - scope-44
+|
+|---a: New For Each(false,false)[bag] - scope-18
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][0] - scope-12
+    |   |
+    |   Cast[int] - scope-16
+    |   |
+    |   |---Project[bytearray][1] - scope-15
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
+Tez vertex scope-36
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-45	->	 scope-33
+|   |
+|   Project[int][0] - scope-46
+|
+|---POValueInputTez - scope-37	<-	 scope-31
+Tez vertex group scope-42	<-	 [scope-35, scope-36]	->	 scope-33
+# No plan on vertex group
+Tez vertex scope-33
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---d: FRJoin[tuple] - scope-24	<-	 scope-42
+    |   |
+    |   Project[int][0] - scope-22
+    |   |
+    |   Project[int][0] - scope-23
+    |
+    |---POValueInputTez - scope-34	<-	 scope-31

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Fri Jan 29 22:19:35 2016
@@ -161,6 +161,18 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSelfJoinUnionReplicated() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = union a, b;" +
+                "d = join b by x, c by x using 'replicated';" +
+                "store d into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld");
+    }
+
+    @Test
     public void testCross() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -185,6 +197,18 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testCrossScalarSplit() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = cross b, a;" +
+                "d = foreach c generate a.x, b.z;" + //Scalar
+                "store d into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");
+    }
+
+    @Test
     public void testSkewedJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -505,6 +529,25 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testMultiQueryMultipleScalar() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x == 5;" +
+                "b = foreach b generate $0 as b1;" +
+                "c = filter a by x == 10;" +
+                "c = foreach c generate $0 as c1;" +
+                "d = group a by x;" +
+                "e = foreach d generate group, b.b1, c.c1;" +
+                "store e into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld");
+    }
+
+    @Test
     public void testUnionStore() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +