You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/11/13 17:21:49 UTC

tez git commit: TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 5ec498d8f -> 24ba80f81


TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24ba80f8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24ba80f8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24ba80f8

Branch: refs/heads/master
Commit: 24ba80f811a653867b456287d88bfaa349f21310
Parents: 5ec498d
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Nov 13 10:21:25 2015 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Nov 13 10:21:25 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../tez/mapreduce/hadoop/DeprecatedKeys.java    |  3 +
 .../mapreduce/hadoop/TestDeprecatedKeys.java    |  4 ++
 .../vertexmanager/ShuffleVertexManager.java     |  9 ++-
 .../vertexmanager/TestShuffleVertexManager.java | 60 ++++++++++++++++++--
 5 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 52c73da..7233b03 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
 
 ALL CHANGES:
+  TEZ-2935. Add MR slow start translation for ShuffleVertexManager
   TEZ-2918. Make progress notifications in IOs
   TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
   TEZ-2930. Tez UI: Parent controller is not polling at times
@@ -249,6 +250,7 @@ INCOMPATIBLE CHANGES
   TEZ-2679. Admin forms of launch env settings
 
 ALL CHANGES
+  TEZ-2935. Add MR slow start translation for ShuffleVertexManager
   TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
   TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion.
   TEZ-2922. Tez Live UI gives access denied for admins

http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 8bd27cb..49f95c0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 
@@ -109,6 +110,8 @@ public class DeprecatedKeys {
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_RUNTIME_TASK_MEMORY);
     
+    registerMRToRuntimeKeyTranslation(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+
     registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);

http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index 3727233..2414743 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.junit.Test;
@@ -73,6 +74,7 @@ public class TestDeprecatedKeys {
     jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000);
     jobConf.setInt(MRJobConfig.IO_SORT_MB, 100);
     jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100);
+    jobConf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.95f);
     
     jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 1000);
     jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 200);
@@ -127,6 +129,7 @@ public class TestDeprecatedKeys {
     assertEquals("SecondaryComparator", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, ""));
     assertEquals("DefaultSorter", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
     assertTrue(jobConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false));
+    assertEquals(0.95f, jobConf.getFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.0f), 0.0f);
 
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD));
     assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES));
@@ -151,6 +154,7 @@ public class TestDeprecatedKeys {
     assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
     assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
     assertNull(jobConf.get("map.sort.class"));
+    assertNull(jobConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c5016aa..5fb4df9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -99,7 +99,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
    * In case of a ScatterGather connection, once this fraction of source tasks
    * have completed, all tasks on the current vertex can be scheduled. Number of
    * tasks ready for scheduling on the current vertex scales linearly between
-   * min-fraction and max-fraction
+   * min-fraction and max-fraction. When unset, defaults to the greater of the
+   * built-in default and tez.shuffle-vertex-manager.min-src-fraction.
    */
   public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = 
                                       "tez.shuffle-vertex-manager.max-src-fraction";
@@ -944,10 +945,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         .getFloat(
             ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
             ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
+    float defaultSlowStartMaxSrcFraction = ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT;
+    if (slowStartMinSrcCompletionFraction > defaultSlowStartMaxSrcFraction) {
+      defaultSlowStartMaxSrcFraction = slowStartMinSrcCompletionFraction;
+    }
     this.slowStartMaxSrcCompletionFraction = conf
         .getFloat(
             ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
-            ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
+            defaultSlowStartMaxSrcFraction);
 
     if (slowStartMinSrcCompletionFraction < 0 || slowStartMaxSrcCompletionFraction > 1
         || slowStartMaxSrcCompletionFraction < slowStartMinSrcCompletionFraction) {

http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index fec08b2..965e99c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -583,7 +583,7 @@ public class TestShuffleVertexManager {
 
     try {
       // source vertex have some tasks. min < 0.
-      manager = createManager(conf, mockContext, -0.1f, 0);
+      manager = createManager(conf, mockContext, -0.1f, 0.0f);
       Assert.assertTrue(false); // should not come here
     } catch (IllegalArgumentException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -607,9 +607,49 @@ public class TestShuffleVertexManager {
       Assert.assertTrue(e.getMessage().contains(
           "Invalid values for slowStartMinSrcCompletionFraction"));
     }
-    
+
+    // source vertex have some tasks. min > default and max undefined
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+    scheduledTasks.clear();
+
+    manager = createManager(conf, mockContext, 0.8f, null);
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
+    int completedTasks = 0;
+    // Finish all tasks before exceeding the threshold
+    for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
+      for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
+        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
+        ++completedTasks;
+        if ((completedTasks + 1) >= completedTasksThreshold) {
+          // stop before completing more than min/max source tasks
+          break;
+        }
+      }
+    }
+    // Since we haven't exceeded the threshold, all tasks are still pending
+    Assert.assertEquals(manager.totalTasksToSchedule, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+
+    // Cross the min/max threshold to schedule all tasks
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
+
+    // reset vertices for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+
     // source vertex have some tasks. min, max == 0
-    manager = createManager(conf, mockContext, 0, 0);
+    manager = createManager(conf, mockContext, 0.0f, 0.0f);
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
@@ -1184,9 +1224,17 @@ public class TestShuffleVertexManager {
   }
 
   private ShuffleVertexManager createManager(Configuration conf,
-      VertexManagerPluginContext context, float min, float max) {
-    conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
-    conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+      VertexManagerPluginContext context, Float min, Float max) {
+    if (min != null) {
+      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
+    } else {
+      conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+    }
+    if (max != null) {
+      conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+    } else {
+      conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
+    }
     UserPayload payload;
     try {
       payload = TezUtils.createUserPayloadFromConf(conf);