You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/11 15:06:05 UTC
svn commit: r1778311 - in /pig/branches/branch-0.16: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/util/
test/org/apache/pig/tez/
Author: rohini
Date: Wed Jan 11 15:06:05 2017
New Revision: 1778311
URL: http://svn.apache.org/viewvc?rev=1778311&view=rev
Log:
PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Jan 11 15:06:05 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5043: Slowstart not applied in Tez with PARALLEL clause (rohini)
+
PIG-4930: Skewed Join Breaks On Empty Sampled Input When Key is From Map (nkollar via rohini)
PIG-3417: Job fails when skewed join is done on tuple key (nkollar via rohini)
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Jan 11 15:06:05 2017
@@ -23,9 +23,11 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -174,6 +176,7 @@ public class TezDagBuilder extends TezOp
private PigContext pc;
private Configuration globalConf;
private Configuration pigContextConf;
+ private Configuration shuffleVertexManagerBaseConf;
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
@@ -217,6 +220,16 @@ public class TezDagBuilder extends TezOp
this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+ shuffleVertexManagerBaseConf = new Configuration(false);
+ // Only copy tez.shuffle-vertex-manager config to keep payload size small
+ Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+ shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
// Add credentials from binary token file and get tokens for namenodes
// specified in mapreduce.job.hdfs-servers
SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +278,7 @@ public class TezDagBuilder extends TezOp
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) {
// If tez setting is not defined
MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false);
}
if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) {
@@ -778,6 +791,21 @@ public class TezDagBuilder extends TezOp
String vmPluginName = null;
Configuration vmPluginConf = null;
+ boolean containScatterGather = false;
+ boolean containCustomPartitioner = false;
+ for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+ if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+ containScatterGather = true;
+ }
+ if (edge.partitionerClass != null) {
+ containCustomPartitioner = true;
+ }
+ }
+
+ if(containScatterGather) {
+ vmPluginName = ShuffleVertexManager.class.getName();
+ vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+ }
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
@@ -792,31 +820,8 @@ public class TezDagBuilder extends TezOp
log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
}
} else {
- boolean containScatterGather = false;
- boolean containCustomPartitioner = false;
- for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
- if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
- containScatterGather = true;
- }
- if (edge.partitionerClass!=null) {
- containCustomPartitioner = true;
- }
- }
if (containScatterGather && !containCustomPartitioner) {
- vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
- // Use auto-parallelism feature of ShuffleVertexManager to dynamically
- // reduce the parallelism of the vertex
- if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
- && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
- && tezOp.getCrossKeys() == null) {
- vmPluginName = PigGraceShuffleVertexManager.class.getName();
- tezOp.setUseGraceParallelism(true);
- vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
- vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
- } else {
- vmPluginName = ShuffleVertexManager.class.getName();
- }
- vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+
// For Intermediate reduce, set the bytes per reducer to be block size.
long bytesPerReducer = intermediateTaskInputSize;
// If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
@@ -825,8 +830,8 @@ public class TezDagBuilder extends TezOp
// In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
// as map input sizes are mostly always high compared to map output.
if (stores.size() > 0) {
- if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
- bytesPerReducer = vmPluginConf.getLong(
+ if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+ bytesPerReducer = pigContextConf.getLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
} else if (tezOp.isGroupBy()) {
@@ -835,6 +840,20 @@ public class TezDagBuilder extends TezOp
bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
}
}
+
+ // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+ // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
+ // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
+ if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+ && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+ && tezOp.getCrossKeys() == null) {
+ vmPluginName = PigGraceShuffleVertexManager.class.getName();
+ tezOp.setUseGraceParallelism(true);
+ vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+ vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
+ vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
+ }
+ vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Wed Jan 11 15:06:05 2017
@@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -102,7 +103,6 @@ public class MRToTezHelper {
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
- mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
}
@@ -259,6 +259,14 @@ public class MRToTezHelper {
JobControlCompiler.configureCompression(tezConf);
convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
removeUnwantedSettings(tezConf, false);
+
+ // ShuffleVertexManager Plugin settings
+ // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
+ String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ if (slowStartFraction != null) {
+ tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
+ tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
+ }
}
/**
Modified: pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1778311&r1=1778310&r2=1778311&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/tez/TestTezJobControlCompiler.java Wed Jan 11 15:06:05 2017
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,12 +37,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -50,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
@@ -59,8 +63,11 @@ import org.apache.pig.test.junit.Ordered
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -79,7 +86,8 @@ import org.junit.runner.RunWith;
"testTezParallelismEstimatorFilterFlatten",
"testTezParallelismEstimatorHashJoin",
"testTezParallelismEstimatorSplitBranch",
- "testTezParallelismDefaultParallelism"
+ "testTezParallelismDefaultParallelism",
+ "testShuffleVertexManagerConfig"
})
public class TestTezJobControlCompiler {
private static PigContext pc;
@@ -292,6 +300,72 @@ public class TestTezJobControlCompiler {
TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 5);
+ pc.defaultParallel = -1;
+ }
+
+ @Test
+ public void testShuffleVertexManagerConfig() throws Exception{
+ pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3");
+ pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500");
+
+ try {
+
+ String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName()
+ + "() as (name:chararray, age:int, gpa:double);"
+ + "b = limit a 5;"
+ + "c = group b by name;"
+ + "store c into 'output';";
+
+ VertexManagerPluginDescriptor vmPlugin = getLeafVertexVMPlugin(query);
+ Configuration vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+
+ // Case of grace auto parallelism (PigGraceShuffleVertexManager)
+ assertEquals(PigGraceShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+ // min and max src fraction, auto parallel, desired size, bytes.per.reducer, pig.tez.plan and pigcontext
+ assertEquals(7, vmPluginConf.size());
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+ assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+ assertEquals("500", vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
+
+ // Case of auto parallelism (ShuffleVertexManager)
+ pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
+ vmPlugin = getLeafVertexVMPlugin(query);
+ vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+ // min and max src fraction, auto parallel, desired size
+ assertEquals(4, vmPluginConf.size());
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+ assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+
+ // Case of default parallel or PARALLEL (ShuffleVertexManager)
+ pc.defaultParallel = 2;
+ vmPlugin = getLeafVertexVMPlugin(query);
+ vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+ assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName());
+ // min and max src fraction
+ assertEquals(2, vmPluginConf.size());
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+ assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+ } finally {
+ pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+ pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
+ pc.defaultParallel = -1;
+ }
+ }
+
+ private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception {
+ Pair<TezOperPlan, DAG> compiledPlan = compile(query);
+ TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+ Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
+ Field vmPluginField = Vertex.class.getDeclaredField("vertexManagerPlugin");
+ vmPluginField.setAccessible(true);
+ VertexManagerPluginDescriptor vmPlugin = (VertexManagerPluginDescriptor) vmPluginField.get(leafVertex);
+ return vmPlugin;
}
private Pair<TezOperPlan, DAG> compile(String query) throws Exception {