You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/29 23:19:35 UTC
svn commit: r1727655 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
src/org/apache/pig/backend/hadoop/executionengine/tez/util/
src/org/apache/pig/impl/builtin/ test/e2e/pig/tests/
test/org/apache/pig/test/data/GoldenFiles/tez/ test/org/apache/pig/tez/
Author: rohini
Date: Fri Jan 29 22:19:35 2016
New Revision: 1727655
URL: http://svn.apache.org/viewvc?rev=1727655&view=rev
Log:
PIG-4690: Union with self replicate join will fail in Tez (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
pig/trunk/test/e2e/pig/tests/multiquery.conf
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jan 29 22:19:35 2016
@@ -85,6 +85,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4690: Union with self replicate join will fail in Tez (rohini)
+
PIG-4791: PORelationToExprProject filters records instead of returning emptybag in nested foreach after union (rohini)
PIG-4779: testBZ2Concatenation[pig.bzip.use.hadoop.inputformat = true] failing due to successful read (knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Jan 29 22:19:35 2016
@@ -69,6 +69,7 @@ public class MultiQueryOptimizerTez exte
}
List<TezOperator> splittees = new ArrayList<TezOperator>();
+ Set<TezOperator> mergedNonPackageInputSuccessors = new HashSet<TezOperator>();
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
for (TezOperator successor : successors) {
@@ -117,12 +118,18 @@ public class MultiQueryOptimizerTez exte
// since Tez does not handle double edge between vertexes
// Successor could be
// - union operator (if no union optimizer changing it to vertex group which supports multiple edges)
- // - self replicate join
- // - self skewed join
- // Self hash joins can write to same output edge and is handled by POShuffleTezLoad
- // TODO: PIG-3876 to handle this by writing to same edge
+ // - self replicate join, self skewed join or scalar
+ // - POPackage (Self hash joins can write to same output edge and is handled by POShuffleTezLoad)
Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
+ // These successors should not be merged due to diamond shape
+ Set<TezOperator> toNotMergeSuccessors = new HashSet<TezOperator>();
+ // These successors can be merged
Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>();
+ // These successors (Scalar, POFRJoinTez) can be merged if they are the only input.
+ // Only in case of POPackage(POShuffleTezLoad) multiple inputs can be handled from a Split
+ Set<TezOperator> nonPackageInputSuccessors = new HashSet<TezOperator>();
+ boolean canMerge = true;
+
mergedSuccessors.addAll(successors);
for (TezOperator splittee : splittees) {
if (getPlan().getSuccessors(splittee) != null) {
@@ -136,18 +143,52 @@ public class MultiQueryOptimizerTez exte
UnionOptimizer.isOptimizable(succSuccessor,
unionSupportedStoreFuncs,
unionUnsupportedStoreFuncs))) {
+ toNotMergeSuccessors.add(succSuccessor);
+ } else {
toMergeSuccessors.add(succSuccessor);
+ List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
+ if (unionSuccessors != null) {
+ for (TezOperator unionSuccessor : unionSuccessors) {
+ if (TezCompilerUtil.isNonPackageInput(succSuccessor.getOperatorKey().toString(), unionSuccessor)) {
+ canMerge = canMerge ? nonPackageInputSuccessors.add(unionSuccessor) : false;
+ } else {
+ toMergeSuccessors.add(unionSuccessor);
+ }
+ }
+ }
}
- } else if (successors.contains(succSuccessor)) {
- // Self replicate/skewed join
- toMergeSuccessors.add(succSuccessor);
+ } else if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+ // Output goes to scalar or POFRJoinTez instead of POPackage
+ // POPackage/POShuffleTezLoad can handle multiple inputs from a Split.
+ // But if input is sent to any other operator like
+ // scalar, POFRJoinTez then we need to ensure it is the only one.
+ canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+ } else {
+ toMergeSuccessors.add(succSuccessor);
}
}
}
- mergedSuccessors.retainAll(toMergeSuccessors);
+ if (canMerge) {
+ if (!nonPackageInputSuccessors.isEmpty() || !mergedNonPackageInputSuccessors.isEmpty()) {
+ // If a non-POPackage input successor is already merged or
+ // if there is a POPackage and non-POPackage to be merged,
+ // then skip as it will become diamond shape
+ // For eg: POFRJoinTez+Scalar, POFRJoinTez/Scalar+POPackage
+ if (nonPackageInputSuccessors.removeAll(mergedSuccessors)
+ || toMergeSuccessors.removeAll(mergedNonPackageInputSuccessors)
+ || toMergeSuccessors.removeAll(nonPackageInputSuccessors)) {
+ continue;
+ }
+ }
+ } else {
+ continue;
+ }
+
+ mergedSuccessors.retainAll(toNotMergeSuccessors);
if (mergedSuccessors.isEmpty()) { // no shared edge after merge
splittees.add(successor);
+ mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Jan 29 22:19:35 2016
@@ -36,14 +36,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -147,6 +145,12 @@ public class TezOperDependencyParallelis
roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
}
+ if (roundedEstimatedParallelism == 0) {
+ throw new IOException("Estimated parallelism for "
+ + tezOper.getOperatorKey().toString()
+ + " is 0 which is unexpected");
+ }
+
return roundedEstimatedParallelism;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Jan 29 22:19:35 2016
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.ArrayUtils;
@@ -193,6 +192,29 @@ public class TezCompilerUtil {
} catch (VisitorException e) {
throw new PlanException(e);
}
+ }
+
+ public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException { // True if tezOp reads inputKey via a TezInput operator or a ReadScalarsTez UDF; MultiQueryOptimizerTez treats such a consumer as needing to be the only edge from a Split
+ try {
+ List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); // operators in the vertex plan that implement TezInput
+ for (TezInput input : inputs) {
+ if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { // operator lists inputKey among its Tez inputs
+ return true;
+ }
+ }
+ List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class); // scalar reads appear as POUserFunc(ReadScalarsTez); presumably not returned by the operator scan above since the UDF is not itself a plan operator
+ for (POUserFunc userFunc : userFuncs) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ TezInput input = (TezInput)userFunc.getFunc(); // NOTE(review): cast assumes ReadScalarsTez implements TezInput - confirm
+ if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
+ return true;
+ }
+ }
+ }
+ return false; // nothing in tezOp reads inputKey through a TezInput operator or scalar UDF
+ } catch (VisitorException e) { // plan walk failed; rethrown as PlanException with the cause kept
+ throw new PlanException(e);
+ }
}
static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
Modified: pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java Fri Jan 29 22:19:35 2016
@@ -18,8 +18,11 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
+import java.text.MessageFormat;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
@@ -32,6 +35,9 @@ import org.apache.pig.impl.util.UDFConte
public class GFCross extends EvalFunc<DataBag> {
+
+ private static final Log LOG = LogFactory.getLog(GFCross.class);
+
private int numInputs, myNumber, numGroupsPerInput, numGroupsGoingTo;
private BagFactory mBagFactory = BagFactory.getInstance();
private TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -70,6 +76,12 @@ public class GFCross extends EvalFunc<Da
numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+
+ LOG.info(MessageFormat.format("Parallelism = {0}, numInputs = {1}, myNumber = {2},"
+ + " numGroupsPerInput = {3}, numGroupsGoingTo = {4}",
+ parallelism, numInputs, myNumber,
+ numGroupsPerInput, numGroupsGoingTo));
+
}
DataBag output = mBagFactory.newDefaultBag();
Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Fri Jan 29 22:19:35 2016
@@ -811,6 +811,16 @@ b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name full outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join union replicated
+ 'num' => 9,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name, b by name using 'replicated';
+store c into ':OUTPATH:';\,
}
] # end of tests
},
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Fri Jan 29 22:19:35 2016
@@ -3654,6 +3654,18 @@ f = foreach e generate AVG(d.age) as avg
y = foreach a generate age/c.avg, age/f.avg;
store y into ':OUTPATH:';\,
},
+ {
+ # test scalar with split
+ 'num' => 5,
+ 'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = group a all;
+c = foreach b generate AVG(a.age) as avg, COUNT(a.age) as cnt;
+d = foreach c generate avg;
+e = group d by $0;
+f = foreach e generate group, c.avg, c.cnt;
+store f into ':OUTPATH:';\,
+ },
]
},
{
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,109 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-61 -> Tez vertex scope-65,Tez vertex scope-69,
+Tez vertex scope-65 -> Tez vertex scope-69,
+Tez vertex scope-57 -> Tez vertex scope-67,Tez vertex scope-69,
+Tez vertex scope-67 -> Tez vertex scope-69,
+Tez vertex scope-69
+
+Tez vertex scope-61
+# Plan on vertex
+b: Split - scope-72
+| |
+| POValueOutputTez - scope-71 -> [scope-69]
+| |
+| POValueOutputTez - scope-62 -> [scope-65]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-65
+# Plan on vertex
+c: Local Rearrange[tuple]{tuple}(false) - scope-29 -> scope-69
+| |
+| Project[int][0] - scope-30
+| |
+| Project[int][1] - scope-31
+|
+|---c: New For Each(true,true)[tuple] - scope-28
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-26
+ | |
+ | |---Constant(2) - scope-24
+ | |
+ | |---Constant(0) - scope-25
+ | |
+ | Project[tuple][*] - scope-27
+ |
+ |---POValueInputTez - scope-66 <- scope-61
+Tez vertex scope-57
+# Plan on vertex
+a: Split - scope-73
+| |
+| POValueOutputTez - scope-70 -> [scope-69]
+| |
+| POValueOutputTez - scope-58 -> [scope-67]
+|
+|---a: New For Each(false,false)[bag] - scope-18
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ | |
+ | Cast[int] - scope-16
+ | |
+ | |---Project[bytearray][1] - scope-15
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
+Tez vertex scope-67
+# Plan on vertex
+c: Local Rearrange[tuple]{tuple}(false) - scope-37 -> scope-69
+| |
+| Project[int][0] - scope-38
+| |
+| Project[int][1] - scope-39
+|
+|---c: New For Each(true,true)[tuple] - scope-36
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GFCross)[bag] - scope-34
+ | |
+ | |---Constant(2) - scope-32
+ | |
+ | |---Constant(1) - scope-33
+ | |
+ | Project[tuple][*] - scope-35
+ |
+ |---POValueInputTez - scope-68 <- scope-57
+Tez vertex scope-69
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-56
+|
+|---d: New For Each(false,false)[bag] - scope-55
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-51
+ | |
+ | |---Constant(0) - scope-49
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-54
+ | |
+ | |---Constant(1) - scope-52
+ |
+ |---c: New For Each(true,true)[tuple] - scope-42
+ | |
+ | Project[bag][1] - scope-40
+ | |
+ | Project[bag][2] - scope-41
+ |
+ |---Package(Packager)[tuple]{tuple} - scope-23
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,93 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-51 -> Tez vertex scope-53,Tez vertex scope-55,Tez vertex scope-57,
+Tez vertex scope-57 -> Tez vertex scope-59,
+Tez vertex scope-53 -> Tez vertex scope-59,
+Tez vertex scope-55 -> Tez vertex scope-59,
+Tez vertex scope-59
+
+Tez vertex scope-51
+# Plan on vertex
+POValueOutputTez - scope-52 -> [scope-53, scope-55, scope-57]
+|
+|---a: New For Each(false)[bag] - scope-4
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Plan on vertex
+d: Local Rearrange[tuple]{int}(false) - scope-13 -> scope-59
+| |
+| Project[int][0] - scope-14
+|
+|---a: New For Each(false)[bag] - scope-8
+ | |
+ | Project[int][0] - scope-6
+ |
+ |---POValueInputTez - scope-58 <- scope-51
+Tez vertex scope-53
+# Plan on vertex
+POValueOutputTez - scope-60 -> [scope-59]
+|
+|---b: New For Each(false)[bag] - scope-26
+ | |
+ | Project[int][0] - scope-24
+ |
+ |---b: Filter[bag] - scope-20
+ | |
+ | Equal To[boolean] - scope-23
+ | |
+ | |---Project[int][0] - scope-21
+ | |
+ | |---Constant(5) - scope-22
+ |
+ |---a: New For Each(false)[bag] - scope-17
+ | |
+ | Project[int][0] - scope-15
+ |
+ |---POValueInputTez - scope-54 <- scope-51
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-61 -> [scope-59]
+|
+|---c: New For Each(false)[bag] - scope-39
+ | |
+ | Project[int][0] - scope-37
+ |
+ |---c: Filter[bag] - scope-33
+ | |
+ | Equal To[boolean] - scope-36
+ | |
+ | |---Project[int][0] - scope-34
+ | |
+ | |---Constant(10) - scope-35
+ |
+ |---a: New For Each(false)[bag] - scope-30
+ | |
+ | Project[int][0] - scope-28
+ |
+ |---POValueInputTez - scope-56 <- scope-51
+Tez vertex scope-59
+# Plan on vertex
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-50
+|
+|---e: New For Each(false,false,false)[bag] - scope-49
+ | |
+ | Project[int][0] - scope-41
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
+ | |
+ | |---Constant(0) - scope-43
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
+ | |
+ | |---Constant(0) - scope-46
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-12
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-51 -> Tez vertex scope-55,Tez vertex scope-57,Tez vertex scope-59,
+Tez vertex scope-57 -> Tez vertex scope-59,
+Tez vertex scope-55 -> Tez vertex scope-59,
+Tez vertex scope-59
+
+Tez vertex scope-51
+# Plan on vertex
+a: Split - scope-62
+| |
+| POValueOutputTez - scope-60 -> [scope-59]
+| |
+| |---b: New For Each(false)[bag] - scope-26
+| | |
+| | Project[int][0] - scope-24
+| |
+| |---b: Filter[bag] - scope-20
+| | |
+| | Equal To[boolean] - scope-23
+| | |
+| | |---Project[int][0] - scope-21
+| | |
+| | |---Constant(5) - scope-22
+| |
+| |---a: New For Each(false)[bag] - scope-17
+| | |
+| | Project[int][0] - scope-15
+| |
+| POValueOutputTez - scope-52 -> [scope-55, scope-57]
+|
+|---a: New For Each(false)[bag] - scope-4
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Plan on vertex
+d: Local Rearrange[tuple]{int}(false) - scope-13 -> scope-59
+| |
+| Project[int][0] - scope-14
+|
+|---a: New For Each(false)[bag] - scope-8
+ | |
+ | Project[int][0] - scope-6
+ |
+ |---POValueInputTez - scope-58 <- scope-51
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-61 -> [scope-59]
+|
+|---c: New For Each(false)[bag] - scope-39
+ | |
+ | Project[int][0] - scope-37
+ |
+ |---c: Filter[bag] - scope-33
+ | |
+ | Equal To[boolean] - scope-36
+ | |
+ | |---Project[int][0] - scope-34
+ | |
+ | |---Constant(10) - scope-35
+ |
+ |---a: New For Each(false)[bag] - scope-30
+ | |
+ | Project[int][0] - scope-28
+ |
+ |---POValueInputTez - scope-56 <- scope-51
+Tez vertex scope-59
+# Plan on vertex
+e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-50
+|
+|---e: New For Each(false,false,false)[bag] - scope-49
+ | |
+ | Project[int][0] - scope-41
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-45
+ | |
+ | |---Constant(0) - scope-43
+ | |
+ | POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez)[int] - scope-48
+ | |
+ | |---Constant(0) - scope-46
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-12
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld?rev=1727655&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld Fri Jan 29 22:19:35 2016
@@ -0,0 +1,64 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-31 -> Tez vertex scope-33,Tez vertex scope-36,
+Tez vertex scope-35 -> Tez vertex group scope-42,
+Tez vertex scope-36 -> Tez vertex group scope-42,
+Tez vertex group scope-42 -> Tez vertex scope-33,
+Tez vertex scope-33
+
+Tez vertex scope-31
+# Plan on vertex
+POValueOutputTez - scope-32 -> [scope-33, scope-36]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-35
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-43 -> scope-33
+| |
+| Project[int][0] - scope-44
+|
+|---a: New For Each(false,false)[bag] - scope-18
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][0] - scope-12
+ | |
+ | Cast[int] - scope-16
+ | |
+ | |---Project[bytearray][1] - scope-15
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-11
+Tez vertex scope-36
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-45 -> scope-33
+| |
+| Project[int][0] - scope-46
+|
+|---POValueInputTez - scope-37 <- scope-31
+Tez vertex group scope-42 <- [scope-35, scope-36] -> scope-33
+# No plan on vertex group
+Tez vertex scope-33
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---d: FRJoin[tuple] - scope-24 <- scope-42
+ | |
+ | Project[int][0] - scope-22
+ | |
+ | Project[int][0] - scope-23
+ |
+ |---POValueInputTez - scope-34 <- scope-31
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1727655&r1=1727654&r2=1727655&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Fri Jan 29 22:19:35 2016
@@ -161,6 +161,18 @@ public class TestTezCompiler {
}
@Test
+ public void testSelfJoinUnionReplicated() throws Exception { // PIG-4690: replicated join of b with a union that also contains b
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = union a, b;" + // b feeds the union ...
+ "d = join b by x, c by x using 'replicated';" + // ... and the join directly, so b's vertex reaches the join vertex twice
+ "store d into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld"); // golden plan: union goes through a vertex group into the FRJoin vertex, b is not merged via a Split
+ }
+
+ @Test
public void testCross() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -185,6 +197,18 @@ public class TestTezCompiler {
}
@Test
+ public void testCrossScalarSplit() throws Exception { // cross of b and a, plus scalar references to both cross inputs
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = cross b, a;" +
+ "d = foreach c generate a.x, b.z;" + //Scalar: a.x and b.z read relations a and b as scalars, not fields of c
+ "store d into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld"); // golden plan: a and b each become a Split feeding their GFCross vertex and the final (scalar) vertex
+ }
+
+ @Test
public void testSkewedJoin() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -505,6 +529,25 @@ public class TestTezCompiler {
}
@Test
+ public void testMultiQueryMultipleScalar() throws Exception { // two scalars (b, c) and a group (d) all derived from the same source a
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = filter a by x == 5;" +
+ "b = foreach b generate $0 as b1;" +
+ "c = filter a by x == 10;" +
+ "c = foreach c generate $0 as c1;" +
+ "d = group a by x;" +
+ "e = foreach d generate group, b.b1, c.c1;" + // b.b1 and c.c1 are scalar references into the vertex that stores e
+ "store e into 'file:///tmp/output';";
+
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); // multiquery on: golden plan merges only b into a's Split; c stays a separate vertex
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
+ resetScope(); // presumably restarts scope numbering so the second golden file's scope ids line up
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); // NOTE(review): leaves multiquery disabled after this test - confirm properties are reset between tests
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld"); // multiquery off: no Split, one vertex per branch
+ }
+
+ @Test
public void testUnionStore() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +