You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [14/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/s...
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Thu Nov 27 12:49:54 2014
@@ -2,88 +2,88 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-119
+# TEZ DAG plan: pig-0_scope-1
#--------------------------------------------------
-Tez vertex scope-109 -> Tez vertex scope-110,
-Tez vertex scope-103 -> Tez vertex scope-110,
-Tez vertex scope-110
-
+Tez vertex scope-108 -> Tez vertex scope-109,
+Tez vertex scope-102 -> Tez vertex scope-109,
Tez vertex scope-109
+
+Tez vertex scope-108
# Plan on vertex
-POValueOutputTez - scope-113 -> [scope-110]
+POValueOutputTez - scope-112 -> [scope-109]
|
-|---c: New For Each(false,false)[bag] - scope-89
+|---c: New For Each(false,false)[bag] - scope-88
| |
- | Cast[int] - scope-84
+ | Cast[int] - scope-83
| |
- | |---Project[bytearray][1] - scope-83
+ | |---Project[bytearray][1] - scope-82
| |
- | Cast[chararray] - scope-87
+ | Cast[chararray] - scope-86
| |
- | |---Project[bytearray][0] - scope-86
+ | |---Project[bytearray][0] - scope-85
|
- |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-82
-Tez vertex scope-103
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-81
+Tez vertex scope-102
# Plan on vertex
-1-12: Split - scope-121
+1-12: Split - scope-119
| |
-| a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-77
+| a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-76
| |
-| |---a2: Filter[bag] - scope-72
+| |---a2: Filter[bag] - scope-71
| | |
-| | Not[boolean] - scope-76
+| | Not[boolean] - scope-75
| | |
-| | |---Greater Than[boolean] - scope-75
+| | |---Greater Than[boolean] - scope-74
| | |
-| | |---Project[int][0] - scope-73
+| | |---Project[int][0] - scope-72
| | |
-| | |---Constant(100) - scope-74
+| | |---Constant(100) - scope-73
| |
-| POValueOutputTez - scope-112 -> [scope-110]
+| POValueOutputTez - scope-111 -> [scope-109]
| |
-| |---a1: Filter[bag] - scope-78
+| |---a1: Filter[bag] - scope-77
| | |
-| | Greater Than[boolean] - scope-81
+| | Greater Than[boolean] - scope-80
| | |
-| | |---Project[int][0] - scope-79
+| | |---Project[int][0] - scope-78
| | |
-| | |---Constant(100) - scope-80
+| | |---Constant(100) - scope-79
|
-|---a: New For Each(false,false)[bag] - scope-70
+|---a: New For Each(false,false)[bag] - scope-69
| |
- | Cast[int] - scope-65
+ | Cast[int] - scope-64
| |
- | |---Project[bytearray][0] - scope-64
+ | |---Project[bytearray][0] - scope-63
| |
- | Cast[chararray] - scope-68
+ | Cast[chararray] - scope-67
| |
- | |---Project[bytearray][1] - scope-67
+ | |---Project[bytearray][1] - scope-66
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-63
-Tez vertex scope-110
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-62
+Tez vertex scope-109
# Plan on vertex
-1-13: Split - scope-120
+1-13: Split - scope-118
| |
-| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-96
+| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-95
| |
-| |---d: Filter[bag] - scope-92
+| |---d: Filter[bag] - scope-91
| | |
-| | Greater Than[boolean] - scope-95
+| | Greater Than[boolean] - scope-94
| | |
-| | |---Project[int][0] - scope-93
+| | |---Project[int][0] - scope-92
| | |
-| | |---Constant(500) - scope-94
+| | |---Constant(500) - scope-93
| |
-| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-102
+| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-101
| |
-| |---e: Filter[bag] - scope-97
+| |---e: Filter[bag] - scope-96
| | |
-| | Not[boolean] - scope-101
+| | Not[boolean] - scope-100
| | |
-| | |---Greater Than[boolean] - scope-100
+| | |---Greater Than[boolean] - scope-99
| | |
-| | |---Project[int][0] - scope-98
+| | |---Project[int][0] - scope-97
| | |
-| | |---Constant(500) - scope-99
+| | |---Constant(500) - scope-98
|
-|---POShuffledValueInputTez - scope-111 <- [scope-109, scope-103]
+|---POShuffledValueInputTez - scope-110 <- [scope-108, scope-102]
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Thu Nov 27 12:49:54 2014
@@ -2,16 +2,16 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-56
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-40 -> Tez vertex group scope-59,Tez vertex group scope-60,
-Tez vertex scope-46 -> Tez vertex group scope-59,Tez vertex group scope-60,
+Tez vertex scope-40 -> Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46 -> Tez vertex group scope-58,Tez vertex group scope-59,
Tez vertex group scope-59
-Tez vertex group scope-60
+Tez vertex group scope-58
Tez vertex scope-40
# Plan on vertex
-1-2: Split - scope-58
+1-2: Split - scope-57
| |
| a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-14
| |
@@ -25,7 +25,7 @@ Tez vertex scope-40
| | |
| | |---Constant(100) - scope-11
| |
-| 1-3: Split - scope-61
+| 1-3: Split - scope-60
| | |
| | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33
| | |
@@ -70,7 +70,7 @@ Tez vertex scope-40
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-46
# Plan on vertex
-1-3: Split - scope-62
+1-3: Split - scope-61
| |
| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33
| |
@@ -107,5 +107,5 @@ Tez vertex scope-46
|---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-19
Tez vertex group scope-59 <- [scope-40, scope-46] -> null
# No plan on vertex group
-Tez vertex group scope-60 <- [scope-40, scope-46] -> null
+Tez vertex group scope-58 <- [scope-40, scope-46] -> null
# No plan on vertex group
Modified: pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java Thu Nov 27 12:49:54 2014
@@ -21,15 +21,20 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.test.TestGroupConstParallel;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezDAGStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
import org.junit.Assume;
@@ -46,7 +51,9 @@ public class TestGroupConstParallelTez e
@Override
public void checkGroupAllWithParallelGraphResult(JobGraph jGraph) {
- TezTaskStats ts = (TezTaskStats)jGraph.getSinks().get(0);
+ TezDAGStats ds = (TezDAGStats) jGraph.getJobList().get(0);
+ jGraph = (JobGraph)ds.getPlan();
+ TezVertexStats ts = (TezVertexStats)jGraph.getSinks().get(0);
assertEquals(ts.getParallelism(), 1);
}
@@ -60,7 +67,7 @@ public class TestGroupConstParallelTez e
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
- DAG tezDag = DAG.create("test");
+ DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
@@ -80,7 +87,7 @@ public class TestGroupConstParallelTez e
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
- DAG tezDag = DAG.create("test");
+ DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
@@ -95,4 +102,13 @@ public class TestGroupConstParallelTez e
comp.compile();
return comp.getTezPlan();
}
+
+ private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) {
+ TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+ TezScriptState scriptState = new TezScriptState("test");
+ ScriptState.start(scriptState);
+ scriptState.setDAGScriptInfo(tezPlanNode);
+ DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
+ return tezDag;
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java Thu Nov 27 12:49:54 2014
@@ -23,15 +23,19 @@ import static org.junit.Assert.assertTru
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.test.TestJobSubmission;
import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
@@ -57,7 +61,7 @@ public class TestJobSubmissionTez extend
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
- DAG tezDag = DAG.create("test");
+ DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
try {
dagBuilder.visit();
@@ -69,14 +73,14 @@ public class TestJobSubmissionTez extend
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
TezOperPlan tezPlan = buildTezPlan(pp, pc);
-
+
LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
loaderStorer.visit();
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
- DAG tezDag = DAG.create("test");
+ DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
@@ -96,4 +100,13 @@ public class TestJobSubmissionTez extend
comp.compile();
return comp.getTezPlan();
}
+
+ private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) {
+ TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+ TezScriptState scriptState = new TezScriptState("test");
+ ScriptState.start(scriptState);
+ scriptState.setDAGScriptInfo(tezPlanNode);
+ DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
+ return tezDag;
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java Thu Nov 27 12:49:54 2014
@@ -20,10 +20,10 @@ package org.apache.pig.tez;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.CombinerOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.tez.SecondaryKeyOptimizerTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.TestSecondarySort;
@@ -47,12 +47,12 @@ public class TestSecondarySortTez extend
TezCompiler comp = new TezCompiler(pp, pc);
TezOperPlan tezPlan = comp.compile();
boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
- PigConfiguration.PROP_NO_COMBINER, "false"));
+ PigConfiguration.PIG_EXEC_NO_COMBINER, "false"));
// Run CombinerOptimizer on Tez plan
if (!nocombiner) {
boolean doMapAgg = Boolean.parseBoolean(pc.getProperties()
- .getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,
+ .getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"false"));
CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
co.visit();
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Nov 27 12:49:54 2014
@@ -18,10 +18,15 @@
package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
@@ -33,6 +38,9 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.junit.After;
@@ -98,7 +106,7 @@ public class TestTezAutoParallelism {
}
w.close();
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
-
+
w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
for (String name : boyNames) {
w.println(name + "\t" + "M");
@@ -119,13 +127,14 @@ public class TestTezAutoParallelism {
// parallelism is 3 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name;");
pigServer.store("B", "output1");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -141,7 +150,7 @@ public class TestTezAutoParallelism {
// order by parallelism is 3 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
@@ -150,6 +159,7 @@ public class TestTezAutoParallelism {
pigServer.store("D", "output2");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -173,6 +183,7 @@ public class TestTezAutoParallelism {
pigServer.store("D", "output3");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -188,7 +199,7 @@ public class TestTezAutoParallelism {
// skewed join parallelism is 4 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -196,6 +207,7 @@ public class TestTezAutoParallelism {
pigServer.store("C", "output4");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -218,6 +230,7 @@ public class TestTezAutoParallelism {
pigServer.store("C", "output5");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -225,6 +238,40 @@ public class TestTezAutoParallelism {
return false;
}
});
- assertEquals(files.length, 5);
+ assertEquals(files.length, 4);
+ }
+
+ @Test
+ public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+        // When there is a combiner operation involved, user-specified parallelism is overridden
+ Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer);
+ try {
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;");
+ pigServer.registerQuery("D = group C by A::name;");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] {
+ "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)",
+ "('Daniel',68L)", "('Elizabeth',42L)",
+ "('Emily',57L)", "('Emma',50L)", "('Ethan',50L)",
+ "('Isabella',43L)", "('Jacob',43L)", "('Jayden',59L)",
+ "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
+ "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
+ "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" });
+
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
+ assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4"));
+ } finally {
+ Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism");
+ }
}
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Thu Nov 27 12:49:54 2014
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PrintStream;
-import java.util.Map;
import java.util.Properties;
import org.apache.pig.PigConfiguration;
@@ -32,13 +31,10 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerNode;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.test.Util;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
@@ -77,8 +73,8 @@ public class TestTezCompiler {
@Before
public void setUp() throws ExecException {
resetScope();
- pc.getProperties().remove(PigConfiguration.OPT_MULTIQUERY);
- pc.getProperties().remove(PigConfiguration.TEZ_OPT_UNION);
+ pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
+ pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
pigServer = new PigServer(pc);
}
@@ -86,6 +82,7 @@ public class TestTezCompiler {
private void resetScope() {
NodeIdGenerator.reset();
PigServer.resetScope();
+ TezPlanContainer.resetScope();
}
@Test
@@ -96,7 +93,7 @@ public class TestTezCompiler {
"c = foreach b generate y;" +
"store c into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld");
}
@Test
@@ -107,7 +104,7 @@ public class TestTezCompiler {
"c = foreach b generate group, COUNT(a.x);" +
"store c into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld");
}
@Test
@@ -119,7 +116,7 @@ public class TestTezCompiler {
"d = foreach c generate a::x as x, y, z;" +
"store d into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld");
}
@Test
@@ -131,7 +128,7 @@ public class TestTezCompiler {
"d = foreach c generate a::x as x, y, z;" +
"store d into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld");
}
@Test
@@ -142,7 +139,31 @@ public class TestTezCompiler {
"c = foreach b generate y;" +
"store c into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld");
+ }
+
+ @Test
+ public void testLimitOrderby() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:int);" +
+ "b = order a by x, y;" +
+ "c = limit b 10;" +
+ "store c into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld");
+ }
+
+ @Test
+ public void testLimitScalarOrderby() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:int);" +
+ "b = order a by x, y;" +
+ "g = group a all;" +
+ "h = foreach g generate COUNT(a) as sum;" +
+ "c = limit b h.sum/2;" +
+ "store c into 'file:///tmp/output';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld");
}
@Test
@@ -153,7 +174,7 @@ public class TestTezCompiler {
"c = foreach b generate y;" +
"store c into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld");
}
@Test
@@ -164,7 +185,7 @@ public class TestTezCompiler {
"c = foreach b { d = distinct a; generate COUNT(d); };" +
"store c into 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld");
}
@@ -178,7 +199,7 @@ public class TestTezCompiler {
"d = join a by x, b by x, c by x using 'replicated';" +
"store d into 'file:///tmp/output/d';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC10.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld");
}
@Test
@@ -191,7 +212,7 @@ public class TestTezCompiler {
"d = join b1 by group, c by x using 'replicated';" +
"store d into 'file:///tmp/output/e';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC11.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld");
}
@Test
@@ -201,7 +222,7 @@ public class TestTezCompiler {
"b = stream a through `stream.pl -n 5`;" +
"STORE b INTO 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld");
}
@Test
@@ -212,11 +233,11 @@ public class TestTezCompiler {
"c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+
"store c INTO 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC14.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld");
// With optimization turned off
setProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "true");
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC15.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld");
}
@Test
@@ -226,7 +247,7 @@ public class TestTezCompiler {
"b = order a by x;" +
"STORE b INTO 'file:///tmp/output';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
}
// PIG-3759, PIG-3781
@@ -240,7 +261,7 @@ public class TestTezCompiler {
"d = foreach c generate group, COUNT(a.y), COUNT(b.z);" +
"store d into 'file:///tmp/output/d';";
- run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC18.gld");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld");
}
@Test
@@ -252,9 +273,9 @@ public class TestTezCompiler {
"store c into 'file:///tmp/output/c';" +
"store d into 'file:///tmp/output/d';";
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld");
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld");
}
@@ -288,9 +309,9 @@ public class TestTezCompiler {
"store f1 into 'file:///tmp/output/f1';" +
"store f2 into 'file:///tmp/output/f2';";
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld");
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld");
}
@@ -305,9 +326,9 @@ public class TestTezCompiler {
"store b into 'file:///tmp/output/b';" +
"store c into 'file:///tmp/output/c';";
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld");
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld");
}
@@ -323,9 +344,9 @@ public class TestTezCompiler {
"store d into 'file:///tmp/output/d';" +
"store e into 'file:///tmp/output/e';";
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld");
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld");
}
@@ -342,9 +363,9 @@ public class TestTezCompiler {
"e = foreach d GENERATE a::x, a::y;" +
"store e into 'file:///tmp/output/e';";
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld");
- setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld");
}
@@ -356,9 +377,9 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"store c into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
}
@@ -372,9 +393,9 @@ public class TestTezCompiler {
"e = foreach d generate group, SUM(c.y);" +
"store e into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld");
}
@@ -388,9 +409,9 @@ public class TestTezCompiler {
"e = join c by x, d by x;" +
"store e into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld");
}
@@ -406,9 +427,9 @@ public class TestTezCompiler {
"store e into 'file:///tmp/output';";
//TODO: PIG-3856 Not optimized
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld");
query =
@@ -420,9 +441,9 @@ public class TestTezCompiler {
"store e into 'file:///tmp/output';";
// Optimized
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld");
}
@@ -436,9 +457,9 @@ public class TestTezCompiler {
"e = join c by x, d by x using 'skewed';" +
"store e into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld");
}
@@ -451,9 +472,9 @@ public class TestTezCompiler {
"d = order c by x;" +
"store d into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld");
}
@@ -467,9 +488,9 @@ public class TestTezCompiler {
"d = limit c 1;" +
"store d into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld");
}
@@ -485,9 +506,9 @@ public class TestTezCompiler {
"store d into 'file:///tmp/output/d';" +
"store e into 'file:///tmp/output/e';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld");
}
@@ -502,10 +523,10 @@ public class TestTezCompiler {
"f = group e by x;" +
"store f into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
resetScope();
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld");
}
@@ -520,10 +541,10 @@ public class TestTezCompiler {
"e = union onschema c, d;" +
"store e into 'file:///tmp/output';";
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld");
resetScope();
- setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld");
}
@@ -557,10 +578,6 @@ public class TestTezCompiler {
TezLauncher launcher = new TezLauncher();
pc.inExplain = true;
TezPlanContainer tezPlanContainer = launcher.compile(pp, pc);
- for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
- TezOperPlan tezPlan = entry.getValue().getNode();
- TezLauncher.optimize(tezPlan, pc);
- }
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -41,20 +40,22 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.MultiQueryOptimizerTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.Util;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
import org.junit.AfterClass;
@@ -190,7 +191,7 @@ public class TestTezJobControlCompiler {
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
String inputDir = job.getConfiguration().get(INPUT_DIR, "");
- String numSplitString = inputDir.substring(inputDir.lastIndexOf(File.separator)+1);
+ String numSplitString = inputDir.substring(inputDir.lastIndexOf("/")+1);
int numSplit = Integer.parseInt(numSplitString);
List<InputSplit> splits = new ArrayList<InputSplit>();
for (int i=0;i<numSplit;i++) {
@@ -254,7 +255,7 @@ public class TestTezJobControlCompiler {
Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 15);
}
-
+
@Test
public void testTezParallelismEstimatorSplitBranch() throws Exception{
pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
@@ -270,7 +271,7 @@ public class TestTezJobControlCompiler {
Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 7);
}
-
+
@Test
public void testTezParallelismDefaultParallelism() throws Exception{
pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
@@ -289,14 +290,15 @@ public class TestTezJobControlCompiler {
PhysicalPlan pp = Util.buildPp(pigServer, query);
TezCompiler comp = new TezCompiler(pp, pc);
TezOperPlan tezPlan = comp.compile();
+ TezLauncher.processLoadAndParallelism(tezPlan, pc);
+
+ TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+ TezScriptState scriptState = new TezScriptState("test");
+ ScriptState.start(scriptState);
+ scriptState.setDAGScriptInfo(tezPlanNode);
+
TezJobCompiler jobComp = new TezJobCompiler(pc, new Configuration());
- MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
- mqOptimizer.visit();
- LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
- loaderStorer.visit();
- ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
- parallelismSetter.visit();
- DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>());
+ DAG dag = jobComp.buildDAG(tezPlanNode, new HashMap<String, LocalResource>());
return new Pair<TezOperPlan, DAG>(tezPlan, dag);
}
}
Modified: pig/branches/spark/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/tez-tests?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/tez-tests (original)
+++ pig/branches/spark/test/tez-tests Thu Nov 27 12:49:54 2014
@@ -1,3 +1,4 @@
+**/TestStreamingUDF.java
**/TestAccumulator.java
**/TestAlgebraicEval.java
**/TestBZip.java
@@ -11,11 +12,11 @@
**/TestCustomPartitioner.java
**/TestEvalPipeline.java
**/TestEvalPipeline2.java
+**/TestFRJoin.java
+**/TestFRJoinNullValue.java
**/TestFilterUDF.java
**/TestFinish.java
**/TestForEachNestedPlan.java
-**/TestFRJoin.java
-**/TestFRJoinNullValue.java
**/TestGrunt.java
**/TestImplicitSplit.java
**/TestInputOutputMiniClusterFileValidator.java
@@ -29,11 +30,14 @@
**/TestMapReduce.java
**/TestMapSideCogroup.java
**/TestMapReduce2.java
+**/TestMergeJoin.java
**/TestMergeJoinOuter.java
+**/TestNativeMapReduce.java
**/TestNestedForeach.java
**/TestNewPlanImplicitSplit.java
**/TestParser.java
**/TestPigContext.java
+**/TestPigProgressReporting.java
**/TestPigServer.java
**/TestPigServerWithMacros.java
**/TestPigSplit.java
@@ -52,21 +56,14 @@
**/TestStoreInstances.java
**/TestStoreOld.java
**/TestStreaming.java
-**/TestStreamingUDF.java
**/TestToolsPigServer.java
**/TestUDF.java
**/TestUDFContext.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestLoaderStorerShipCacheFilesTez.java
**/TestSecondarySortTez.java
**/TestTezAutoParallelism.java
**/TestTezCompiler.java
**/TestTezJobControlCompiler.java
**/TestTezLauncher.java
-**/TestAccumuloPigCluster.java
-**/TestBigTypeSort.java
-**/TestCurrentTime.java
-**/TestInvokerGenerator.java
-**/TestGroupConstParallelTez.java
-**/TestJobSubmissionTez.java
-**/TestMergeJoin.java
-**/TestNativeMapReduce.java
-**/TestPigProgressReporting.java