You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/13 21:21:52 UTC
tez git commit: TEZ-3235. Modify Example TestOrderedWordCount job to
test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)
Repository: tez
Updated Branches:
refs/heads/master e6be19695 -> 670691c35
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/670691c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/670691c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/670691c3
Branch: refs/heads/master
Commit: 670691c352d6e80400d40f1bdb9f5b87368b8f9f
Parents: e6be196
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Jul 13 14:21:29 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Jul 13 14:21:29 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../examples/TestOrderedWordCount.java | 72 ++++++++++++++++++--
2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff57f62..f2ed0ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
TEZ-3303. Have ShuffleVertexManager consume more precise partition stats.
TEZ-1248. Reduce slow-start should special case 1 reducer runs.
http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index af455fb..51e4be1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -29,8 +29,10 @@ import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -102,7 +104,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
private static final String DAG_VIEW_ACLS = "tez.testorderedwordcount.view-acls";
private static final String DAG_MODIFY_ACLS = "tez.testorderedwordcount.modify-acls";
-
+ /**
+ * IS_MAX_IPC_DATA_SET_BY_USER is a boolean flag that is set to true when the user has set MAX_IPC_DATA_LENGTH.
+ * Use -Dtez.testorderedwordcount.ipc.maximum.data.length=N to set the maximum IPC data length to N MB.
+ * Use -Dtez.testorderedwordcount.exceed.ipc.limit=N to make the DAG payload exceed MAX_IPC_DATA_LENGTH by N MB (default 3).
+ * IPC_PAYLOAD holds a random string added to each vertex's processor payload so that, combined, MAX_IPC_DATA_LENGTH is exceeded.
+ * NO_OF_VERTICES is the total number of vertices in the TestOrderedWordCount DAG.
+ */
+ private static final String IS_MAX_IPC_DATA_SET_BY_USER = "tez.testorderedwordcount.is-max-ipc-set-by-user";
+ private static final String MAX_IPC_DATA_LENGTH = "tez.testorderedwordcount.ipc.maximum.data.length";
+ private static final String EXCEED_IPC_DATA_LIMIT = "tez.testorderedwordcount.exceed.ipc.limit";
+ private static final String IPC_PAYLOAD = "tez.testorderedwordcount.ipc.payload";
+ private static final int NO_OF_VERTICES = 3;
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
@@ -110,6 +123,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
+ /**
+  * Logs the IPC-limit test settings found in the task configuration.
+  * Nothing is logged unless IS_MAX_IPC_DATA_SET_BY_USER is true.
+  *
+  * @param context task context whose configuration is inspected
+  * @throws IOException declared for compatibility with the overridden method
+  * @throws InterruptedException declared for compatibility with the overridden method
+  */
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+   Configuration conf = context.getConfiguration();
+   if (conf.getBoolean(IS_MAX_IPC_DATA_SET_BY_USER, false)) {
+     // Read each setting once instead of re-parsing it for every term of the message.
+     int maxIpcDataLengthMb = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
+     int exceedIpcLimitMb = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+     int totalPayloadMb = maxIpcDataLengthMb + exceedIpcLimitMb;
+     LOG.info("Max IPC Data Length set : " + maxIpcDataLengthMb + " MB," +
+         " Exceed the Max IPC Data Length : " + exceedIpcLimitMb + " MB," +
+         " Total Dag Payload sent through IPC : " + totalPayloadMb + " MB," +
+         " Each Vertex Processor payload : " + (totalPayloadMb / NO_OF_VERTICES) + " MB");
+   }
+ }
+
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
@@ -161,7 +186,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
int dagIndex, String inputPath, String outputPath,
boolean generateSplitsInClient,
boolean useMRSettings,
- int intermediateNumReduceTasks) throws Exception {
+ int intermediateNumReduceTasks,
+ int maxDataLengthThroughIPC,
+ int exceedDataLimit) throws Exception {
Configuration mapStageConf = new JobConf(conf);
mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -215,11 +242,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
Map<String, String> reduceEnv = Maps.newHashMap();
MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);
+ Configuration copyMapStageConf = new Configuration(mapStageConf);
+ setMaxDataLengthConf(copyMapStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
Vertex mapVertex;
ProcessorDescriptor mapProcessorDescriptor =
ProcessorDescriptor.create(MapProcessor.class.getName())
.setUserPayload(
- TezUtils.createUserPayloadFromConf(mapStageConf))
+ TezUtils.createUserPayloadFromConf(copyMapStageConf))
.setHistoryText(mapStageHistoryText);
if (!useMRSettings) {
mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
@@ -233,11 +263,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
.addDataSource("MRInput", dsd);
vertices.add(mapVertex);
+ Configuration copyiReduceStageConf = new Configuration(iReduceStageConf);
+ setMaxDataLengthConf(copyiReduceStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
String iReduceStageHistoryText = TezUtils.convertToHistoryText("Intermediate Summation Vertex",
iReduceStageConf);
ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
ReduceProcessor.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+ .setUserPayload(TezUtils.createUserPayloadFromConf(copyiReduceStageConf))
.setHistoryText(iReduceStageHistoryText);
Vertex intermediateVertex;
@@ -253,9 +286,12 @@ public class TestOrderedWordCount extends Configured implements Tool {
intermediateVertex.addTaskLocalFiles(commonLocalResources);
vertices.add(intermediateVertex);
+ Configuration copyFinalReduceConf = new Configuration(finalReduceConf);
+ setMaxDataLengthConf(copyFinalReduceConf, maxDataLengthThroughIPC, exceedDataLimit);
+
String finalReduceStageHistoryText = TezUtils.convertToHistoryText("Final Sorter Vertex",
finalReduceConf);
- UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+ UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(copyFinalReduceConf);
Vertex finalReduceVertex;
ProcessorDescriptor finalReduceProcessorDescriptor =
@@ -304,6 +340,24 @@ public class TestOrderedWordCount extends Configured implements Tool {
return dag;
}
+ /**
+  * Adds the IPC-limit test settings to a vertex processor configuration.
+  *
+  * When maxDataLengthThroughIPC is positive, this sets IS_MAX_IPC_DATA_SET_BY_USER to true,
+  * sets EXCEED_IPC_DATA_LIMIT to exceedDataLimit, and stores under IPC_PAYLOAD a random
+  * alphanumeric string of ((maxDataLengthThroughIPC + exceedDataLimit) MB) / NO_OF_VERTICES
+  * characters. When maxDataLengthThroughIPC is zero or negative the configuration is left unchanged.
+  *
+  * @param config configuration to update in place
+  * @param maxDataLengthThroughIPC maximum IPC data length in MB; non-positive disables the test settings
+  * @param exceedDataLimit number of MB by which the combined payload should exceed the maximum
+  * @throws IllegalArgumentException if the per-vertex payload size is negative or does not fit in an int
+  */
+ private void setMaxDataLengthConf(Configuration config, int maxDataLengthThroughIPC, int exceedDataLimit) {
+   if (maxDataLengthThroughIPC <= 0) {
+     return;
+   }
+   // Widen to long before scaling MB to bytes: in int arithmetic the product wraps
+   // once the two MB values sum to 2048 or more.
+   long totalPayloadBytes = ((long) maxDataLengthThroughIPC + exceedDataLimit) * 1024L * 1024L;
+   long payloadSize = totalPayloadBytes / NO_OF_VERTICES;
+   if (payloadSize < 0 || payloadSize > Integer.MAX_VALUE) {
+     throw new IllegalArgumentException("Per-vertex IPC payload size " + payloadSize
+         + " is out of range for " + MAX_IPC_DATA_LENGTH + "=" + maxDataLengthThroughIPC
+         + " MB and " + EXCEED_IPC_DATA_LIMIT + "=" + exceedDataLimit + " MB");
+   }
+   config.setBoolean(IS_MAX_IPC_DATA_SET_BY_USER, true);
+   config.setInt(EXCEED_IPC_DATA_LIMIT, exceedDataLimit);
+   config.set(IPC_PAYLOAD, RandomStringUtils.randomAlphanumeric((int) payloadSize));
+ }
private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
LOG.info("Checking DAG specific ACLS");
@@ -360,6 +414,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
// TODO needs to use auto reduce parallelism
int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
+ int maxDataLengthThroughIPC = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
+ int exceedDataLimit = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+ if (maxDataLengthThroughIPC > 0) {
+ conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxDataLengthThroughIPC * 1024 * 1024);
+ }
if (((otherArgs.length%2) != 0)
|| (!useTezSession && otherArgs.length != 2)) {
@@ -453,7 +512,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
DAG dag = instance.createDAG(fs, tezConf, localResources,
stagingDir, dagIndex, inputPath, outputPath,
- generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
+ generateSplitsInClient, useMRSettings, intermediateNumReduceTasks,
+ maxDataLengthThroughIPC,exceedDataLimit);
String callerType = "TestOrderedWordCount";
String callerId = tezSession.getAppMasterApplicationId() == null ?
( "UnknownApp_" + System.currentTimeMillis() + dagIndex ) :