You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/09 00:39:34 UTC

svn commit: r1556673 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/ test/org/apache/pig/test/data/GoldenFiles/ test/org/apache/pig/tez/

Author: cheolsoo
Date: Wed Jan  8 23:39:33 2014
New Revision: 1556673

URL: http://svn.apache.org/r1556673
Log:
PIG-3562: Implement combiner optimizations for DISTINCT (abain via cheolsoo)

Added:
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Jan  8 23:39:33 2014
@@ -636,25 +636,21 @@ public class TezCompiler extends PhyPlan
             lr.setDistinct(true);
             curTezOp.plan.addAsLeaf(lr);
             curTezOp.customPartitioner = op.getCustomPartitioner();
+            TezOperator lastOp = curTezOp;
 
             // Mark the start of a new TezOperator, connecting the inputs.
-            // TODO add distinct combiner as an optimization when supported by Tez
             blocking();
 
-            POPackage pkg = getPackage(1, DataType.TUPLE);
-            pkg.getPkgr().setDistinct(true);
-            curTezOp.plan.add(pkg);
+            // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
+            // with a global variable and a specific DistinctCombiner class. This seems better.
+            PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
+            addDistinctPlan(combinePlan, 1);
+
+            POLocalRearrangeTez clr = localRearrangeFactory.create();
+            clr.setDistinct(true);
+            combinePlan.addAsLeaf(clr);
 
-            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
-            project.setResultType(DataType.TUPLE);
-            project.setStar(false);
-            project.setColumn(0);
-            project.setOverloaded(false);
-
-            // Note that the PODistinct is not actually added to any Tez vertex, but rather is
-            // implemented by the action of the local rearrange, shuffle and project operations.
-            POForEach forEach = getForEach(project, op.getRequestedParallelism());
-            curTezOp.plan.addAsLeaf(forEach);
+            addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
             phyToTezOpMap.put(op, curTezOp);
         } catch (Exception e) {
             int errCode = 2034;
@@ -663,6 +659,23 @@ public class TezCompiler extends PhyPlan
         }
     }
 
+    // Adds the plan for DISTINCT. Note that the PODistinct is not actually added to the plan, but
+    // rather is implemented by the action of the local rearrange, shuffle and project operations.
+    private void addDistinctPlan(PhysicalPlan plan, int rp) throws PlanException {
+        POPackage pkg = getPackage(1, DataType.TUPLE);
+        pkg.getPkgr().setDistinct(true);
+        plan.addAsLeaf(pkg);
+
+        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        project.setResultType(DataType.TUPLE);
+        project.setStar(false);
+        project.setColumn(0);
+        project.setOverloaded(false);
+
+        POForEach forEach = getForEach(project, rp);
+        plan.addAsLeaf(forEach);
+    }
+
     @Override
     public void visitFilter(POFilter op) throws VisitorException {
         try {

Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Wed Jan  8 23:39:33 2014
@@ -83,6 +83,7 @@ b = limit a 100;
 store b into ':OUTPATH:';\,
                         },
                         {
+                        # Distinct
                         'num' => 2,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
@@ -92,17 +93,29 @@ d = filter c by age > 30;
 store d into ':OUTPATH:';\,
                         },
                         {
-                        # Order by simple
+                        # Distinct with algebraic udf combiner optimization
                         'num' => 3,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+b = group a by age;
+c = foreach b {
+      x = distinct a;
+      generate COUNT(x);
+};
+store c into ':OUTPATH:';\,
+                        },
+                        {
+                        # Order by simple
+                        'num' => 4,
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = order a by age parallel 2;
 store b into ':OUTPATH:';\,
                         'sortArgs' => ['-t', '	', '-k', '2,2'],
                         },
                         {
                         # Order by simple no schema
-                        'num' => 4,
+                        'num' => 5,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k';
 b = order a by $1;
@@ -111,7 +124,7 @@ store b into ':OUTPATH:';\,
                         },
                         {
                         # Order by after group
-                        'num' => 5,
+                        'num' => 6,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = group a by name;
@@ -121,14 +134,14 @@ store d into ':OUTPATH:';\,
                         'sortArgs' => ['-t', '	', '-k', '2,2'],
                         },
                         {
-                        'num' => 6,
+                        'num' => 7,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = sample a 1;
 store b into ':OUTPATH:';\,
                         },
                         {
-                        'num' => 7,
+                        'num' => 8,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
@@ -137,7 +150,7 @@ store c into ':OUTPATH:';\,
                         },
                         {
                         # Descending order by string
-                        'num' => 8,
+                        'num' => 9,
                         'pig' => q\set pig.tez.session.reuse false;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = order a by name desc parallel 2;

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld?rev=1556673&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld Wed Jan  8 23:39:33 2014
@@ -0,0 +1,63 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-39
+#--------------------------------------------------
+Tez vertex scope-18	->	Tez vertex scope-19,
+Tez vertex scope-19
+
+Tez vertex scope-18
+# Plan on vertex
+b: Local Rearrange[tuple]{int}(false) - scope-32	->	 scope-19
+|   |
+|   Project[int][0] - scope-34
+|
+|---c: New For Each(false,false)[bag] - scope-21
+    |   |
+    |   Project[int][0] - scope-22
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-23
+    |   |
+    |   |---Project[tuple][1] - scope-24
+    |
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-35
+        |
+        |---a: New For Each(false,false)[bag] - scope-7
+            |   |
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
+            |
+            |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
+# Combine plan on edge <scope-18>
+b: Local Rearrange[tuple]{int}(false) - scope-36	->	 scope-19
+|   |
+|   Project[int][0] - scope-38
+|
+|---c: New For Each(false,false)[bag] - scope-25
+    |   |
+    |   Project[int][0] - scope-26
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-27
+    |   |
+    |   |---Project[bag][1] - scope-28
+    |
+    |---b: Package(CombinerPackager)[tuple]{int} - scope-31
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---c: New For Each(false)[bag] - scope-16
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-13
+    |   |
+    |   |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-20
+    |       |
+    |       |---Project[bag][1] - scope-29
+    |
+    |---b: Package(CombinerPackager)[tuple]{int} - scope-9

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Wed Jan  8 23:39:33 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-20
+# TEZ DAG plan: scope-25
 #--------------------------------------------------
 Tez vertex scope-13	->	Tez vertex scope-16,
 Tez vertex scope-16
@@ -25,6 +25,16 @@ Local Rearrange[tuple]{tuple}(true) - sc
     |
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-16
+# Combine plan on edge <scope-13>
+Local Rearrange[tuple]{tuple}(true) - scope-21	->	 scope-16
+|   |
+|   Project[tuple][*] - scope-20
+|
+|---New For Each(true)[bag] - scope-19
+    |   |
+    |   Project[tuple][0] - scope-18
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-17
 # Plan on vertex
 c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-12
 |
@@ -32,8 +42,8 @@ c: Store(file:///tmp/output:org.apache.p
     |   |
     |   Project[int][1] - scope-9
     |
-    |---New For Each(true)[bag] - scope-19
+    |---New For Each(true)[bag] - scope-24
         |   |
-        |   Project[tuple][0] - scope-18
+        |   Project[tuple][0] - scope-23
         |
-        |---Package(Packager)[tuple]{tuple} - scope-17
\ No newline at end of file
+        |---Package(Packager)[tuple]{tuple} - scope-22
\ No newline at end of file

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1556673&r1=1556672&r2=1556673&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Wed Jan  8 23:39:33 2014
@@ -132,6 +132,18 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testDistinctAlgebraicUdfCombiner() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = group a by x;" +
+                "c = foreach b { d = distinct a; generate COUNT(d); };" +
+                "store c into 'file:///tmp/output';";
+
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        run(pp, "test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld");
+    }
+
+    @Test
     public void testSplitSingleVertex() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +