You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/05 22:21:54 UTC
svn commit: r1694320 - in /pig/trunk: ./ conf/ src/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
test/org/apache/pig/test/data/GoldenFiles/tez/ test/org...
Author: rohini
Date: Wed Aug 5 20:21:53 2015
New Revision: 1694320
URL: http://svn.apache.org/r1694320
Log:
PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/src/pig-default.properties
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 5 20:21:53 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)
+
PIG-4636: Occurred spelled incorrectly in error message for Launcher and POMergeCogroup (stevenmz via daijy)
PIG-4624: Error on ORC empty file without schema (daijy)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Wed Aug 5 20:21:53 2015
@@ -568,12 +568,6 @@ hcat.bin=/usr/local/hcat/bin/hcat
#
# opt.fetch=true
-# Enable auto/grace parallelism in tez. These should be used by default unless
-# you encounter some bug in automatic parallelism. If pig.tez.auto.parallelism
-# to false, use 1 as default parallelism
-pig.tez.auto.parallelism=true
-pig.tez.grace.parallelism=true
-
###########################################################################
#
# Streaming properties
@@ -600,3 +594,23 @@ pig.tez.grace.parallelism=true
# python2.7.
#
# pig.streaming.udf.python.command=python
+
+###########################################################################
+#
+# Tez specific properties
+#
+
+# Enable auto/grace parallelism in Tez. Both default to true and should be
+# left enabled unless you encounter a bug in automatic parallelism.
+# If pig.tez.auto.parallelism is set to false, 1 is used as the default parallelism.
+
+#pig.tez.auto.parallelism=true
+#pig.tez.grace.parallelism=true
+
+# Union optimization (pig.tez.opt.union=true) in Tez uses vertex groups to store
+# output from different vertices into one final output location.
+# If a StoreFunc's OutputCommitter does not work with multiple vertices
+# writing to the same location, you can disable union optimization for
+# just that StoreFunc by listing it in the property below. Refer to PIG-4649.
+
+#pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Aug 5 20:21:53 2015
@@ -60,6 +60,7 @@ public class PigConfiguration {
* This key is used to enable or disable union optimization in tez. True by default
*/
public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
+ public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
/**
* Boolean value to enable or disable partial aggregation in map. Disabled by default
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Aug 5 20:21:53 2015
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.util.StringUtils;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
@@ -417,11 +419,18 @@ public class TezLauncher extends Launche
}
boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
+ List<String> unionUnsupportedStoreFuncs = null;
+ String unionUnSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+ if (unionUnSupported != null && unionUnSupported.trim().length() > 0) {
+ unionUnsupportedStoreFuncs = Arrays
+ .asList(StringUtils.split(unionUnSupported.trim()));
+ }
+
boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
if (isMultiQuery) {
// reduces the number of TezOpers in the Tez plan generated
// by multi-query (multi-store) script.
- MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan, isUnionOpt);
+ MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan, isUnionOpt, unionUnsupportedStoreFuncs);
mqOptimizer.visit();
}
@@ -434,7 +443,7 @@ public class TezLauncher extends Launche
// Use VertexGroup in Tez
if (isUnionOpt) {
- UnionOptimizer uo = new UnionOptimizer(tezPlan);
+ UnionOptimizer uo = new UnionOptimizer(tezPlan, unionUnsupportedStoreFuncs);
uo.visit();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Wed Aug 5 20:21:53 2015
@@ -46,10 +46,12 @@ import org.apache.pig.impl.plan.VisitorE
public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
private boolean unionOptimizerOn;
+ private List<String> unionUnsupportedStoreFuncs;
- public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn) {
+ public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn, List<String> unionUnsupportedStoreFuncs) {
super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.unionOptimizerOn = unionOptimizerOn;
+ this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
}
private void addAllPredecessors(TezOperator tezOp, List<TezOperator> predsList) {
@@ -133,7 +135,7 @@ public class MultiQueryOptimizerTez exte
for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn
- && UnionOptimizer.isOptimizable(succSuccessor))) {
+ && UnionOptimizer.isOptimizable(succSuccessor, unionUnsupportedStoreFuncs))) {
toMergeSuccessors.add(succSuccessor);
}
} else if (successors.contains(succSuccessor)) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Aug 5 20:21:53 2015
@@ -71,15 +71,27 @@ import org.apache.tez.runtime.library.ou
public class UnionOptimizer extends TezOpPlanVisitor {
private TezOperPlan tezPlan;
- public UnionOptimizer(TezOperPlan plan) {
+ private List<String> unsupportedStoreFuncs;
+
+ public UnionOptimizer(TezOperPlan plan, List<String> unsupportedStoreFuncs) {
super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
tezPlan = plan;
+ this.unsupportedStoreFuncs = unsupportedStoreFuncs;
}
- public static boolean isOptimizable(TezOperator tezOp) {
+ public static boolean isOptimizable(TezOperator tezOp, List<String> unsupportedStoreFuncs)
+ throws VisitorException {
if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
return false;
}
+ if (unsupportedStoreFuncs != null) {
+ List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+ for (POStoreTez store : stores) {
+ if (unsupportedStoreFuncs.contains(store.getStoreFunc().getClass().getName())) {
+ return false;
+ }
+ }
+ }
return true;
}
@@ -89,7 +101,7 @@ public class UnionOptimizer extends TezO
return;
}
- if (!isOptimizable(tezOp)) {
+ if (!isOptimizable(tezOp, unsupportedStoreFuncs)) {
return;
}
Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Wed Aug 5 20:21:53 2015
@@ -58,3 +58,5 @@ pig.output.committer.recovery.support=fa
pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
+
+pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
\ No newline at end of file
Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld Wed Aug 5 20:21:53 2015
@@ -2,44 +2,44 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: pig-0_scope-1
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-45 -> Tez vertex scope-47,
-Tez vertex scope-46 -> Tez vertex scope-47,
-Tez vertex scope-47
+Tez vertex scope-18 -> Tez vertex scope-20,
+Tez vertex scope-19 -> Tez vertex scope-20,
+Tez vertex scope-20
-Tez vertex scope-45
+Tez vertex scope-18
# Plan on vertex
-POValueOutputTez - scope-49 -> [scope-47]
+POValueOutputTez - scope-22 -> [scope-20]
|
-|---a: New For Each(false,false)[bag] - scope-34
+|---a: New For Each(false,false)[bag] - scope-7
| |
- | Cast[int] - scope-29
+ | Cast[int] - scope-2
| |
- | |---Project[bytearray][0] - scope-28
+ | |---Project[bytearray][0] - scope-1
| |
- | Cast[chararray] - scope-32
+ | Cast[chararray] - scope-5
| |
- | |---Project[bytearray][1] - scope-31
+ | |---Project[bytearray][1] - scope-4
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-27
-Tez vertex scope-46
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
# Plan on vertex
-POValueOutputTez - scope-50 -> [scope-47]
+POValueOutputTez - scope-23 -> [scope-20]
|
-|---c: New For Each(false,false)[bag] - scope-42
+|---c: New For Each(false,false)[bag] - scope-15
| |
- | Cast[int] - scope-37
+ | Cast[int] - scope-10
| |
- | |---Project[bytearray][1] - scope-36
+ | |---Project[bytearray][1] - scope-9
| |
- | Cast[chararray] - scope-40
+ | Cast[chararray] - scope-13
| |
- | |---Project[bytearray][0] - scope-39
+ | |---Project[bytearray][0] - scope-12
|
- |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-35
-Tez vertex scope-47
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-20
# Plan on vertex
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-44
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
|
-|---POShuffledValueInputTez - scope-48 <- [scope-45, scope-46]
+|---POShuffledValueInputTez - scope-21 <- [scope-18, scope-19]
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Wed Aug 5 20:21:53 2015
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.Util;
@@ -499,11 +500,29 @@ public class TestTezCompiler {
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
+ resetScope();
setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
}
@Test
+ public void testUnionUnSupportedStore() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+ "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+ "c = union onschema a, b;" +
+ "store c into 'file:///tmp/output';";
+
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+ String oldConfigValue = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
+ // Plan should not have union optimization applied
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+ // Restore the value
+ setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldConfigValue);
+ }
+
+ @Test
public void testUnionGroupBy() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -815,8 +834,16 @@ public class TestTezCompiler {
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
}
+ private String getProperty(String property) {
+ return pigServer.getPigContext().getProperties().getProperty(property);
+ }
+
private void setProperty(String property, String value) {
- pigServer.getPigContext().getProperties().setProperty(property, value);
+ if (value == null) {
+ pigServer.getPigContext().getProperties().remove(property);
+ } else {
+ pigServer.getPigContext().getProperties().setProperty(property, value);
+ }
}
private void run(String query, String expectedFile) throws Exception {