You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/05 19:51:03 UTC
svn commit: r1539099 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
test/e2e/pig/tests/tez.conf
test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
test/org/apache/pig/tez/TestTezCompiler.java
Author: cheolsoo
Date: Tue Nov 5 18:51:03 2013
New Revision: 1539099
URL: http://svn.apache.org/r1539099
Log:
PIG-3536: Make distinct work (abain via cheolsoo)
Added:
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Nov 5 18:51:03 2013
@@ -471,9 +471,36 @@ public class TezCompiler extends PhyPlan
@Override
public void visitDistinct(PODistinct op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+     // Compiles a PODistinct into the current Tez plan. A local rearrange marked
+     // distinct ends the current TezOperator; after blocking(), a package marked
+     // distinct plus a flattening foreach over column 0 start the next one.
+     // Any failure is rethrown as a TezCompilerException (error code 2034).
+     try {
+         POLocalRearrange lr = getLocalRearrange();
+         lr.setDistinct(true);
+         curTezOp.plan.addAsLeaf(lr);
+         curTezOp.customPartitioner = op.getCustomPartitioner();
+
+         // Mark the start of a new TezOperator, connecting the inputs.
+         // TODO add distinct combiner as an optimization when supported by Tez
+         blocking();
+
+         POPackage pkg = getPackage();
+         pkg.setDistinct(true);
+         curTezOp.plan.add(pkg);
+
+         POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+         project.setResultType(DataType.TUPLE);
+         project.setStar(false);
+         project.setColumn(0);
+         project.setOverloaded(false);
+
+         // Note that the PODistinct is not actually added to any Tez vertex, but rather is
+         // implemented by the action of the local rearrange, shuffle and project operations.
+         POForEach forEach = getForEach(project, op.getRequestedParallelism());
+         curTezOp.plan.addAsLeaf(forEach);
+         phyToTezOpMap.put(op, curTezOp);
+     } catch (Exception e) {
+         int errCode = 2034;
+         String msg = "Cannot compile " + op.getClass().getSimpleName();
+         // Chain the original exception so the root cause is not lost to callers.
+         throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+     }
}
@Override
@@ -522,7 +549,7 @@ public class TezCompiler extends PhyPlan
// Then add a POPackage and a POForEach to the start of the new tezOp.
POPackage pkg = getPackage();
- POForEach forEach = getPlainForEach();
+ POForEach forEach = getForEachPlain();
curTezOp.plan.add(pkg);
curTezOp.plan.addAsLeaf(forEach);
@@ -705,6 +732,31 @@ public class TezCompiler extends PhyPlan
}
}
+ /**
+  * Builds a POForEach whose only inner plan is the given projection, with that
+  * single expression flagged for flattening and the result type set to BAG.
+  *
+  * @param project the projection placed in the foreach's inner plan
+  * @param rp value handed to the POForEach constructor as its second argument
+  * @return the newly created POForEach
+  */
+ private POForEach getForEach(POProject project, int rp) {
+     // One flatten flag, matching the one inner plan built below.
+     List<Boolean> flattenFlags = Lists.newArrayList();
+     flattenFlags.add(true);
+
+     PhysicalPlan innerPlan = new PhysicalPlan();
+     innerPlan.add(project);
+     List<PhysicalPlan> innerPlans = Lists.newArrayList();
+     innerPlans.add(innerPlan);
+
+     OperatorKey forEachKey = new OperatorKey(scope, nig.getNextNodeId(scope));
+     POForEach result = new POForEach(forEachKey, rp, innerPlans, flattenFlags);
+     result.setResultType(DataType.BAG);
+     return result;
+ }
+
+ // Creates the plain POForEach equivalent to: ForEach X generate flatten($1)
+ private POForEach getForEachPlain() {
+     OperatorKey projectKey = new OperatorKey(scope, nig.getNextNodeId(scope));
+     POProject secondColumn = new POProject(projectKey);
+     secondColumn.setResultType(DataType.TUPLE);
+     secondColumn.setStar(false);
+     secondColumn.setColumn(1);
+     secondColumn.setOverloaded(true);
+     // -1 is passed as getForEach's rp argument.
+     return getForEach(secondColumn, -1);
+ }
+
private POLoad getLoad() {
POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
ld.setPc(pigContext);
@@ -746,28 +798,6 @@ public class TezCompiler extends PhyPlan
return pkg;
}
- // Get a simple POForEach: ForEach X generate flatten($1)
- private POForEach getPlainForEach() {
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
- project.setResultType(DataType.TUPLE);
- project.setStar(false);
- project.setColumn(1);
- project.setOverloaded(true);
-
- PhysicalPlan addPlan = new PhysicalPlan();
- addPlan.add(project);
-
- List<PhysicalPlan> addPlans = Lists.newArrayList();
- addPlans.add(addPlan);
-
- List<Boolean> flatten = Lists.newArrayList();
- flatten.add(true);
-
- POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, addPlans, flatten);
- forEach.setResultType(DataType.BAG);
- return forEach;
- }
-
private TezOperator getTezOp() {
return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Tue Nov 5 18:51:03 2013
@@ -93,6 +93,14 @@ b = limit a 100;
c = filter b by age>30;
store c into ':OUTPATH:';\,
},
+ {
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+b = foreach a generate age;
+c = distinct b;
+d = filter c by age > 30;
+store d into ':OUTPATH:';\,
+ },
]
}
]
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1539099&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Tue Nov 5 18:51:03 2013
@@ -0,0 +1,31 @@
+#--------------------------------------------------
+# TEZ plan:
+#--------------------------------------------------
+Tez vertex scope-106
+Local Rearrange[tuple]{tuple}(true) - scope-108
+| |
+| Project[tuple][*] - scope-107
+|
+|---a: New For Each(false,false)[bag] - scope-100
+ | |
+ | Cast[int] - scope-95
+ | |
+ | |---Project[bytearray][0] - scope-94
+ | |
+ | Cast[int] - scope-98
+ | |
+ | |---Project[bytearray][1] - scope-97
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-93
+Tez vertex scope-109
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-105
+|
+|---c: New For Each(false)[bag] - scope-104
+ | |
+ | Project[int][1] - scope-102
+ |
+ |---New For Each(true)[bag] - scope-112
+ | |
+ | Project[tuple][0] - scope-111
+ |
+ |---Package[tuple]{tuple} - scope-110
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1539099&r1=1539098&r2=1539099&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Tue Nov 5 18:51:03 2013
@@ -56,7 +56,8 @@ import org.junit.runner.RunWith;
"testRun1",
"testRun2",
"testRun3",
- "testRun4"
+ "testRun4",
+ "testRun5"
})
public class TestTezCompiler {
private static PigContext pc;
@@ -126,6 +127,18 @@ public class TestTezCompiler {
run(pp, "test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld");
}
+ @Test
+ public void testRun5() throws Exception {
+     // Script with a DISTINCT followed by a projection of column y.
+     String script = "a = load 'file:///tmp/input' as (x:int, y:int);"
+             + "b = distinct a;"
+             + "c = foreach b generate y;"
+             + "store c into 'file:///tmp/output';";
+     // Handed to run() as its expected-file argument.
+     String goldenFile = "test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld";
+
+     run(Util.buildPp(pigServer, script), goldenFile);
+ }
+
private void run(PhysicalPlan pp, String expectedFile) throws Exception {
TezCompiler comp = new TezCompiler(pp, pc);
TezOperPlan tezPlan = comp.compile();