You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [14/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/s...

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Thu Nov 27 12:49:54 2014
@@ -2,88 +2,88 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-119
+# TEZ DAG plan: pig-0_scope-1
 #--------------------------------------------------
-Tez vertex scope-109	->	Tez vertex scope-110,
-Tez vertex scope-103	->	Tez vertex scope-110,
-Tez vertex scope-110
-
+Tez vertex scope-108	->	Tez vertex scope-109,
+Tez vertex scope-102	->	Tez vertex scope-109,
 Tez vertex scope-109
+
+Tez vertex scope-108
 # Plan on vertex
-POValueOutputTez - scope-113	->	 [scope-110]
+POValueOutputTez - scope-112	->	 [scope-109]
 |
-|---c: New For Each(false,false)[bag] - scope-89
+|---c: New For Each(false,false)[bag] - scope-88
     |   |
-    |   Cast[int] - scope-84
+    |   Cast[int] - scope-83
     |   |
-    |   |---Project[bytearray][1] - scope-83
+    |   |---Project[bytearray][1] - scope-82
     |   |
-    |   Cast[chararray] - scope-87
+    |   Cast[chararray] - scope-86
     |   |
-    |   |---Project[bytearray][0] - scope-86
+    |   |---Project[bytearray][0] - scope-85
     |
-    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-82
-Tez vertex scope-103
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-81
+Tez vertex scope-102
 # Plan on vertex
-1-12: Split - scope-121
+1-12: Split - scope-119
 |   |
-|   a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-77
+|   a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-76
 |   |
-|   |---a2: Filter[bag] - scope-72
+|   |---a2: Filter[bag] - scope-71
 |       |   |
-|       |   Not[boolean] - scope-76
+|       |   Not[boolean] - scope-75
 |       |   |
-|       |   |---Greater Than[boolean] - scope-75
+|       |   |---Greater Than[boolean] - scope-74
 |       |       |
-|       |       |---Project[int][0] - scope-73
+|       |       |---Project[int][0] - scope-72
 |       |       |
-|       |       |---Constant(100) - scope-74
+|       |       |---Constant(100) - scope-73
 |   |
-|   POValueOutputTez - scope-112	->	 [scope-110]
+|   POValueOutputTez - scope-111	->	 [scope-109]
 |   |
-|   |---a1: Filter[bag] - scope-78
+|   |---a1: Filter[bag] - scope-77
 |       |   |
-|       |   Greater Than[boolean] - scope-81
+|       |   Greater Than[boolean] - scope-80
 |       |   |
-|       |   |---Project[int][0] - scope-79
+|       |   |---Project[int][0] - scope-78
 |       |   |
-|       |   |---Constant(100) - scope-80
+|       |   |---Constant(100) - scope-79
 |
-|---a: New For Each(false,false)[bag] - scope-70
+|---a: New For Each(false,false)[bag] - scope-69
     |   |
-    |   Cast[int] - scope-65
+    |   Cast[int] - scope-64
     |   |
-    |   |---Project[bytearray][0] - scope-64
+    |   |---Project[bytearray][0] - scope-63
     |   |
-    |   Cast[chararray] - scope-68
+    |   Cast[chararray] - scope-67
     |   |
-    |   |---Project[bytearray][1] - scope-67
+    |   |---Project[bytearray][1] - scope-66
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-63
-Tez vertex scope-110
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-62
+Tez vertex scope-109
 # Plan on vertex
-1-13: Split - scope-120
+1-13: Split - scope-118
 |   |
-|   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-96
+|   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-95
 |   |
-|   |---d: Filter[bag] - scope-92
+|   |---d: Filter[bag] - scope-91
 |       |   |
-|       |   Greater Than[boolean] - scope-95
+|       |   Greater Than[boolean] - scope-94
 |       |   |
-|       |   |---Project[int][0] - scope-93
+|       |   |---Project[int][0] - scope-92
 |       |   |
-|       |   |---Constant(500) - scope-94
+|       |   |---Constant(500) - scope-93
 |   |
-|   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-102
+|   e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-101
 |   |
-|   |---e: Filter[bag] - scope-97
+|   |---e: Filter[bag] - scope-96
 |       |   |
-|       |   Not[boolean] - scope-101
+|       |   Not[boolean] - scope-100
 |       |   |
-|       |   |---Greater Than[boolean] - scope-100
+|       |   |---Greater Than[boolean] - scope-99
 |       |       |
-|       |       |---Project[int][0] - scope-98
+|       |       |---Project[int][0] - scope-97
 |       |       |
-|       |       |---Constant(500) - scope-99
+|       |       |---Constant(500) - scope-98
 |
-|---POShuffledValueInputTez - scope-111	<-	 [scope-109, scope-103]
+|---POShuffledValueInputTez - scope-110	<-	 [scope-108, scope-102]

Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Thu Nov 27 12:49:54 2014
@@ -2,16 +2,16 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-56
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-40	->	Tez vertex group scope-59,Tez vertex group scope-60,
-Tez vertex scope-46	->	Tez vertex group scope-59,Tez vertex group scope-60,
+Tez vertex scope-40	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
 Tez vertex group scope-59
-Tez vertex group scope-60
+Tez vertex group scope-58
 
 Tez vertex scope-40
 # Plan on vertex
-1-2: Split - scope-58
+1-2: Split - scope-57
 |   |
 |   a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-14
 |   |
@@ -25,7 +25,7 @@ Tez vertex scope-40
 |       |       |
 |       |       |---Constant(100) - scope-11
 |   |
-|   1-3: Split - scope-61
+|   1-3: Split - scope-60
 |   |   |
 |   |   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33
 |   |   |
@@ -70,7 +70,7 @@ Tez vertex scope-40
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-46
 # Plan on vertex
-1-3: Split - scope-62
+1-3: Split - scope-61
 |   |
 |   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-33
 |   |
@@ -107,5 +107,5 @@ Tez vertex scope-46
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-19
 Tez vertex group scope-59	<-	 [scope-40, scope-46]	->	 null
 # No plan on vertex group
-Tez vertex group scope-60	<-	 [scope-40, scope-46]	->	 null
+Tez vertex group scope-58	<-	 [scope-40, scope-46]	->	 null
 # No plan on vertex group

Modified: pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java Thu Nov 27 12:49:54 2014
@@ -21,15 +21,20 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.test.TestGroupConstParallel;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezDAGStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
 import org.junit.Assume;
@@ -46,7 +51,9 @@ public class TestGroupConstParallelTez e
 
     @Override
     public void checkGroupAllWithParallelGraphResult(JobGraph jGraph) {
-        TezTaskStats ts = (TezTaskStats)jGraph.getSinks().get(0);
+        TezDAGStats ds = (TezDAGStats) jGraph.getJobList().get(0);
+        jGraph = (JobGraph)ds.getPlan();
+        TezVertexStats ts = (TezVertexStats)jGraph.getSinks().get(0);
         assertEquals(ts.getParallelism(), 1);
     }
 
@@ -60,7 +67,7 @@ public class TestGroupConstParallelTez e
         ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
         parallelismSetter.visit();
 
-        DAG tezDag = DAG.create("test");
+        DAG tezDag = getTezDAG(tezPlan, pc);
         TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
@@ -80,7 +87,7 @@ public class TestGroupConstParallelTez e
         ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
         parallelismSetter.visit();
 
-        DAG tezDag = DAG.create("test");
+        DAG tezDag = getTezDAG(tezPlan, pc);
         TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
@@ -95,4 +102,13 @@ public class TestGroupConstParallelTez e
         comp.compile();
         return comp.getTezPlan();
     }
+
+    private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) {
+        TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+        TezScriptState scriptState = new TezScriptState("test");
+        ScriptState.start(scriptState);
+        scriptState.setDAGScriptInfo(tezPlanNode);
+        DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
+        return tezDag;
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java Thu Nov 27 12:49:54 2014
@@ -23,15 +23,19 @@ import static org.junit.Assert.assertTru
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.test.TestJobSubmission;
 import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
@@ -57,7 +61,7 @@ public class TestJobSubmissionTez extend
         ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
         parallelismSetter.visit();
 
-        DAG tezDag = DAG.create("test");
+        DAG tezDag = getTezDAG(tezPlan, pc);
         TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
         try {
             dagBuilder.visit();
@@ -69,14 +73,14 @@ public class TestJobSubmissionTez extend
     @Override
     public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
         TezOperPlan tezPlan = buildTezPlan(pp, pc);
-        
+
         LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
         loaderStorer.visit();
 
         ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
         parallelismSetter.visit();
 
-        DAG tezDag = DAG.create("test");
+        DAG tezDag = getTezDAG(tezPlan, pc);
         TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
@@ -96,4 +100,13 @@ public class TestJobSubmissionTez extend
         comp.compile();
         return comp.getTezPlan();
     }
+
+    private DAG getTezDAG(TezOperPlan tezPlan, PigContext pc) {
+        TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+        TezScriptState scriptState = new TezScriptState("test");
+        ScriptState.start(scriptState);
+        scriptState.setDAGScriptInfo(tezPlanNode);
+        DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
+        return tezDag;
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestSecondarySortTez.java Thu Nov 27 12:49:54 2014
@@ -20,10 +20,10 @@ package org.apache.pig.tez;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.CombinerOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.tez.SecondaryKeyOptimizerTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.TestSecondarySort;
@@ -47,12 +47,12 @@ public class TestSecondarySortTez extend
         TezCompiler comp = new TezCompiler(pp, pc);
         TezOperPlan tezPlan = comp.compile();
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
-                PigConfiguration.PROP_NO_COMBINER, "false"));
+                PigConfiguration.PIG_EXEC_NO_COMBINER, "false"));
 
         // Run CombinerOptimizer on Tez plan
         if (!nocombiner) {
             boolean doMapAgg = Boolean.parseBoolean(pc.getProperties()
-                    .getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,
+                    .getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
                             "false"));
             CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
             co.visit();

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Nov 27 12:49:54 2014
@@ -18,10 +18,15 @@
 package org.apache.pig.tez;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
@@ -33,6 +38,9 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
 import org.junit.After;
@@ -98,7 +106,7 @@ public class TestTezAutoParallelism {
         }
         w.close();
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
-        
+
         w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
         for (String name : boyNames) {
             w.println(name + "\t" + "M");
@@ -119,13 +127,14 @@ public class TestTezAutoParallelism {
         // parallelism is 3 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -141,7 +150,7 @@ public class TestTezAutoParallelism {
         // order by parallelism is 3 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -150,6 +159,7 @@ public class TestTezAutoParallelism {
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -173,6 +183,7 @@ public class TestTezAutoParallelism {
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -188,7 +199,7 @@ public class TestTezAutoParallelism {
         // skewed join parallelism is 4 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -196,6 +207,7 @@ public class TestTezAutoParallelism {
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -218,6 +230,7 @@ public class TestTezAutoParallelism {
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -225,6 +238,40 @@ public class TestTezAutoParallelism {
                 return false;
             }
         });
-        assertEquals(files.length, 5);
+        assertEquals(files.length, 4);
+    }
+
+    @Test
+    public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        // When there is a combiner operation involved, user specified parallelism is overridden
+        Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer);
+        try {
+            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;");
+            pigServer.registerQuery("D = group C by A::name;");
+            pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            List<Tuple> expectedResults = Util
+                    .getTuplesFromConstantTupleStrings(new String[] {
+                            "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)",
+                            "('Daniel',68L)", "('Elizabeth',42L)",
+                            "('Emily',57L)", "('Emma',50L)", "('Ethan',50L)",
+                            "('Isabella',43L)", "('Jacob',43L)", "('Jayden',59L)",
+                            "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
+                            "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
+                            "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" });
+
+            Util.checkQueryOutputsAfterSort(iter, expectedResults);
+            assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4"));
+        } finally {
+            Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism");
+        }
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Thu Nov 27 12:49:54 2014
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.pig.PigConfiguration;
@@ -32,13 +31,10 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerNode;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
@@ -77,8 +73,8 @@ public class TestTezCompiler {
     @Before
     public void setUp() throws ExecException {
         resetScope();
-        pc.getProperties().remove(PigConfiguration.OPT_MULTIQUERY);
-        pc.getProperties().remove(PigConfiguration.TEZ_OPT_UNION);
+        pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
+        pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
         pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
         pigServer = new PigServer(pc);
     }
@@ -86,6 +82,7 @@ public class TestTezCompiler {
     private void resetScope() {
         NodeIdGenerator.reset();
         PigServer.resetScope();
+        TezPlanContainer.resetScope();
     }
 
     @Test
@@ -96,7 +93,7 @@ public class TestTezCompiler {
                 "c = foreach b generate y;" +
                 "store c into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld");
     }
 
     @Test
@@ -107,7 +104,7 @@ public class TestTezCompiler {
                 "c = foreach b generate group, COUNT(a.x);" +
                 "store c into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld");
     }
 
     @Test
@@ -119,7 +116,7 @@ public class TestTezCompiler {
                 "d = foreach c generate a::x as x, y, z;" +
                 "store d into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld");
     }
 
     @Test
@@ -131,7 +128,7 @@ public class TestTezCompiler {
                 "d = foreach c generate a::x as x, y, z;" +
                 "store d into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld");
     }
 
     @Test
@@ -142,7 +139,31 @@ public class TestTezCompiler {
                 "c = foreach b generate y;" +
                 "store c into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld");
+    }
+
+    @Test
+    public void testLimitOrderby() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = order a by x, y;" +
+                "c = limit b 10;" +
+                "store c into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld");
+    }
+
+    @Test
+    public void testLimitScalarOrderby() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "b = order a by x, y;" +
+                "g = group a all;" +
+                "h = foreach g generate COUNT(a) as sum;" +
+                "c = limit b h.sum/2;" +
+                "store c into 'file:///tmp/output';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld");
     }
 
     @Test
@@ -153,7 +174,7 @@ public class TestTezCompiler {
                 "c = foreach b generate y;" +
                 "store c into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld");
     }
 
     @Test
@@ -164,7 +185,7 @@ public class TestTezCompiler {
                 "c = foreach b { d = distinct a; generate COUNT(d); };" +
                 "store c into 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC13.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld");
     }
 
 
@@ -178,7 +199,7 @@ public class TestTezCompiler {
                 "d = join a by x, b by x, c by x using 'replicated';" +
                 "store d into 'file:///tmp/output/d';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC10.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld");
     }
 
     @Test
@@ -191,7 +212,7 @@ public class TestTezCompiler {
                 "d = join b1 by group, c by x using 'replicated';" +
                 "store d into 'file:///tmp/output/e';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC11.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld");
     }
 
     @Test
@@ -201,7 +222,7 @@ public class TestTezCompiler {
                 "b = stream a through `stream.pl -n 5`;" +
                 "STORE b INTO 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld");
     }
 
     @Test
@@ -212,11 +233,11 @@ public class TestTezCompiler {
                 "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+
                 "store c INTO 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC14.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld");
 
         // With optimization turned off
         setProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "true");
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC15.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld");
     }
 
     @Test
@@ -226,7 +247,7 @@ public class TestTezCompiler {
                 "b = order a by x;" +
                 "STORE b INTO 'file:///tmp/output';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
     }
 
     // PIG-3759, PIG-3781
@@ -240,7 +261,7 @@ public class TestTezCompiler {
                 "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" +
                 "store d into 'file:///tmp/output/d';";
 
-        run(query, "test/org/apache/pig/test/data/GoldenFiles/TEZC18.gld");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld");
     }
 
     @Test
@@ -252,9 +273,9 @@ public class TestTezCompiler {
                 "store c into 'file:///tmp/output/c';" +
                 "store d into 'file:///tmp/output/d';";
 
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld");
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld");
     }
 
@@ -288,9 +309,9 @@ public class TestTezCompiler {
                 "store f1 into 'file:///tmp/output/f1';" +
                 "store f2 into 'file:///tmp/output/f2';";
 
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld");
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld");
     }
 
@@ -305,9 +326,9 @@ public class TestTezCompiler {
                 "store b into 'file:///tmp/output/b';" +
                 "store c into 'file:///tmp/output/c';";
 
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld");
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld");
     }
 
@@ -323,9 +344,9 @@ public class TestTezCompiler {
                 "store d into 'file:///tmp/output/d';" +
                 "store e into 'file:///tmp/output/e';";
 
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld");
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld");
     }
 
@@ -342,9 +363,9 @@ public class TestTezCompiler {
                 "e = foreach d GENERATE a::x, a::y;" +
                 "store e into 'file:///tmp/output/e';";
 
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + true);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld");
-        setProperty(PigConfiguration.OPT_MULTIQUERY, "" + false);
+        setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld");
     }
 
@@ -356,9 +377,9 @@ public class TestTezCompiler {
                 "c = union onschema a, b;" +
                 "store c into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
@@ -372,9 +393,9 @@ public class TestTezCompiler {
                 "e = foreach d generate group, SUM(c.y);" +
                 "store e into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld");
     }
 
@@ -388,9 +409,9 @@ public class TestTezCompiler {
                 "e = join c by x, d by x;" +
                 "store e into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld");
     }
 
@@ -406,9 +427,9 @@ public class TestTezCompiler {
                 "store e into 'file:///tmp/output';";
 
         //TODO: PIG-3856 Not optimized
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld");
 
         query =
@@ -420,9 +441,9 @@ public class TestTezCompiler {
                 "store e into 'file:///tmp/output';";
 
         // Optimized
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld");
     }
 
@@ -436,9 +457,9 @@ public class TestTezCompiler {
                 "e = join c by x, d by x using 'skewed';" +
                 "store e into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld");
     }
 
@@ -451,9 +472,9 @@ public class TestTezCompiler {
                 "d = order c by x;" +
                 "store d into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld");
     }
 
@@ -467,9 +488,9 @@ public class TestTezCompiler {
                 "d = limit c 1;" +
                 "store d into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld");
     }
 
@@ -485,9 +506,9 @@ public class TestTezCompiler {
                 "store d into 'file:///tmp/output/d';" +
                 "store e into 'file:///tmp/output/e';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld");
     }
 
@@ -502,10 +523,10 @@ public class TestTezCompiler {
                 "f = group e by x;" +
                 "store f into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
         resetScope();
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld");
     }
 
@@ -520,10 +541,10 @@ public class TestTezCompiler {
                 "e = union onschema c, d;" +
                 "store e into 'file:///tmp/output';";
 
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + true);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld");
         resetScope();
-        setProperty(PigConfiguration.TEZ_OPT_UNION, "" + false);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld");
     }
 
@@ -557,10 +578,6 @@ public class TestTezCompiler {
         TezLauncher launcher = new TezLauncher();
         pc.inExplain = true;
         TezPlanContainer tezPlanContainer = launcher.compile(pp, pc);
-        for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getNode();
-            TezLauncher.optimize(tezPlan, pc);
-        }
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -41,20 +40,22 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.MultiQueryOptimizerTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor;
-import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.junit.OrderedJUnit4Runner;
 import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
 import org.junit.AfterClass;
@@ -190,7 +191,7 @@ public class TestTezJobControlCompiler {
         @Override
         public List<InputSplit> getSplits(JobContext job) throws IOException {
             String inputDir = job.getConfiguration().get(INPUT_DIR, "");
-            String numSplitString = inputDir.substring(inputDir.lastIndexOf(File.separator)+1);
+            String numSplitString = inputDir.substring(inputDir.lastIndexOf("/")+1);
             int numSplit = Integer.parseInt(numSplitString);
             List<InputSplit> splits = new ArrayList<InputSplit>();
             for (int i=0;i<numSplit;i++) {
@@ -254,7 +255,7 @@ public class TestTezJobControlCompiler {
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
         assertEquals(leafVertex.getParallelism(), 15);
     }
-    
+
     @Test
     public void testTezParallelismEstimatorSplitBranch() throws Exception{
         pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
@@ -270,7 +271,7 @@ public class TestTezJobControlCompiler {
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
         assertEquals(leafVertex.getParallelism(), 7);
     }
-    
+
     @Test
     public void testTezParallelismDefaultParallelism() throws Exception{
         pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
@@ -289,14 +290,15 @@ public class TestTezJobControlCompiler {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         TezCompiler comp = new TezCompiler(pp, pc);
         TezOperPlan tezPlan = comp.compile();
+        TezLauncher.processLoadAndParallelism(tezPlan, pc);
+
+        TezPlanContainerNode tezPlanNode = new TezPlanContainerNode(OperatorKey.genOpKey("DAGName"), tezPlan);
+        TezScriptState scriptState = new TezScriptState("test");
+        ScriptState.start(scriptState);
+        scriptState.setDAGScriptInfo(tezPlanNode);
+
         TezJobCompiler jobComp = new TezJobCompiler(pc, new Configuration());
-        MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
-        mqOptimizer.visit();
-        LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
-        loaderStorer.visit();
-        ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
-        parallelismSetter.visit();
-        DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>());
+        DAG dag = jobComp.buildDAG(tezPlanNode, new HashMap<String, LocalResource>());
         return new Pair<TezOperPlan, DAG>(tezPlan, dag);
     }
 }

Modified: pig/branches/spark/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/tez-tests?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/tez-tests (original)
+++ pig/branches/spark/test/tez-tests Thu Nov 27 12:49:54 2014
@@ -1,3 +1,4 @@
+**/TestStreamingUDF.java
 **/TestAccumulator.java
 **/TestAlgebraicEval.java
 **/TestBZip.java
@@ -11,11 +12,11 @@
 **/TestCustomPartitioner.java
 **/TestEvalPipeline.java
 **/TestEvalPipeline2.java
+**/TestFRJoin.java
+**/TestFRJoinNullValue.java
 **/TestFilterUDF.java
 **/TestFinish.java
 **/TestForEachNestedPlan.java
-**/TestFRJoin.java
-**/TestFRJoinNullValue.java
 **/TestGrunt.java
 **/TestImplicitSplit.java
 **/TestInputOutputMiniClusterFileValidator.java
@@ -29,11 +30,14 @@
 **/TestMapReduce.java
 **/TestMapSideCogroup.java
 **/TestMapReduce2.java
+**/TestMergeJoin.java
 **/TestMergeJoinOuter.java
+**/TestNativeMapReduce.java
 **/TestNestedForeach.java
 **/TestNewPlanImplicitSplit.java
 **/TestParser.java
 **/TestPigContext.java
+**/TestPigProgressReporting.java
 **/TestPigServer.java
 **/TestPigServerWithMacros.java
 **/TestPigSplit.java
@@ -52,21 +56,14 @@
 **/TestStoreInstances.java
 **/TestStoreOld.java
 **/TestStreaming.java
-**/TestStreamingUDF.java
 **/TestToolsPigServer.java
 **/TestUDF.java
 **/TestUDFContext.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestLoaderStorerShipCacheFilesTez.java
 **/TestSecondarySortTez.java
 **/TestTezAutoParallelism.java
 **/TestTezCompiler.java
 **/TestTezJobControlCompiler.java
 **/TestTezLauncher.java
-**/TestAccumuloPigCluster.java
-**/TestBigTypeSort.java
-**/TestCurrentTime.java
-**/TestInvokerGenerator.java
-**/TestGroupConstParallelTez.java
-**/TestJobSubmissionTez.java
-**/TestMergeJoin.java
-**/TestNativeMapReduce.java
-**/TestPigProgressReporting.java