You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/05 19:51:03 UTC

svn commit: r1539099 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java test/e2e/pig/tests/tez.conf test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld test/org/apache/pig/tez/TestTezCompiler.java

Author: cheolsoo
Date: Tue Nov  5 18:51:03 2013
New Revision: 1539099

URL: http://svn.apache.org/r1539099
Log:
PIG-3536: Make distinct work (abain via cheolsoo)

Added:
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Nov  5 18:51:03 2013
@@ -471,9 +471,36 @@ public class TezCompiler extends PhyPlan
 
     @Override
     public void visitDistinct(PODistinct op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+        try {
+            POLocalRearrange lr = getLocalRearrange();
+            lr.setDistinct(true);
+            curTezOp.plan.addAsLeaf(lr);
+            curTezOp.customPartitioner = op.getCustomPartitioner();
+
+            // Mark the start of a new TezOperator, connecting the inputs.
+            // TODO add distinct combiner as an optimization when supported by Tez
+            blocking();
+
+            POPackage pkg = getPackage();
+            pkg.setDistinct(true);
+            curTezOp.plan.add(pkg);
+
+            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            project.setResultType(DataType.TUPLE);
+            project.setStar(false);
+            project.setColumn(0);
+            project.setOverloaded(false);
+
+            // Note that the PODistinct is not actually added to any Tez vertex, but rather is
+            // implemented by the action of the local rearrange, shuffle and project operations.
+            POForEach forEach = getForEach(project, op.getRequestedParallelism());
+            curTezOp.plan.addAsLeaf(forEach);
+            phyToTezOpMap.put(op, curTezOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Cannot compile " + op.getClass().getSimpleName();
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     @Override
@@ -522,7 +549,7 @@ public class TezCompiler extends PhyPlan
 
             // Then add a POPackage and a POForEach to the start of the new tezOp.
             POPackage pkg = getPackage();
-            POForEach forEach = getPlainForEach();
+            POForEach forEach = getForEachPlain();
             curTezOp.plan.add(pkg);
             curTezOp.plan.addAsLeaf(forEach);
 
@@ -705,6 +732,31 @@ public class TezCompiler extends PhyPlan
         }
     }
 
+    private POForEach getForEach(POProject project, int rp) {
+        PhysicalPlan forEachPlan = new PhysicalPlan();
+        forEachPlan.add(project);
+
+        List<PhysicalPlan> forEachPlans = Lists.newArrayList();
+        forEachPlans.add(forEachPlan);
+
+        List<Boolean> flatten = Lists.newArrayList();
+        flatten.add(true);
+
+        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
+        forEach.setResultType(DataType.BAG);
+        return forEach;
+    }
+
+    // Get a plain POForEach: ForEach X generate flatten($1)
+    private POForEach getForEachPlain() {
+        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        project.setResultType(DataType.TUPLE);
+        project.setStar(false);
+        project.setColumn(1);
+        project.setOverloaded(true);
+        return getForEach(project, -1);
+    }
+
     private POLoad getLoad() {
         POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
         ld.setPc(pigContext);
@@ -746,28 +798,6 @@ public class TezCompiler extends PhyPlan
         return pkg;
     }
 
-    // Get a simple POForEach: ForEach X generate flatten($1)
-    private POForEach getPlainForEach() {
-        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        project.setResultType(DataType.TUPLE);
-        project.setStar(false);
-        project.setColumn(1);
-        project.setOverloaded(true);
-
-        PhysicalPlan addPlan = new PhysicalPlan();
-        addPlan.add(project);
-
-        List<PhysicalPlan> addPlans = Lists.newArrayList();
-        addPlans.add(addPlan);
-
-        List<Boolean> flatten = Lists.newArrayList();
-        flatten.add(true);
-
-        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, addPlans, flatten);
-        forEach.setResultType(DataType.BAG);
-        return forEach;
-    }
-
     private TezOperator getTezOp() {
         return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
     }

Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Tue Nov  5 18:51:03 2013
@@ -93,6 +93,14 @@ b = limit a 100;
 c = filter b by age>30;
 store c into ':OUTPATH:';\,
                         },
+                        {
+                        'num' => 2,
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+b = foreach a generate age;
+c = distinct b;
+d = filter c by age > 30;
+store d into ':OUTPATH:';\,
+                        },
                   ]
                 }
          ]

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1539099&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Tue Nov  5 18:51:03 2013
@@ -0,0 +1,31 @@
+#--------------------------------------------------
+# TEZ plan:
+#--------------------------------------------------
+Tez vertex scope-106
+Local Rearrange[tuple]{tuple}(true) - scope-108
+|   |
+|   Project[tuple][*] - scope-107
+|
+|---a: New For Each(false,false)[bag] - scope-100
+    |   |
+    |   Cast[int] - scope-95
+    |   |
+    |   |---Project[bytearray][0] - scope-94
+    |   |
+    |   Cast[int] - scope-98
+    |   |
+    |   |---Project[bytearray][1] - scope-97
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-93
+Tez vertex scope-109
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-105
+|
+|---c: New For Each(false)[bag] - scope-104
+    |   |
+    |   Project[int][1] - scope-102
+    |
+    |---New For Each(true)[bag] - scope-112
+        |   |
+        |   Project[tuple][0] - scope-111
+        |
+        |---Package[tuple]{tuple} - scope-110

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Tue Nov  5 18:51:03 2013
@@ -56,7 +56,8 @@ import org.junit.runner.RunWith;
     "testRun1",
     "testRun2",
     "testRun3",
-    "testRun4"
+    "testRun4",
+    "testRun5"
 })
 public class TestTezCompiler {
     private static PigContext pc;
@@ -126,6 +127,18 @@ public class TestTezCompiler {
         run(pp, "test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld");
     }
 
+    @Test
+    public void testRun5() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = distinct a;" +
+                "c = foreach b generate y;" +
+                "store c into 'file:///tmp/output';";
+
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        run(pp, "test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld");
+    }
+
     private void run(PhysicalPlan pp, String expectedFile) throws Exception {
         TezCompiler comp = new TezCompiler(pp, pc);
         TezOperPlan tezPlan = comp.compile();