You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/09 00:39:34 UTC
svn commit: r1556673 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/
test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/
Author: cheolsoo
Date: Wed Jan 8 23:39:33 2014
New Revision: 1556673
URL: http://svn.apache.org/r1556673
Log:
PIG-3562: Implement combiner optimizations for DISTINCT (abain via cheolsoo)
Added:
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Jan 8 23:39:33 2014
@@ -636,25 +636,21 @@ public class TezCompiler extends PhyPlan
lr.setDistinct(true);
curTezOp.plan.addAsLeaf(lr);
curTezOp.customPartitioner = op.getCustomPartitioner();
+ TezOperator lastOp = curTezOp;
// Mark the start of a new TezOperator, connecting the inputs.
- // TODO add distinct combiner as an optimization when supported by Tez
blocking();
- POPackage pkg = getPackage(1, DataType.TUPLE);
- pkg.getPkgr().setDistinct(true);
- curTezOp.plan.add(pkg);
+ // Reuse the DISTINCT plan as the combine plan on the incoming edge. MR Pig implements this
+ // combiner with a global variable and a dedicated DistinctCombiner class; neither is needed here.
+ PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
+ addDistinctPlan(combinePlan, 1);
+
+ POLocalRearrangeTez clr = localRearrangeFactory.create();
+ clr.setDistinct(true);
+ combinePlan.addAsLeaf(clr);
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
- project.setResultType(DataType.TUPLE);
- project.setStar(false);
- project.setColumn(0);
- project.setOverloaded(false);
-
- // Note that the PODistinct is not actually added to any Tez vertex, but rather is
- // implemented by the action of the local rearrange, shuffle and project operations.
- POForEach forEach = getForEach(project, op.getRequestedParallelism());
- curTezOp.plan.addAsLeaf(forEach);
+ addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
phyToTezOpMap.put(op, curTezOp);
} catch (Exception e) {
int errCode = 2034;
@@ -663,6 +659,23 @@ public class TezCompiler extends PhyPlan
}
}
+ // Adds the plan for DISTINCT. Note that the PODistinct is not actually added to the plan, but
+ // rather is implemented by the action of the local rearrange, shuffle and project operations.
+ private void addDistinctPlan(PhysicalPlan plan, int rp) throws PlanException {
+ POPackage pkg = getPackage(1, DataType.TUPLE);
+ pkg.getPkgr().setDistinct(true);
+ plan.addAsLeaf(pkg);
+
+ POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ project.setResultType(DataType.TUPLE);
+ project.setStar(false);
+ project.setColumn(0);
+ project.setOverloaded(false);
+
+ POForEach forEach = getForEach(project, rp);
+ plan.addAsLeaf(forEach);
+ }
+
@Override
public void visitFilter(POFilter op) throws VisitorException {
try {
Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Wed Jan 8 23:39:33 2014
@@ -83,6 +83,7 @@ b = limit a 100;
store b into ':OUTPATH:';\,
},
{
+ # Distinct
'num' => 2,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
@@ -92,17 +93,29 @@ d = filter c by age > 30;
store d into ':OUTPATH:';\,
},
{
- # Order by simple
+ # Distinct with algebraic udf combiner optimization
'num' => 3,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+b = group a by age;
+c = foreach b {
+ x = distinct a;
+ generate COUNT(x);
+};
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Order by simple
+ 'num' => 4,
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = order a by age parallel 2;
store b into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '2,2'],
},
{
# Order by simple no schema
- 'num' => 4,
+ 'num' => 5,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k';
b = order a by $1;
@@ -111,7 +124,7 @@ store b into ':OUTPATH:';\,
},
{
# Order by after group
- 'num' => 5,
+ 'num' => 6,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = group a by name;
@@ -121,14 +134,14 @@ store d into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '2,2'],
},
{
- 'num' => 6,
+ 'num' => 7,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = sample a 1;
store b into ':OUTPATH:';\,
},
{
- 'num' => 7,
+ 'num' => 8,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
@@ -137,7 +150,7 @@ store c into ':OUTPATH:';\,
},
{
# Descending order by string
- 'num' => 8,
+ 'num' => 9,
'pig' => q\set pig.tez.session.reuse false;
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = order a by name desc parallel 2;
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld?rev=1556673&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld Wed Jan 8 23:39:33 2014
@@ -0,0 +1,63 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-39
+#--------------------------------------------------
+Tez vertex scope-18 -> Tez vertex scope-19,
+Tez vertex scope-19
+
+Tez vertex scope-18
+# Plan on vertex
+b: Local Rearrange[tuple]{int}(false) - scope-32 -> scope-19
+| |
+| Project[int][0] - scope-34
+|
+|---c: New For Each(false,false)[bag] - scope-21
+ | |
+ | Project[int][0] - scope-22
+ | |
+ | POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-23
+ | |
+ | |---Project[tuple][1] - scope-24
+ |
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-35
+ |
+ |---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Combine plan on edge <scope-18>
+b: Local Rearrange[tuple]{int}(false) - scope-36 -> scope-19
+| |
+| Project[int][0] - scope-38
+|
+|---c: New For Each(false,false)[bag] - scope-25
+ | |
+ | Project[int][0] - scope-26
+ | |
+ | POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-27
+ | |
+ | |---Project[bag][1] - scope-28
+ |
+ |---b: Package(CombinerPackager)[tuple]{int} - scope-31
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---c: New For Each(false)[bag] - scope-16
+ | |
+ | POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-13
+ | |
+ | |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-20
+ | |
+ | |---Project[bag][1] - scope-29
+ |
+ |---b: Package(CombinerPackager)[tuple]{int} - scope-9
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Wed Jan 8 23:39:33 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-20
+# TEZ DAG plan: scope-25
#--------------------------------------------------
Tez vertex scope-13 -> Tez vertex scope-16,
Tez vertex scope-16
@@ -25,6 +25,16 @@ Local Rearrange[tuple]{tuple}(true) - sc
|
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-16
+# Combine plan on edge <scope-13>
+Local Rearrange[tuple]{tuple}(true) - scope-21 -> scope-16
+| |
+| Project[tuple][*] - scope-20
+|
+|---New For Each(true)[bag] - scope-19
+ | |
+ | Project[tuple][0] - scope-18
+ |
+ |---Package(Packager)[tuple]{tuple} - scope-17
# Plan on vertex
c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-12
|
@@ -32,8 +42,8 @@ c: Store(file:///tmp/output:org.apache.p
| |
| Project[int][1] - scope-9
|
- |---New For Each(true)[bag] - scope-19
+ |---New For Each(true)[bag] - scope-24
| |
- | Project[tuple][0] - scope-18
+ | Project[tuple][0] - scope-23
|
- |---Package(Packager)[tuple]{tuple} - scope-17
\ No newline at end of file
+ |---Package(Packager)[tuple]{tuple} - scope-22
\ No newline at end of file
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Wed Jan 8 23:39:33 2014
@@ -132,6 +132,18 @@ public class TestTezCompiler {
}
@Test
+ public void testDistinctAlgebraicUdfCombiner() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:int);" +
+ "b = group a by x;" +
+ "c = foreach b { d = distinct a; generate COUNT(d); };" +
+ "store c into 'file:///tmp/output';";
+
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ run(pp, "test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld");
+ }
+
+ @Test
public void testSplitSingleVertex() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +