You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [23/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -66,7 +66,7 @@ e: Local Rearrange[tuple]{int}(false) -
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-67
Tez vertex scope-94
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-86
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-86
|
|---e: New For Each(true,true)[tuple] - scope-85
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld Wed Feb 22 09:43:41 2017
@@ -65,7 +65,7 @@ Tez vertex group scope-45 <- [scope-37,
# No plan on vertex group
Tez vertex scope-44
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-36
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-36
|
|---e: New For Each(true,true)[tuple] - scope-35
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-58
Tez vertex scope-77
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-74
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-74
|
|---e: FRJoin[tuple] - scope-68 <- scope-81
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld Wed Feb 22 09:43:41 2017
@@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
Tez vertex scope-36
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-33
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-33
|
|---e: FRJoin[tuple] - scope-27 <- scope-40
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -48,7 +48,7 @@ Local Rearrange[tuple]{int}(false) - sco
|---POShuffledValueInputTez - scope-166 <- [scope-163, scope-164]
Tez vertex scope-162
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-161
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-161
|
|---e: FRJoin[tuple] - scope-155 <- scope-165
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld Wed Feb 22 09:43:41 2017
@@ -47,7 +47,7 @@ Tez vertex group scope-123 <- [scope-11
# No plan on vertex group
Tez vertex scope-116
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-115
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-115
|
|---e: FRJoin[tuple] - scope-109 <- scope-123
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -48,7 +48,7 @@ Local Rearrange[tuple]{tuple}(false) - s
| |
| Constant(DummyVal) - scope-122
|
-|---New For Each(true,true)[tuple] - scope-127
+|---New For Each(false,true)[tuple] - scope-127
| |
| Project[int][0] - scope-110
| |
@@ -104,7 +104,7 @@ Partition Rearrange[tuple]{int}(false) -
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-102
Tez vertex scope-142
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-113
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-113
|
|---New For Each(true,true)[tuple] - scope-146
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld Wed Feb 22 09:43:41 2017
@@ -19,7 +19,7 @@ Local Rearrange[tuple]{tuple}(false) - s
| |
| Constant(DummyVal) - scope-68
|
-|---New For Each(true,true)[tuple] - scope-74
+|---New For Each(false,true)[tuple] - scope-74
| |
| Project[int][0] - scope-71
| |
@@ -50,7 +50,7 @@ Local Rearrange[tuple]{tuple}(false) - s
| |
| Constant(DummyVal) - scope-78
|
-|---New For Each(true,true)[tuple] - scope-84
+|---New For Each(false,true)[tuple] - scope-84
| |
| Project[int][0] - scope-81
| |
@@ -120,7 +120,7 @@ Partition Rearrange[tuple]{int}(false) -
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
Tez vertex scope-57
# Plan on vertex
-e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-28
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28
|
|---New For Each(true,true)[tuple] - scope-61
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -86,7 +86,7 @@ POIdentityInOutTez - scope-118 <- scope
| Project[int][0] - scope-90
Tez vertex scope-119
# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-92
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-92
|
|---New For Each(true)[tuple] - scope-122
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld Wed Feb 22 09:43:41 2017
@@ -102,7 +102,7 @@ POIdentityInOutTez - scope-45 <- scope-
| Project[int][0] - scope-17
Tez vertex scope-46
# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-19
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-19
|
|---New For Each(true)[tuple] - scope-49
| |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -64,7 +64,7 @@ POValueOutputTez - scope-73 -> [scope-7
|---POValueInputTez - scope-68 <- scope-65
Tez vertex scope-70
# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-59
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-59
|
|---d: Limit - scope-58
|
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld Wed Feb 22 09:43:41 2017
@@ -64,7 +64,7 @@ POValueOutputTez - scope-36 -> [scope-3
|---POValueInputTez - scope-31 <- scope-28
Tez vertex scope-33
# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-22
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-22
|
|---d: Limit - scope-21
|
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Wed Feb 22 09:43:41 2017
@@ -15,7 +15,7 @@ Tez vertex scope-45
| |
| a2: Split - scope-68
| | |
-| | a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17
+| | a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17
| | |
| | POValueOutputTez - scope-60 -> [scope-57]
| |
@@ -74,7 +74,7 @@ Tez vertex scope-57
# Plan on vertex
1-3: Split - scope-67
| |
-| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-38
+| d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-38
| |
| |---d: Filter[bag] - scope-34
| | |
@@ -84,7 +84,7 @@ Tez vertex scope-57
| | |
| | |---Constant(500) - scope-36
| |
-| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-44
+| e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-44
| |
| |---e: Filter[bag] - scope-39
| | |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Wed Feb 22 09:43:41 2017
@@ -4,7 +4,7 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-45 -> Tez vertex group scope-70,Tez vertex group scope-70,Tez vertex group scope-71,Tez vertex group scope-71,
+Tez vertex scope-45 -> Tez vertex group scope-70,Tez vertex group scope-71,
Tez vertex scope-56 -> Tez vertex group scope-70,Tez vertex group scope-71,
Tez vertex group scope-70
Tez vertex group scope-71
@@ -15,11 +15,11 @@ Tez vertex scope-45
| |
| a2: Split - scope-68
| | |
-| | a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17
+| | a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17
| | |
| | 1-3: Split - scope-72
| | | |
-| | | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-77 -> scope-38
+| | | d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-77 -> scope-38
| | | |
| | | |---d: Filter[bag] - scope-73
| | | | |
@@ -29,7 +29,7 @@ Tez vertex scope-45
| | | | |
| | | | |---Constant(500) - scope-75
| | | |
-| | | e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-83 -> scope-44
+| | | e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-83 -> scope-44
| | | |
| | | |---e: Filter[bag] - scope-78
| | | | |
@@ -53,7 +53,7 @@ Tez vertex scope-45
| |
| 1-3: Split - scope-84
| | |
-| | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-89 -> scope-38
+| | d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-89 -> scope-38
| | |
| | |---d: Filter[bag] - scope-85
| | | |
@@ -63,7 +63,7 @@ Tez vertex scope-45
| | | |
| | | |---Constant(500) - scope-87
| | |
-| | e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-95 -> scope-44
+| | e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-95 -> scope-44
| | |
| | |---e: Filter[bag] - scope-90
| | | |
@@ -98,7 +98,7 @@ Tez vertex scope-56
# Plan on vertex
1-3: Split - scope-96
| |
-| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-101 -> scope-38
+| d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-101 -> scope-38
| |
| |---d: Filter[bag] - scope-97
| | |
@@ -108,7 +108,7 @@ Tez vertex scope-56
| | |
| | |---Constant(500) - scope-99
| |
-| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-107 -> scope-44
+| e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-107 -> scope-44
| |
| |---e: Filter[bag] - scope-102
| | |
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Wed Feb 22 09:43:41 2017
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -47,6 +48,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -62,12 +64,23 @@ public class TestTezAutoParallelism {
private static Properties properties;
private static MiniGenericCluster cluster;
+ private static final PathFilter PART_FILE_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ };
+
@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
properties = cluster.getProperties();
//Test spilling to disk as tests here have multiple splits
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10");
+ properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
createFiles();
}
@@ -84,6 +97,11 @@ public class TestTezAutoParallelism {
@After
public void tearDown() throws Exception {
+ removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
+ removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
+ removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+ removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
pigServer.shutdown();
pigServer = null;
}
@@ -131,32 +149,53 @@ public class TestTezAutoParallelism {
@Test
public void testGroupBy() throws IOException{
// parallelism is 3 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name;");
pigServer.store("B", "output1");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
assertEquals(files.length, 1);
+ fs.delete(new Path("output1"), true);
+ }
+
+ @Test
+ public void testBytesPerReducer() throws IOException{
+
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
+ try {
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = group A by name;");
+ pigServer.store("B", "output1");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
+ assertEquals(files.length, 10);
+ String log = writer.toString();
+ assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
+ assertTrue(log.contains("For vertex - scope-14: parallelism=10"));
+ } finally {
+ Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class);
+ Util.deleteFile(cluster, "output1");
+ }
}
@Test
public void testOrderbyDecreaseParallelism() throws IOException{
// order by parallelism is 3 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
@@ -164,86 +203,54 @@ public class TestTezAutoParallelism {
pigServer.registerQuery("D = order C by age;");
pigServer.store("D", "output2");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER);
assertEquals(files.length, 1);
}
@Test
public void testOrderbyIncreaseParallelism() throws IOException{
// order by parallelism is 3 originally, increase to 4
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;");
pigServer.registerQuery("D = order C by age;");
pigServer.store("D", "output3");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER);
assertEquals(files.length, 4);
}
@Test
public void testSkewedJoinDecreaseParallelism() throws IOException{
// skewed join parallelism is 4 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
pigServer.store("C", "output4");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER);
assertEquals(files.length, 1);
}
@Test
public void testSkewedJoinIncreaseParallelism() throws IOException{
// skewed join parallelism is 3 originally, increase to 5
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
pigServer.store("C", "output5");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
assertEquals(files.length, 5);
}
@@ -251,23 +258,15 @@ public class TestTezAutoParallelism {
public void testSkewedFullJoinIncreaseParallelism() throws IOException{
// skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency,
// which prevent it changing parallelism
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
pigServer.store("C", "output6");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
assertEquals(files.length, 5);
}
@@ -275,9 +274,9 @@ public class TestTezAutoParallelism {
public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
// skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency,
// which prevent it changing parallelism
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
@@ -287,19 +286,29 @@ public class TestTezAutoParallelism {
pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
pigServer.store("G", "output7");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER);
assertEquals(files.length, 4);
}
@Test
+ public void testSkewedJoinRightInputAutoParallelism() throws IOException{
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
+ setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = FILTER B by name == 'Noah';");
+ pigServer.registerQuery("B1 = group B by name;");
+ pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';");
+ pigServer.store("C", "output8");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER);
+ assertEquals(5, files.length);
+ }
+
+ @Test
public void testFlattenParallelism() throws IOException{
String outputDir = "/tmp/testFlattenParallelism";
String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
@@ -386,9 +395,9 @@ public class TestTezAutoParallelism {
// When there is a combiner operation involved user specified parallelism is overriden
Util.createLogAppender("testAutoParallelism", writer, classesToLog);
try {
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
pigServer.setBatchOn();
pigServer.registerScript(new ByteArrayInputStream(script.getBytes()));
pigServer.executeBatch();
@@ -416,4 +425,12 @@ public class TestTezAutoParallelism {
Util.deleteFile(cluster, outputDir);
}
}
+
+ private void setProperty(String property, String value) {
+ pigServer.getPigContext().getProperties().setProperty(property, value);
+ }
+
+ private void removeProperty(String property) {
+ pigServer.getPigContext().getProperties().remove(property);
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Feb 22 09:43:41 2017
@@ -20,13 +20,21 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.Properties;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -35,7 +43,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
import org.apache.pig.test.Util;
@@ -66,11 +76,14 @@ public class TestTezCompiler {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ resetFileLocalizer();
pc = new PigContext(new TezLocalExecType(), new Properties());
+ FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ resetFileLocalizer();
}
@Before
@@ -79,6 +92,7 @@ public class TestTezCompiler {
pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+ pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY);
pigServer = new PigServer(pc);
}
@@ -88,13 +102,20 @@ public class TestTezCompiler {
TezPlanContainer.resetScope();
}
+ private static void resetFileLocalizer() {
+ FileLocalizer.deleteTempFiles();
+ FileLocalizer.setInitialized(false);
+ // Set random seed to generate deterministic temporary paths
+ FileLocalizer.setR(new Random(1331L));
+ }
+
@Test
public void testStoreLoad() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
- "store a into 'file:///tmp/output';" +
- "b = load 'file:///tmp/output' as (x:int, y:int);" +
- "store b into 'file:///tmp/output1';";
+ "store a into 'file:///tmp/pigoutput';" +
+ "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
+ "store b into 'file:///tmp/pigoutput1';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld");
}
@@ -103,25 +124,85 @@ public class TestTezCompiler {
public void testStoreLoadMultiple() throws Exception {
String query =
"a = load 'file:///tmp/input';" +
- "store a into 'file:///tmp/output/Dir1';" +
- "a = load 'file:///tmp/output/Dir1';" +
- "store a into 'file:///tmp/output/Dir2' using BinStorage();" +
- "a = load 'file:///tmp/output/Dir1';" +
- "store a into 'file:///tmp/output/Dir3';" +
- "a = load 'file:///tmp/output/Dir2' using BinStorage();" +
- "store a into 'file:///tmp/output/Dir4';" +
- "a = load 'file:///tmp/output/Dir3';" +
- "b = load 'file:///tmp/output/Dir2' using BinStorage();" +
- "c = load 'file:///tmp/output/Dir1';" +
+ "store a into 'file:///tmp/pigoutput/Dir1';" +
+ "a = load 'file:///tmp/pigoutput/Dir1';" +
+ "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+ "a = load 'file:///tmp/pigoutput/Dir1';" +
+ "store a into 'file:///tmp/pigoutput/Dir3';" +
+ "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+ "store a into 'file:///tmp/pigoutput/Dir4';" +
+ "a = load 'file:///tmp/pigoutput/Dir3';" +
+ "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" +
+ "c = load 'file:///tmp/pigoutput/Dir1';" +
"d = cogroup a by $0, b by $0, c by $0;" +
- "store d into 'file:///tmp/output/Dir5';";
+ "store d into 'file:///tmp/pigoutput/Dir5';";
- // To get around difference in ordering of operators in plan due to JDK7 and JDK8
- if (System.getProperties().getProperty("java.version").startsWith("1.8")) {
- run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
- } else {
- run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld");
- }
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
+ }
+
+ @Test
+ public void testStoreLoadJoinMultiple() throws Exception {
+ // Case where different store load statements are used in a single join
+ String query =
+ "a = load 'file:///tmp/pigoutput/Dir1';" +
+ "b = filter a by $0 == 1;" +
+ "c = filter a by $0 == 2;" +
+ "store b into 'file:///tmp/pigoutput/Dir2';" +
+ "store c into 'file:///tmp/pigoutput/Dir3';" +
+ "d = load 'file:///tmp/pigoutput/Dir2';" +
+ "e = load 'file:///tmp/pigoutput/Dir3';" +
+ "f = join d by $0, e by $0;" +
+ "store f into 'file:///tmp/pigoutput/Dir5';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld");
+
+ resetScope();
+ query =
+ "a = load 'file:///tmp/pigoutput/Dir1';" +
+ "b = distinct a;" +
+ "c = group a by $0;" +
+ "store b into 'file:///tmp/pigoutput/Dir2';" +
+ "store c into 'file:///tmp/pigoutput/Dir3';" +
+ "d = load 'file:///tmp/pigoutput/Dir2';" +
+ "e = load 'file:///tmp/pigoutput/Dir3';" +
+ "f = load 'file:///tmp/pigoutput/Dir4';" +
+ "g = join d by $0, f by $0 using 'repl';" +
+ "h = join e by $0, f by $0 using 'repl';" +
+ "store g into 'file:///tmp/pigoutput/Dir4';" +
+ "store h into 'file:///tmp/pigoutput/Dir5';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld");
+ }
+
+ @Test
+ public void testStoreLoadSplit() throws Exception {
+ // Cases where segmenting into two DAGs is not straightforward due to Split.
+ // The Split operator is required in both the segments.
+
+ resetFileLocalizer();
+ // Split operator as root vertex
+ String query =
+ "a = load 'file:///tmp/input';" +
+ "a1 = filter a by $0 == 5;" +
+ "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+ "b = load 'file:///tmp/pigoutput/Dir1';" +
+ "c = join a by $0, b by $0;" +
+ "store c into 'file:///tmp/pigoutput/Dir2';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld");
+
+ // Split operator as intermediate vertex
+ query =
+ "a = load 'file:///tmp/input';" +
+ "a = distinct a;" +
+ "store a into 'file:///tmp/pigoutput/Dir1';" +
+ "b = load 'file:///tmp/pigoutput/Dir1';" +
+ "c = join a by $0, b by $0;" +
+ "store c into 'file:///tmp/pigoutput/Dir2';";
+
+ resetScope();
+ resetFileLocalizer();
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
}
@Test
@@ -129,7 +210,7 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" +
- "store b into 'file:///tmp/output';";
+ "store b into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld");
}
@@ -140,7 +221,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = filter a by x > 0;" +
"c = foreach b generate y;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld");
}
@@ -151,7 +232,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = group a by x;" +
"c = foreach b generate group, COUNT(a.x);" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld");
}
@@ -163,12 +244,131 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = join a by x, b by x;" +
"d = foreach c generate a::x as x, y, z;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld");
}
@Test
+ public void testBloomJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x, y:int);" +
+ "b = load 'file:///tmp/input2' as (x, z:int);" +
+ "c = load 'file:///tmp/input2' as (x, w:int);" +
+ "d = join b by x, a by x, c by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z, w;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinLeftOuter() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:chararray, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:chararray, z:int);" +
+ "d = join a by x left, b by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinUnion() throws Exception {
+ // Left input from a union
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+ "b = union b, c;" +
+ "d = join a by x, b by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld");
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+ resetScope();
+ // Right input from a union
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+ "b = union b, c;" +
+ "d = join b by x, a by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ // Needs shared edges and PIG-3856 to produce a more optimal plan
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinSplit() throws Exception {
+ // Left input from a split
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "a1 = filter a by x == 3;" +
+ "a2 = filter a by x == 4;" +
+ "d = join a1 by x, a2 by x, b by x using 'bloom';" +
+ "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld");
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+ resetScope();
+ // Right input from a split
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "a1 = filter a by x == 3;" +
+ "a2 = filter a by x == 4;" +
+ "d = join b by x, a1 by x using 'bloom';" +
+ "e = foreach d generate a1::x as x, y, z;" +
+ "store a2 into 'file:///tmp/pigoutput/a2';" +
+ "store e into 'file:///tmp/pigoutput/e';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomSelfJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = filter a by x < 5;" +
+ "c = filter a by x == 10;" +
+ "d = filter a by x > 10;" +
+ "e = join b by x, c by x, d by x using 'bloom';" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld");
+ }
+
+ @Test
public void testSelfJoin() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
@@ -176,7 +376,7 @@ public class TestTezCompiler {
"c = filter a by x == 10;" +
"d = filter a by x > 10;" +
"e = join b by x, c by x, d by x;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld");
}
@@ -188,7 +388,7 @@ public class TestTezCompiler {
"b = filter a by x < 5;" +
"c = filter a by x == 10;" +
"d = join b by x, c by x using 'skewed';" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld");
}
@@ -201,7 +401,7 @@ public class TestTezCompiler {
"c = filter a by x == 10;" +
"d = filter a by x > 10;" +
"e = join b by x, c by x, d by x using 'replicated';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld");
}
@@ -213,7 +413,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = union a, b;" +
"d = join b by x, c by x using 'replicated';" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld");
}
@@ -226,7 +426,7 @@ public class TestTezCompiler {
"a2 = filter a by x < 2;" +
"b = union a1, a2;" +
"c = join b by x, a by x;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld");
}
@@ -242,7 +442,7 @@ public class TestTezCompiler {
"a5 = foreach a4 generate a2::x as x, a3::y as y;" +
"b = union a1, a5;" +
"c = join b by x, a by x;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld");
}
@@ -254,7 +454,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = cross a, b;" +
"d = foreach c generate a::x as x, y, z;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld");
}
@@ -266,7 +466,7 @@ public class TestTezCompiler {
"b = filter a by x < 5;" +
"c = filter a by x == 10;" +
"d = cross b, c;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld");
}
@@ -278,7 +478,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = cross b, a;" +
"d = foreach c generate a.x, a.y, z;" + //Scalar
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld");
}
@@ -290,7 +490,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = join a by x, b by x using 'skewed';" +
"d = foreach c generate a::x as x, y, z;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld");
}
@@ -303,7 +503,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = join a by x, b by x using 'skewed';" +
"d = foreach c generate a::x as x, y, z;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld");
}
@@ -314,7 +514,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = limit a 10;" +
"c = foreach b generate y;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld");
}
@@ -325,7 +525,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = order a by x, y;" +
"c = limit b 10;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld");
}
@@ -338,18 +538,31 @@ public class TestTezCompiler {
"g = group a all;" +
"h = foreach g generate COUNT(a) as sum;" +
"c = limit b h.sum/2;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld");
}
@Test
+ public void testLimitReplJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input' as (x:int, y:int);" +
+ "c = limit a 1;" +
+ "d = join c by x, b by x using 'replicated';" +
+ "store a into 'file:///tmp/pigoutput/a';" +
+ "store d into 'file:///tmp/pigoutput/d';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld");
+ }
+
+ @Test
public void testDistinct() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = distinct a;" +
"c = foreach b generate y;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld");
}
@@ -360,7 +573,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = group a by x;" +
"c = foreach b { d = distinct a; generate COUNT(d); };" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld");
}
@@ -374,7 +587,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = load 'file:///tmp/input3' as (x:int, z:int);" +
"d = join a by x, b by x, c by x using 'replicated';" +
- "store d into 'file:///tmp/output/d';";
+ "store d into 'file:///tmp/pigoutput/d';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld");
}
@@ -387,7 +600,7 @@ public class TestTezCompiler {
"b1 = foreach b generate group, COUNT(a.y);" +
"c = load 'file:///tmp/input2' as (x:int, z:int);" +
"d = join b1 by group, c by x using 'replicated';" +
- "store d into 'file:///tmp/output/e';";
+ "store d into 'file:///tmp/pigoutput/e';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld");
}
@@ -397,7 +610,7 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
"b = stream a through `stream.pl -n 5`;" +
- "STORE b INTO 'file:///tmp/output';";
+ "STORE b INTO 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld");
}
@@ -408,7 +621,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" +
"b = group a by $0;" +
"c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+
- "store c INTO 'file:///tmp/output';";
+ "store c INTO 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld");
@@ -422,7 +635,7 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
"b = order a by x;" +
- "STORE b INTO 'file:///tmp/output';";
+ "STORE b INTO 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
}
@@ -433,7 +646,7 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" +
"b = filter a by x == 1;" +
"c = order b by x;" +
- "STORE c INTO 'file:///tmp/output';";
+ "STORE c INTO 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
}
@@ -444,7 +657,7 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" +
"b = order a by x;" +
- "STORE b INTO 'file:///tmp/output';";
+ "STORE b INTO 'file:///tmp/pigoutput';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld");
setProperty("pig.sort.readonce.loadfuncs", null);
@@ -459,7 +672,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input2' as (x:int, z:int);" +
"c = cogroup a by x, b by x;" +
"d = foreach c generate group, COUNT(a.y), COUNT(b.z);" +
- "store d into 'file:///tmp/output/d';";
+ "store d into 'file:///tmp/pigoutput/d';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld");
}
@@ -469,9 +682,9 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"split a into b if x <= 5, c if x <= 10, d if x >10;" +
- "store b into 'file:///tmp/output/b';" +
- "store c into 'file:///tmp/output/c';" +
- "store d into 'file:///tmp/output/d';";
+ "store b into 'file:///tmp/pigoutput/b';" +
+ "store c into 'file:///tmp/pigoutput/c';" +
+ "store d into 'file:///tmp/pigoutput/d';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld");
@@ -500,14 +713,14 @@ public class TestTezCompiler {
// Needs to be removed in Tez plan as well.
"f1 = limit f 1;" +
"f2 = union d1, f1;" +
- "store b1 into 'file:///tmp/output/b1';" +
- "store b2 into 'file:///tmp/output/b2';" +
- "store c1 into 'file:///tmp/output/c1';" +
- "store c3 into 'file:///tmp/output/c1';" +
- "store d1 into 'file:///tmp/output/d1';" +
- "store e1 into 'file:///tmp/output/e1';" +
- "store f1 into 'file:///tmp/output/f1';" +
- "store f2 into 'file:///tmp/output/f2';";
+ "store b1 into 'file:///tmp/pigoutput/b1';" +
+ "store b2 into 'file:///tmp/pigoutput/b2';" +
+ "store c1 into 'file:///tmp/pigoutput/c1';" +
+ "store c3 into 'file:///tmp/pigoutput/c1';" +
+ "store d1 into 'file:///tmp/pigoutput/d1';" +
+ "store e1 into 'file:///tmp/pigoutput/e1';" +
+ "store f1 into 'file:///tmp/pigoutput/f1';" +
+ "store f2 into 'file:///tmp/pigoutput/f2';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld");
@@ -523,8 +736,8 @@ public class TestTezCompiler {
"b = foreach b generate group, COUNT(a.x);" +
"c = group a by (x,y);" +
"c = foreach c generate group, COUNT(a.y);" +
- "store b into 'file:///tmp/output/b';" +
- "store c into 'file:///tmp/output/c';";
+ "store b into 'file:///tmp/pigoutput/b';" +
+ "store c into 'file:///tmp/pigoutput/c';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld");
@@ -540,9 +753,9 @@ public class TestTezCompiler {
"c = join a by x, b by x;" +
"d = foreach c generate $0, $1, $3;" +
"e = foreach c generate $0, $1, $2, $3;" +
- "store c into 'file:///tmp/output/c';" +
- "store d into 'file:///tmp/output/d';" +
- "store e into 'file:///tmp/output/e';";
+ "store c into 'file:///tmp/pigoutput/c';" +
+ "store d into 'file:///tmp/pigoutput/d';" +
+ "store e into 'file:///tmp/pigoutput/e';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld");
@@ -555,13 +768,13 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
"b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}}
- "store b into 'file:///tmp/output/b';" +
+ "store b into 'file:///tmp/pigoutput/b';" +
"c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}}
- "store c into 'file:///tmp/output/c';" +
+ "store c into 'file:///tmp/pigoutput/c';" +
"d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int}
- "store d into 'file:///tmp/output/d';" +
+ "store d into 'file:///tmp/pigoutput/d';" +
"e = foreach d GENERATE a::x, a::y;" +
- "store e into 'file:///tmp/output/e';";
+ "store e into 'file:///tmp/pigoutput/e';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld");
@@ -576,8 +789,8 @@ public class TestTezCompiler {
"b = group a by x;" +
"c = foreach b generate group, COUNT(a) as cnt;" +
"SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" +
- "store d into 'file:///tmp/output1';" +
- "store e into 'file:///tmp/output2';";
+ "store d into 'file:///tmp/pigoutput1';" +
+ "store e into 'file:///tmp/pigoutput2';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld");
@@ -594,7 +807,7 @@ public class TestTezCompiler {
"c = join a by $0, b by $0 using 'replicated';" +
"d = join a by $1, b by $1 using 'replicated';" +
"e = union c,d;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld");
@@ -613,7 +826,7 @@ public class TestTezCompiler {
"c = foreach c generate $0 as c1;" +
"d = group a by x;" +
"e = foreach d generate group, b.b1, c.c1;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld");
@@ -623,18 +836,67 @@ public class TestTezCompiler {
}
@Test
+ public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception {
+ // Replicate joins are from a split
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, y:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+ "d = union a, b;" +
+ "e = filter c by y < 2;" +
+ "f = filter c by y > 5;" +
+ "g = join d by x, e by x using 'replicated';" +
+ "h = join g by d::x, f by x using 'replicated';" +
+ "store h into 'file:///tmp/pigoutput';";
+
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld");
+
+ // Union is also from a split
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = filter a by x == 2;" +
+ "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+ "d = union a, b;" +
+ "e = filter c by y < 2;" +
+ "f = filter c by y > 5;" +
+ "g = join d by x, e by x using 'replicated';" +
+ "h = join g by d::x, f by x using 'replicated';" +
+ "store h into 'file:///tmp/pigoutput';";
+
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld");
+ }
+
+ @Test
public void testUnionStore() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+ query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+ "c = union onschema a, b PARALLEL 15;" +
+ "store c into 'file:///tmp/pigoutput';";
+ // Union optimization should be turned off if PARALLEL clause is specified
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
}
@Test
@@ -643,14 +905,15 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
- "store c into 'file:///tmp/output';";
+ "store c into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
- // Plan should not have union optimization applied
+ // Plan should not have union optimization applied as PigStorage is unsupported
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+
resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null);
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName());
@@ -658,27 +921,37 @@ public class TestTezCompiler {
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
- "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();";
- // Plan should not have union optimization applied
+ "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();";
+ // Plan should not have union optimization applied as only ORC is supported
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
+ query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+ "c = union onschema a, b;" +
+ "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();";
+ // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld");
+
+ resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null);
query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" +
"u1 = union onschema b, c;" +
- "store u1 into 'file:///tmp/output/u1';" +
+ "store u1 into 'file:///tmp/pigoutput/u1';" +
//TODO: multiple levels of split not merged
"u2 = union onschema a, b, c;" +
- "store u2 into 'file:///tmp/output/u2';" +
+ "store u2 into 'file:///tmp/pigoutput/u2';" +
"u3 = union onschema d, e;" +
- "store u3 into 'file:///tmp/output/u3';" +
+ "store u3 into 'file:///tmp/pigoutput/u3';" +
"j1 = join d by x, a by x using 'replicated';" +
"j1 = foreach j1 generate d::x as x, d::y as y;" +
"u4 = union onschema j1, a;" +
- "store u4 into 'file:///tmp/output/u4';";
+ "store u4 into 'file:///tmp/pigoutput/u4';";
// Plan should have union optimization applied even for unsupported storefunc
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld");
@@ -696,7 +969,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = group c by x;" +
"e = foreach d generate group, SUM(c.y);" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld");
@@ -712,7 +985,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join c by x, d by x;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld");
@@ -729,7 +1002,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join c by x, d by x using 'replicated';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
//TODO: PIG-3856 Not optimized
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -743,7 +1016,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join d by x, c by x using 'replicated';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
// Optimized
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -762,7 +1035,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join c by x, d by x using 'skewed';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld");
@@ -779,7 +1052,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
"d = order c by x;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld");
@@ -795,7 +1068,7 @@ public class TestTezCompiler {
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
"c = union onschema a, b;" +
"d = limit c 1;" +
- "store d into 'file:///tmp/output';";
+ "store d into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld");
@@ -811,9 +1084,9 @@ public class TestTezCompiler {
"split a into a1 if x > 100, a2 otherwise;" +
"c = union onschema a1, a2, b;" +
"split c into d if x > 500, e otherwise;" +
- "store a2 into 'file:///tmp/output/a2';" +
- "store d into 'file:///tmp/output/d';" +
- "store e into 'file:///tmp/output/e';";
+ "store a2 into 'file:///tmp/pigoutput/a2';" +
+ "store d into 'file:///tmp/pigoutput/d';" +
+ "store e into 'file:///tmp/pigoutput/e';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld");
@@ -831,8 +1104,8 @@ public class TestTezCompiler {
"d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
"e = union onschema c, d;" +
"f = group e by x;" +
- "store e into 'file:///tmp/output1';" +
- "store f into 'file:///tmp/output2';";
+ "store e into 'file:///tmp/pigoutput1';" +
+ "store f into 'file:///tmp/pigoutput2';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld");
@@ -850,7 +1123,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, y:chararray);" +
"e = union onschema c, d;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld");
@@ -872,10 +1145,10 @@ public class TestTezCompiler {
"c2 = foreach c generate y, x;" +
"c3 = union c1, c2;" +
"a1 = union onschema b3, c3;" +
- "store a1 into 'file:///tmp/output1';" +
+ "store a1 into 'file:///tmp/pigoutput1';" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join a1 by x, d by x using 'skewed';" +
- "store e into 'file:///tmp/output2';";
+ "store e into 'file:///tmp/pigoutput2';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld");
@@ -892,7 +1165,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join c by x, d by x using 'replicated';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld");
@@ -906,7 +1179,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join d by x, c by x using 'replicated';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -924,7 +1197,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join c by x, d by x using 'skewed';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld");
@@ -938,7 +1211,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = join d by x, c by x using 'skewed';" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -956,7 +1229,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = filter c by x == d.x;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld");
@@ -970,7 +1243,7 @@ public class TestTezCompiler {
"c = union onschema a, b;" +
"d = load 'file:///tmp/input1' as (x:int, z:chararray);" +
"e = filter d by x == c.x;" +
- "store e into 'file:///tmp/output';";
+ "store e into 'file:///tmp/pigoutput';";
resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
@@ -981,11 +1254,76 @@ public class TestTezCompiler {
}
@Test
+ public void testUnionSplitUnionStore() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input1' as (y:chararray, x:int);" +
+ "c = union onschema a, b;" +
+ "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" +
+ "h = union onschema d, e;" +
+ "i = union onschema f, g;" +
+ "store h into 'file:///tmp/pigoutput/1';" +
+ "store i into 'file:///tmp/pigoutput/2';";
+
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld");
+
+ // With a join in between
+ query =
+ "a = load 'file:///tmp/input' as (x:chararray);" +
+ "b = load 'file:///tmp/input' as (x:chararray);" +
+ "c = load 'file:///tmp/input' as (y:chararray);" +
+ "u1 = union onschema a, b;" +
+ "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
+ "d = JOIN r BY x LEFT, c BY y;" +
+ "u2 = UNION ONSCHEMA d, s;" +
+ "e = FILTER u2 BY x == '';" +
+ "f = FILTER u2 BY x == 'm';" +
+ "u3 = UNION ONSCHEMA e, f;" +
+ "store u3 into 'file:///tmp/pigoutput';";
+
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld");
+ }
+
+ @Test
+ public void testUnionSplitUnionLimitStore() throws Exception {
+ // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group
+ String query =
+ "a = load 'file:///tmp/input' as (x:chararray);" +
+ "b = load 'file:///tmp/input' as (x:chararray);" +
+ "c = load 'file:///tmp/input' as (y:chararray);" +
+ "u1 = union onschema a, b;" +
+ "SPLIT u1 INTO r IF x != '', s OTHERWISE;" +
+ "d = JOIN r BY x LEFT, c BY y;" +
+ "u2 = UNION ONSCHEMA d, s;" +
+ "e = FILTER u2 BY x == '';" +
+ "f = FILTER u2 BY x == 'm';" +
+ "u3 = UNION ONSCHEMA e, f;" +
+ "SPLIT u3 INTO t if x != '', u OTHERWISE;" +
+ "v = LIMIT t 10;" +
+ "store v into 'file:///tmp/pigoutput';";
+
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld");
+ }
+
+ @Test
public void testRank() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
"b = rank a;" +
- "store b into 'file:///tmp/output/d';";
+ "store b into 'file:///tmp/pigoutput/d';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld");
}
@@ -996,7 +1334,7 @@ public class TestTezCompiler {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +
"b = rank a by x;" +
- "store b into 'file:///tmp/output/d';";
+ "store b into 'file:///tmp/pigoutput/d';";
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
}
@@ -1052,5 +1390,32 @@ public class TestTezCompiler {
assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
}
+
+ public static class TestDummyStoreFunc extends StoreFunc {
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job)
+ throws IOException {
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ }
+
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ }
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return false;
+ }
+
+ }
}