You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/10/16 22:48:46 UTC

svn commit: r1709109 - in /pig/branches/branch-0.15: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apa...

Author: daijy
Date: Fri Oct 16 20:48:45 2015
New Revision: 1709109

URL: http://svn.apache.org/viewvc?rev=1709109&view=rev
Log:
PIG-4703: TezOperator.stores shall not ship to backend

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Fri Oct 16 20:48:45 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4703: TezOperator.stores shall not ship to backend (daijy)
+
 PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
 
 PIG-4635: NPE while running pig script in tez mode (daijy)

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Oct 16 20:48:45 2015
@@ -490,7 +490,7 @@ public class TezDagBuilder extends TezOp
             // usually followed by limit other than store. But would benefit
             // cases like skewed join followed by group by.
             if (tezOp.getSortOperator().getEstimatedParallelism() != -1
-                    && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+                    && tezOp.getSortOperator().isIntermediateReducer()) {
                 payloadConf.setLong(
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         intermediateTaskInputSize);

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Oct 16 20:48:45 2015
@@ -32,7 +32,7 @@ import org.apache.tez.runtime.library.ou
  */
 public class TezEdgeDescriptor implements Serializable {
     // Combiner runs on both input and output of Tez edge.
-    public PhysicalPlan combinePlan;
+    transient public PhysicalPlan combinePlan;
 
     public String inputClassName;
     public String outputClassName;

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Oct 16 20:48:45 2015
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -35,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -145,7 +147,7 @@ public class TezOperator extends Operato
 
     private double parallelismFactor = -1;
 
-    private LinkedList<POStore> stores = null;
+    private Boolean intermediateReducer = null;
 
     // Types of blocking operators. For now, we only support the following ones.
     public static enum OPER_FEATURE {
@@ -634,11 +636,22 @@ public class TezOperator extends Operato
         return parallelismFactor;
     }
 
-    public LinkedList<POStore> getStores() throws VisitorException {
-        if (stores == null) {
-            stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+    public Boolean isIntermediateReducer() throws IOException {
+        if (intermediateReducer == null) {
+            intermediateReducer = false;
+            // set intermediateReducer to true if there are no loads or stores in a TezOperator
+            LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+            // Not map and not final reducer
+            if (stores.size() <= 0 &&
+                    (getLoaderInfo().getLoads() == null || getLoaderInfo().getLoads().size() <= 0)) {
+                intermediateReducer = true;
+            }
         }
-        return stores;
+        return intermediateReducer;
+    }
+
+    public void setIntermediateReducer(Boolean intermediateReducer) {
+        this.intermediateReducer = intermediateReducer;
     }
 
     public static class VertexGroupInfo {

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Oct 16 20:48:45 2015
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
-import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -26,13 +25,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
@@ -80,11 +77,6 @@ public class ParallelismSetter extends T
             // Can only set parallelism here if the parallelism isn't derived from
             // splits
             int parallelism = -1;
-            boolean intermediateReducer = false;
-            LinkedList<POStore> stores = tezOp.getStores();
-            if (stores.size() <= 0) {
-                intermediateReducer = true;
-            }
             if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
                 // requestedParallelism of Loader vertex is handled in LoaderProcessor
                 // propogate to vertexParallelism
@@ -93,7 +85,6 @@ public class ParallelismSetter extends T
             } else {
                 int prevParallelism = -1;
                 boolean isOneToOneParallelism = false;
-                intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOp);
 
                 for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
                     if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
@@ -126,7 +117,7 @@ public class ParallelismSetter extends T
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && intermediateReducer
+                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Oct 16 20:48:45 2015
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -79,15 +80,13 @@ public class TezOperDependencyParallelis
             return -1;
         }
 
-        boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
-
         // TODO: If map opts and reduce opts are same estimate higher parallelism
         // for tasks based on the count of number of map tasks else be conservative as now
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
         // If parallelism is set explicitly, respect it
-        if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) {
+        if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
@@ -129,7 +128,7 @@ public class TezOperDependencyParallelis
 
         int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
 
-        if (intermediateReducer && tezOper.isOverrideIntermediateParallelism()) {
+        if (tezOper.isIntermediateReducer() && tezOper.isOverrideIntermediateParallelism()) {
             // Estimated reducers should not be more than the configured limit
             roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
             int userSpecifiedParallelism = pc.defaultParallel;

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Oct 16 20:48:45 2015
@@ -192,19 +192,4 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
-    /**
-     * Returns true if there are no loads or stores in a TezOperator.
-     * To be called only after LoaderProcessor is called
-     */
-    static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
-        boolean intermediateReducer = false;
-        LinkedList<POStore> stores = tezOper.getStores();
-        // Not map and not final reducer
-        if (stores.size() <= 0 &&
-                (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
-            intermediateReducer = true;
-        }
-        return intermediateReducer;
-    }
-
 }

Modified: pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri Oct 16 20:48:45 2015
@@ -26,17 +26,23 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.test.Util;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.junit.AfterClass;
@@ -282,4 +288,40 @@ public class TestTezGraceParallelism {
             Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithUnion");
         }
     }
+
+    @Test
+    // See PIG-4703
+    public void testUDFContextSetInBackend() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        File outputDir = File.createTempFile("intemediate", "txt");
+        outputDir.delete();
+        pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("B = order A by name;");
+        pigServer.registerQuery("C = distinct B;");
+        pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D by name;");
+        pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+        pigServer.registerQuery("G = join C by name left, F by name;");
+        ExecJob job = pigServer.store("G", Util.removeColon(outputDir.getAbsolutePath()), StorerWithUDFContextCheck.class.getName());
+        assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+    }
+
+    static public class StorerWithUDFContextCheck extends PigStorage {
+        @Override
+        public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+            UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{signature})
+            .setProperty("schema", ObjectSerializer.serialize(resourceSchema));
+        }
+        @Override
+        public void setStoreLocation(String location, Job job) throws IOException {
+            Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+                    this.getClass(), new String[]{signature});
+            ResourceSchema rs = (ResourceSchema)ObjectSerializer.deserialize(udfProps.getProperty("schema"));
+            if (rs == null) {
+                throw new IOException("Should not be null");
+            }
+            super.setStoreLocation(location, job);
+        }
+    }
 }