You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/11/13 17:21:49 UTC
tez git commit: TEZ-2935. Add MR slow start translation for
ShuffleVertexManager (jeagles)
Repository: tez
Updated Branches:
refs/heads/master 5ec498d8f -> 24ba80f81
TEZ-2935. Add MR slow start translation for ShuffleVertexManager (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24ba80f8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24ba80f8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24ba80f8
Branch: refs/heads/master
Commit: 24ba80f811a653867b456287d88bfaa349f21310
Parents: 5ec498d
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Nov 13 10:21:25 2015 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Nov 13 10:21:25 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tez/mapreduce/hadoop/DeprecatedKeys.java | 3 +
.../mapreduce/hadoop/TestDeprecatedKeys.java | 4 ++
.../vertexmanager/ShuffleVertexManager.java | 9 ++-
.../vertexmanager/TestShuffleVertexManager.java | 60 ++++++++++++++++++--
5 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 52c73da..7233b03 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
ALL CHANGES:
+ TEZ-2935. Add MR slow start translation for ShuffleVertexManager
TEZ-2918. Make progress notifications in IOs
TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
TEZ-2930. Tez UI: Parent controller is not polling at times
@@ -249,6 +250,7 @@ INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
ALL CHANGES
+ TEZ-2935. Add MR slow start translation for ShuffleVertexManager
TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely
TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion.
TEZ-2922. Tez Live UI gives access denied for admins
http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 8bd27cb..49f95c0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
@@ -109,6 +110,8 @@ public class DeprecatedKeys {
registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_RUNTIME_TASK_MEMORY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+
registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
index 3727233..2414743 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestDeprecatedKeys.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.junit.Test;
@@ -73,6 +74,7 @@ public class TestDeprecatedKeys {
jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, 2000);
jobConf.setInt(MRJobConfig.IO_SORT_MB, 100);
jobConf.setInt(MRJobConfig.COUNTERS_MAX_KEY, 100);
+ jobConf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.95f);
jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 1000);
jobConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 200);
@@ -127,6 +129,7 @@ public class TestDeprecatedKeys {
assertEquals("SecondaryComparator", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, ""));
assertEquals("DefaultSorter", jobConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
assertTrue(jobConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false));
+ assertEquals(0.95f, jobConf.getFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.0f), 0.0f);
assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD));
assertNull(jobConf.get(MRConfig.MAPRED_IFILE_READAHEAD_BYTES));
@@ -151,6 +154,7 @@ public class TestDeprecatedKeys {
assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
assertNull(jobConf.get(MRJobConfig.GROUP_COMPARATOR_CLASS));
assertNull(jobConf.get("map.sort.class"));
+ assertNull(jobConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c5016aa..5fb4df9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -99,7 +99,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
* In case of a ScatterGather connection, once this fraction of source tasks
* have completed, all tasks on the current vertex can be scheduled. Number of
* tasks ready for scheduling on the current vertex scales linearly between
- * min-fraction and max-fraction
+ * min-fraction and max-fraction. If unset, defaults to the greater of the built-in
+ * default and the configured tez.shuffle-vertex-manager.min-src-fraction.
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION =
"tez.shuffle-vertex-manager.max-src-fraction";
@@ -944,10 +945,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
.getFloat(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
+ float defaultSlowStartMaxSrcFraction = ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT;
+ if (slowStartMinSrcCompletionFraction > defaultSlowStartMaxSrcFraction) {
+ defaultSlowStartMaxSrcFraction = slowStartMinSrcCompletionFraction;
+ }
this.slowStartMaxSrcCompletionFraction = conf
.getFloat(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
- ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
+ defaultSlowStartMaxSrcFraction);
if (slowStartMinSrcCompletionFraction < 0 || slowStartMaxSrcCompletionFraction > 1
|| slowStartMaxSrcCompletionFraction < slowStartMinSrcCompletionFraction) {
http://git-wip-us.apache.org/repos/asf/tez/blob/24ba80f8/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index fec08b2..965e99c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -583,7 +583,7 @@ public class TestShuffleVertexManager {
try {
// source vertex have some tasks. min < 0.
- manager = createManager(conf, mockContext, -0.1f, 0);
+ manager = createManager(conf, mockContext, -0.1f, 0.0f);
Assert.assertTrue(false); // should not come here
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
@@ -607,9 +607,49 @@ public class TestShuffleVertexManager {
Assert.assertTrue(e.getMessage().contains(
"Invalid values for slowStartMinSrcCompletionFraction"));
}
-
+
+ // source vertex have some tasks. min > default and max undefined
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+ scheduledTasks.clear();
+
+ manager = createManager(conf, mockContext, 0.8f, null);
+ manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertEquals(3, manager.pendingTasks.size());
+ Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+ float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
+ int completedTasks = 0;
+ // Finish all tasks before exceeding the threshold
+ for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
+ for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
+ ++completedTasks;
+ if ((completedTasks + 1) >= completedTasksThreshold) {
+ // stop before completing more than min/max source tasks
+ break;
+ }
+ }
+ }
+ // Since we haven't exceeded the threshold, all tasks are still pending
+ Assert.assertEquals(manager.totalTasksToSchedule, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+
+ // Cross the min/max threshold to schedule all tasks
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+ Assert.assertEquals(0, manager.pendingTasks.size());
+ Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
+
+ // reset vertices for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+
// source vertex have some tasks. min, max == 0
- manager = createManager(conf, mockContext, 0, 0);
+ manager = createManager(conf, mockContext, 0.0f, 0.0f);
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
@@ -1184,9 +1224,17 @@ public class TestShuffleVertexManager {
}
private ShuffleVertexManager createManager(Configuration conf,
- VertexManagerPluginContext context, float min, float max) {
- conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
- conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+ VertexManagerPluginContext context, Float min, Float max) {
+ if (min != null) {
+ conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
+ } else {
+ conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION);
+ }
+ if (max != null) {
+ conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+ } else {
+ conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION);
+ }
UserPayload payload;
try {
payload = TezUtils.createUserPayloadFromConf(conf);