You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/13 21:21:52 UTC

tez git commit: TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master e6be19695 -> 670691c35


TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. (Sushmitha Sreenivasan via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/670691c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/670691c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/670691c3

Branch: refs/heads/master
Commit: 670691c352d6e80400d40f1bdb9f5b87368b8f9f
Parents: e6be196
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Jul 13 14:21:29 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Jul 13 14:21:29 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../examples/TestOrderedWordCount.java          | 72 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff57f62..f2ed0ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
   TEZ-3303. Have ShuffleVertexManager consume more precise partition stats.
   TEZ-1248. Reduce slow-start should special case 1 reducer runs.

http://git-wip-us.apache.org/repos/asf/tez/blob/670691c3/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index af455fb..51e4be1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -29,8 +29,10 @@ import java.util.StringTokenizer;
 import java.util.TreeMap;
 
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -102,7 +104,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
 
   private static final String DAG_VIEW_ACLS = "tez.testorderedwordcount.view-acls";
   private static final String DAG_MODIFY_ACLS = "tez.testorderedwordcount.modify-acls";
-
+  /**
+   * IS_MAX_IPC_DATA_SET_BY_USER is a boolean that is set to true when MAX_IPC_DATA_LENGTH is set by the user.
+   * Use -Dtez.testorderedwordcount.ipc.maximum.data.length to set the maximum IPC data limit, in MB.
+   * Use -Dtez.testorderedwordcount.exceed.ipc.limit to set by how many MB the dag payload exceeds that limit.
+   * IPC_PAYLOAD holds a random string generated for each vertex so that MAX_IPC_DATA_LENGTH is violated.
+   * NO_OF_VERTICES is the total number of vertices in the TestOrderedWordCount dag.
+   */
+  private static final String IS_MAX_IPC_DATA_SET_BY_USER = "tez.testorderedwordcount.is-max-ipc-set-by-user";
+  private static final String MAX_IPC_DATA_LENGTH = "tez.testorderedwordcount.ipc.maximum.data.length";
+  private static final String EXCEED_IPC_DATA_LIMIT = "tez.testorderedwordcount.exceed.ipc.limit";
+  private static final String IPC_PAYLOAD = "tez.testorderedwordcount.ipc.payload";
+  private static final int NO_OF_VERTICES = 3;
 
   public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
@@ -110,6 +123,18 @@ public class TestOrderedWordCount extends Configured implements Tool {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
 
+    /** Logs the IPC-limit test settings, in whole MB, when IS_MAX_IPC_DATA_SET_BY_USER is set in the task conf. */
+    public void setup(Context context) throws IOException, InterruptedException {
+      Configuration taskConf = context.getConfiguration();
+      if (taskConf.getBoolean(IS_MAX_IPC_DATA_SET_BY_USER, false)) {
+        int maxMb = taskConf.getInt(MAX_IPC_DATA_LENGTH, -1);
+        int exceedMb = taskConf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+        LOG.info("Max IPC Data Length set : " + maxMb + " MB," + " Exceed the Max IPC Data Length : " + exceedMb
+            + " MB," + " Total Dag Payload sent through IPC : " + (maxMb + exceedMb) + " MB,"
+            + " Each Vertex Processor payload : " + ((maxMb + exceedMb) / NO_OF_VERTICES) + " MB");
+      }
+    }
+
     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
@@ -161,7 +186,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
       int dagIndex, String inputPath, String outputPath,
       boolean generateSplitsInClient,
       boolean useMRSettings,
-      int intermediateNumReduceTasks) throws Exception {
+      int intermediateNumReduceTasks,
+      int maxDataLengthThroughIPC,
+      int exceedDataLimit) throws Exception {
 
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -215,11 +242,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Map<String, String> reduceEnv = Maps.newHashMap();
     MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);
 
+    Configuration copyMapStageConf = new Configuration(mapStageConf);
+    setMaxDataLengthConf(copyMapStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     Vertex mapVertex;
     ProcessorDescriptor mapProcessorDescriptor =
         ProcessorDescriptor.create(MapProcessor.class.getName())
             .setUserPayload(
-                TezUtils.createUserPayloadFromConf(mapStageConf))
+                TezUtils.createUserPayloadFromConf(copyMapStageConf))
             .setHistoryText(mapStageHistoryText);
     if (!useMRSettings) {
       mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
@@ -233,11 +263,14 @@ public class TestOrderedWordCount extends Configured implements Tool {
         .addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
 
+    Configuration copyiReduceStageConf = new Configuration(iReduceStageConf);
+    setMaxDataLengthConf(copyiReduceStageConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     String iReduceStageHistoryText = TezUtils.convertToHistoryText("Intermediate Summation Vertex",
         iReduceStageConf);
     ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
         ReduceProcessor.class.getName())
-        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+        .setUserPayload(TezUtils.createUserPayloadFromConf(copyiReduceStageConf))
         .setHistoryText(iReduceStageHistoryText);
 
     Vertex intermediateVertex;
@@ -253,9 +286,12 @@ public class TestOrderedWordCount extends Configured implements Tool {
     intermediateVertex.addTaskLocalFiles(commonLocalResources);
     vertices.add(intermediateVertex);
 
+    Configuration copyFinalReduceConf = new Configuration(finalReduceConf);
+    setMaxDataLengthConf(copyFinalReduceConf, maxDataLengthThroughIPC, exceedDataLimit);
+
     String finalReduceStageHistoryText = TezUtils.convertToHistoryText("Final Sorter Vertex",
         finalReduceConf);
-    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(copyFinalReduceConf);
     Vertex finalReduceVertex;
 
     ProcessorDescriptor finalReduceProcessorDescriptor =
@@ -304,6 +340,24 @@ public class TestOrderedWordCount extends Configured implements Tool {
 
     return dag;
   }
+  /**
+   * If the user set -Dtez.testorderedwordcount.ipc.maximum.data.length (maxDataLengthThroughIPC > 0, in MB), sets
+   * IS_MAX_IPC_DATA_SET_BY_USER, records EXCEED_IPC_DATA_LIMIT (MB) and stores a random IPC_PAYLOAD string of
+   * (max + exceed) MB / NO_OF_VERTICES characters so that the cumulative dag payload exceeds the limit.
+   * @throws IllegalArgumentException if the per-vertex payload size is negative or does not fit in an int
+   */
+  private void setMaxDataLengthConf(Configuration config, int maxDataLengthThroughIPC, int exceedDataLimit) {
+    if (maxDataLengthThroughIPC > 0) {
+      // Widen to long first: int arithmetic silently wraps once max + exceed reaches 2048 MB.
+      long payloadSize = ((long) maxDataLengthThroughIPC + exceedDataLimit) * 1024L * 1024L / NO_OF_VERTICES;
+      if (payloadSize < 0 || payloadSize > Integer.MAX_VALUE) {
+        throw new IllegalArgumentException("IPC payload size per vertex out of range: " + payloadSize);
+      }
+      config.setBoolean(IS_MAX_IPC_DATA_SET_BY_USER, true);
+      config.setInt(EXCEED_IPC_DATA_LIMIT, exceedDataLimit);
+      config.set(IPC_PAYLOAD, RandomStringUtils.randomAlphanumeric((int) payloadSize));
+    }
+  }
 
   private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
     LOG.info("Checking DAG specific ACLS");
@@ -360,6 +414,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
     boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
     // TODO needs to use auto reduce parallelism
     int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
+    int maxDataLengthThroughIPC = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
+    int exceedDataLimit = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
+    if (maxDataLengthThroughIPC > 0) {
+      conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxDataLengthThroughIPC * 1024 * 1024);
+    }
 
     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
@@ -453,7 +512,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
         
         DAG dag = instance.createDAG(fs, tezConf, localResources,
             stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
+            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks,
+            maxDataLengthThroughIPC,exceedDataLimit);
         String callerType = "TestOrderedWordCount";
         String callerId = tezSession.getAppMasterApplicationId() == null ?
             ( "UnknownApp_" + System.currentTimeMillis() + dagIndex ) :