You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/08/05 22:21:54 UTC

svn commit: r1694320 - in /pig/trunk: ./ conf/ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/test/data/GoldenFiles/tez/ test/org...

Author: rohini
Date: Wed Aug  5 20:21:53 2015
New Revision: 1694320

URL: http://svn.apache.org/r1694320
Log:
PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/pig-default.properties
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug  5 20:21:53 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4649: [Pig on Tez] Union followed by HCatStorer misses some data (rohini)
+
 PIG-4636: Occurred spelled incorrectly in error message for Launcher and POMergeCogroup (stevenmz via daijy)
 
 PIG-4624: Error on ORC empty file without schema (daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Wed Aug  5 20:21:53 2015
@@ -568,12 +568,6 @@ hcat.bin=/usr/local/hcat/bin/hcat
 #
 # opt.fetch=true
 
-# Enable auto/grace parallelism in tez. These should be used by default unless
-# you encounter some bug in automatic parallelism. If pig.tez.auto.parallelism
-# to false, use 1 as default parallelism
-pig.tez.auto.parallelism=true
-pig.tez.grace.parallelism=true
-
 ###########################################################################
 #
 # Streaming properties
@@ -600,3 +594,23 @@ pig.tez.grace.parallelism=true
 # python2.7.
 #
 # pig.streaming.udf.python.command=python
+
+###########################################################################
+#
+# Tez specific properties
+#
+
+# Enable auto/grace parallelism in tez. Both default to true and should be
+# left enabled unless you encounter a bug in automatic parallelism.
+# If pig.tez.auto.parallelism is set to false, 1 is used as the default parallelism.
+
+#pig.tez.auto.parallelism=true
+#pig.tez.grace.parallelism=true
+
+# Union optimization (pig.tez.opt.union=true) in tez uses vertex groups to store
+# output from different vertices into one final output location.
+# If a StoreFunc's OutputCommitter does not work with multiple vertices
+# writing to the same location, list its class name below (comma-separated)
+# to disable union optimization just for that StoreFunc. Refer to PIG-4649.
+
+#pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Aug  5 20:21:53 2015
@@ -60,6 +60,7 @@ public class PigConfiguration {
      * This key is used to enable or disable union optimization in tez. True by default
      */
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
+    public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
 
     /**
      * Boolean value to enable or disable partial aggregation in map. Disabled by default

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Aug  5 20:21:53 2015
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -417,11 +419,18 @@ public class TezLauncher extends Launche
         }
 
         boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
+        List<String> unionUnsupportedStoreFuncs = null;
+        String unionUnSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+        if (unionUnSupported != null && unionUnSupported.trim().length() > 0) {
+            unionUnsupportedStoreFuncs = Arrays
+                    .asList(StringUtils.split(unionUnSupported.trim()));
+        }
+
         boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
         if (isMultiQuery) {
             // reduces the number of TezOpers in the Tez plan generated
             // by multi-query (multi-store) script.
-            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan, isUnionOpt);
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan, isUnionOpt, unionUnsupportedStoreFuncs);
             mqOptimizer.visit();
         }
 
@@ -434,7 +443,7 @@ public class TezLauncher extends Launche
 
         // Use VertexGroup in Tez
         if (isUnionOpt) {
-            UnionOptimizer uo = new UnionOptimizer(tezPlan);
+            UnionOptimizer uo = new UnionOptimizer(tezPlan, unionUnsupportedStoreFuncs);
             uo.visit();
         }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Wed Aug  5 20:21:53 2015
@@ -46,10 +46,12 @@ import org.apache.pig.impl.plan.VisitorE
 public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
 
     private boolean unionOptimizerOn;
+    private List<String> unionUnsupportedStoreFuncs;
 
-    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn) {
+    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn, List<String> unionUnsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.unionOptimizerOn = unionOptimizerOn;
+        this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
     }
 
     private void addAllPredecessors(TezOperator tezOp, List<TezOperator> predsList) {
@@ -133,7 +135,7 @@ public class MultiQueryOptimizerTez exte
                     for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn
-                                    && UnionOptimizer.isOptimizable(succSuccessor))) {
+                                    && UnionOptimizer.isOptimizable(succSuccessor, unionUnsupportedStoreFuncs))) {
                                 toMergeSuccessors.add(succSuccessor);
                             }
                         } else if (successors.contains(succSuccessor)) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Aug  5 20:21:53 2015
@@ -71,15 +71,27 @@ import org.apache.tez.runtime.library.ou
 public class UnionOptimizer extends TezOpPlanVisitor {
 
     private TezOperPlan tezPlan;
-    public UnionOptimizer(TezOperPlan plan) {
+    private List<String> unsupportedStoreFuncs;
+
+    public UnionOptimizer(TezOperPlan plan, List<String> unsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         tezPlan = plan;
+        this.unsupportedStoreFuncs = unsupportedStoreFuncs;
     }
 
-    public static boolean isOptimizable(TezOperator tezOp) {
+    public static boolean isOptimizable(TezOperator tezOp, List<String> unsupportedStoreFuncs)
+            throws VisitorException {
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
+        if (unsupportedStoreFuncs != null) {
+            List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+            for (POStoreTez store : stores) {
+                if (unsupportedStoreFuncs.contains(store.getStoreFunc().getClass().getName())) {
+                    return false;
+                }
+            }
+        }
         return true;
     }
 
@@ -89,7 +101,7 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if (!isOptimizable(tezOp)) {
+        if (!isOptimizable(tezOp, unsupportedStoreFuncs)) {
             return;
         }
 

Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Wed Aug  5 20:21:53 2015
@@ -58,3 +58,5 @@ pig.output.committer.recovery.support=fa
 
 pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
 pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
+
+pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld Wed Aug  5 20:21:53 2015
@@ -2,44 +2,44 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: pig-0_scope-1
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-45	->	Tez vertex scope-47,
-Tez vertex scope-46	->	Tez vertex scope-47,
-Tez vertex scope-47
+Tez vertex scope-18	->	Tez vertex scope-20,
+Tez vertex scope-19	->	Tez vertex scope-20,
+Tez vertex scope-20
 
-Tez vertex scope-45
+Tez vertex scope-18
 # Plan on vertex
-POValueOutputTez - scope-49	->	 [scope-47]
+POValueOutputTez - scope-22	->	 [scope-20]
 |
-|---a: New For Each(false,false)[bag] - scope-34
+|---a: New For Each(false,false)[bag] - scope-7
     |   |
-    |   Cast[int] - scope-29
+    |   Cast[int] - scope-2
     |   |
-    |   |---Project[bytearray][0] - scope-28
+    |   |---Project[bytearray][0] - scope-1
     |   |
-    |   Cast[chararray] - scope-32
+    |   Cast[chararray] - scope-5
     |   |
-    |   |---Project[bytearray][1] - scope-31
+    |   |---Project[bytearray][1] - scope-4
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-27
-Tez vertex scope-46
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-19
 # Plan on vertex
-POValueOutputTez - scope-50	->	 [scope-47]
+POValueOutputTez - scope-23	->	 [scope-20]
 |
-|---c: New For Each(false,false)[bag] - scope-42
+|---c: New For Each(false,false)[bag] - scope-15
     |   |
-    |   Cast[int] - scope-37
+    |   Cast[int] - scope-10
     |   |
-    |   |---Project[bytearray][1] - scope-36
+    |   |---Project[bytearray][1] - scope-9
     |   |
-    |   Cast[chararray] - scope-40
+    |   Cast[chararray] - scope-13
     |   |
-    |   |---Project[bytearray][0] - scope-39
+    |   |---Project[bytearray][0] - scope-12
     |
-    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-35
-Tez vertex scope-47
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-20
 # Plan on vertex
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-44
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
 |
-|---POShuffledValueInputTez - scope-48	<-	 [scope-45, scope-46]
+|---POShuffledValueInputTez - scope-21	<-	 [scope-18, scope-19]

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1694320&r1=1694319&r2=1694320&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Wed Aug  5 20:21:53 2015
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.Util;
@@ -499,11 +500,29 @@ public class TestTezCompiler {
 
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld");
+        resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test
+    public void testUnionUnSupportedStore() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b;" +
+                "store c into 'file:///tmp/output';";
+
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        String oldConfigValue = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName());
+        // Plan should not have union optimization applied
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+        // Restore the value
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldConfigValue);
+    }
+
+    @Test
     public void testUnionGroupBy() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -815,8 +834,16 @@ public class TestTezCompiler {
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
     }
 
+    private String getProperty(String property) {
+        return pigServer.getPigContext().getProperties().getProperty(property);
+    }
+
     private void setProperty(String property, String value) {
-        pigServer.getPigContext().getProperties().setProperty(property, value);
+        if (value == null) {
+            pigServer.getPigContext().getProperties().remove(property);
+        } else {
+            pigServer.getPigContext().getProperties().setProperty(property, value);
+        }
     }
 
     private void run(String query, String expectedFile) throws Exception {