You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/11 15:06:05 UTC

svn commit: r1778311 - in /pig/branches/branch-0.16: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/util/ test/org/apache/pig/tez/

Author: rohini
Date: Wed Jan 11 15:06:05 2017
New Revision: 1778311

URL: http://svn.apache.org/viewvc?rev=1778311&view=rev
Log:
PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
    pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 11 15:06:05 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)
+
 PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map (nkollar via rohini)
 
 PIG-3417: Job fails when skewed join is done on tuple key (nkollar via rohini)

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Jan 11 15:06:05 2017
@@ -23,9 +23,11 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -174,6 +176,7 @@ public class TezDagBuilder extends TezOp
     private PigContext pc;
     private Configuration globalConf;
     private Configuration pigContextConf;
+    private Configuration shuffleVertexManagerBaseConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -217,6 +220,16 @@ public class TezDagBuilder extends TezOp
         this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         MRToTezHelper.processMRSettings(pigContextConf, globalConf);
 
+        shuffleVertexManagerBaseConf = new Configuration(false);
+        // Only copy tez.shuffle-vertex-manager config to keep payload size small
+        Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+        while (iter.hasNext()) {
+            Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+                shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
+            }
+        }
+
         // Add credentials from binary token file and get tokens for namenodes
         // specified in mapreduce.job.hdfs-servers
         SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +278,7 @@ public class TezDagBuilder extends TezOp
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
             // If tez setting is not defined
             MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
-            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
         }
 
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -778,6 +791,21 @@ public class TezDagBuilder extends TezOp
 
         String vmPluginName = null;
         Configuration vmPluginConf = null;
+        boolean containScatterGather = false;
+        boolean containCustomPartitioner = false;
+        for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+            if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+                containScatterGather = true;
+            }
+            if (edge.partitionerClass != null) {
+                containCustomPartitioner = true;
+            }
+        }
+
+        if(containScatterGather) {
+            vmPluginName = ShuffleVertexManager.class.getName();
+            vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+        }
 
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
@@ -792,31 +820,8 @@ public class TezDagBuilder extends TezOp
                     log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
                 }
             } else {
-                boolean containScatterGather = false;
-                boolean containCustomPartitioner = false;
-                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-                    if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
-                        containScatterGather = true;
-                    }
-                    if (edge.partitionerClass!=null) {
-                        containCustomPartitioner = true;
-                    }
-                }
                 if (containScatterGather && !containCustomPartitioner) {
-                    vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
-                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
-                    // reduce the parallelism of the vertex
-                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
-                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
-                            && tezOp.getCrossKeys() == null) {
-                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
-                        tezOp.setUseGraceParallelism(true);
-                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
-                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
-                    } else {
-                        vmPluginName = ShuffleVertexManager.class.getName();
-                    }
-                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+
                     // For Intermediate reduce, set the bytes per reducer to be block size.
                     long bytesPerReducer = intermediateTaskInputSize;
                     // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -825,8 +830,8 @@ public class TezDagBuilder extends TezOp
                     // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
                     // as map input sizes are mostly always high compared to map output.
                     if (stores.size() > 0) {
-                        if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
-                            bytesPerReducer = vmPluginConf.getLong(
+                        if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = pigContextConf.getLong(
                                             InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                             InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
                         } else if (tezOp.isGroupBy()) {
@@ -835,6 +840,20 @@ public class TezDagBuilder extends TezOp
                             bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
                         }
                     }
+
+                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+                    // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
+                    // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
+                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+                            && tezOp.getCrossKeys() == null) {
+                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
+                        vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
+                    }
+                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
                     vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Wed Jan 11 15:06:05 2017
@@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -102,7 +103,6 @@ public class MRToTezHelper {
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
         mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
-        mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
     }
 
@@ -259,6 +259,14 @@ public class MRToTezHelper {
         JobControlCompiler.configureCompression(tezConf);
         convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
         removeUnwantedSettings(tezConf, false);
+
+        // ShuffleVertexManager Plugin settings
+        // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
+        String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        if (slowStartFraction != null) {
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
+        }
     }
 
     /**

Modified: pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java Wed Jan 11 15:06:05 2017
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,12 +37,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -50,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -59,8 +63,11 @@ import org.apache.pig.test.junit.Ordered
 import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -79,7 +86,8 @@ import org.junit.runner.RunWith;
     "testTezParallelismEstimatorFilterFlatten",
     "testTezParallelismEstimatorHashJoin",
     "testTezParallelismEstimatorSplitBranch",
-    "testTezParallelismDefaultParallelism"
+    "testTezParallelismDefaultParallelism",
+    "testShuffleVertexManagerConfig"
 })
 public class TestTezJobControlCompiler {
     private static PigContext pc;
@@ -292,6 +300,72 @@ public class TestTezJobControlCompiler {
         TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
         Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
         assertEquals(leafVertex.getParallelism(), 5);
+        pc.defaultParallel = -1;
+    }
+
+    @Test
+    public void testShuffleVertexManagerConfig() throws Exception{
+        pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3");
+        pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500");
+
+        try {
+
+            String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName()
+                    + "() as (name:chararray, age:int, gpa:double);"
+                    + "b = limit a 5;"
+                    + "c = group b by name;"
+                    + "store c into 'output';";
+
+            VertexManagerPluginDescriptor vmPlugin = getLeafVertexVMPlugin(query);
+            Configuration vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+
+            // Case of grace auto parallelism (PigGraceShuffleVertexManager)
+            assertEquals(PigGraceShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+            // min and max src fraction, auto parallel, desired size, bytes.per.reducer, pig.tez.plan and pigcontext
+            assertEquals(7, vmPluginConf.size());
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+            assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+            assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+            assertEquals("500", vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
+
+            // Case of auto parallelism (ShuffleVertexManager)
+            pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
+            vmPlugin = getLeafVertexVMPlugin(query);
+            vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+            assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+            // min and max src fraction, auto parallel, desired size
+            assertEquals(4, vmPluginConf.size());
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+            assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+            assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+
+            // Case of default parallel or PARALLEL (ShuffleVertexManager)
+            pc.defaultParallel = 2;
+            vmPlugin = getLeafVertexVMPlugin(query);
+            vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+            assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+            // min and max src fraction
+            assertEquals(2, vmPluginConf.size());
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+        } finally {
+            pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+            pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+            pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
+            pc.defaultParallel = -1;
+        }
+    }
+
+    private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception {
+        Pair<TezOperPlan, DAG> compiledPlan = compile(query);
+        TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+        Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
+        Field vmPluginField = Vertex.class.getDeclaredField("vertexManagerPlugin");
+        vmPluginField.setAccessible(true);
+        VertexManagerPluginDescriptor vmPlugin = (VertexManagerPluginDescriptor) vmPluginField.get(leafVertex);
+        return vmPlugin;
     }
 
     private Pair<TezOperPlan, DAG> compile(String query) throws Exception {