You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/02/15 23:12:02 UTC

[tez] branch master updated: TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new ff999d8  TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)
ff999d8 is described below

commit ff999d8156a345503f5e3d680a36d6a95186bf79
Author: Jonathan Eagles <je...@gmail.com>
AuthorDate: Fri Feb 15 17:11:54 2019 -0600

    TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)
---
 .../org/apache/tez/dag/api/TezConfiguration.java   | 52 ++++++++++++++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  3 ++
 .../dag/speculation/legacy/LegacySpeculator.java   | 51 ++++++++++++++++-----
 .../org/apache/tez/dag/app/TestSpeculation.java    | 17 +++++++
 .../tez/mapreduce/hadoop/DeprecatedKeys.java       | 10 +++++
 .../apache/tez/mapreduce/hadoop/MRJobConfig.java   |  9 ++++
 6 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7c52122..4566600 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -552,6 +552,58 @@ public class TezConfiguration extends Configuration {
   public static final long TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT = -1;
 
   /**
+   * Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of
+   * speculation if there is no task speculated in this round.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="long")
+  public static final String TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE =
+          TEZ_AM_PREFIX + "soonest.retry.after.no.speculate";
+  public static final long TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE_DEFAULT = 1000L * 1L;
+
+  /**
+   * Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of
+   * speculation if there are tasks speculated in this round.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="long")
+  public static final String TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE=
+          TEZ_AM_PREFIX + "soonest.retry.after.speculate";
+  public static final long TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE_DEFAULT = 1000L * 15L;
+
+  /**
+   * Double value. The maximum proportion (0-1) of running tasks that can be speculatively re-executed at any time.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="double")
+  public static final String TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE =
+          TEZ_AM_PREFIX + "proportion.running.tasks.speculatable";
+  public static final double TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE_DEFAULT = 0.1;
+
+  /**
+   * Double value. The maximum proportion (0-1) of all tasks that can be speculatively re-executed at any time.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="double")
+  public static final String TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE =
+          TEZ_AM_PREFIX + "proportion.total.tasks.speculatable";
+  public static final double TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE_DEFAULT = 0.01;
+
+  /**
+   * Integer value. The minimum number of tasks that are allowed to be speculatively re-executed at any time.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS =
+          TEZ_AM_PREFIX + "minimum.allowed.speculative.tasks";
+  public static final int TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS_DEFAULT = 10;
+
+  /**
    * Int value. Upper limit on the number of threads used to launch containers in the app
    * master. Expert level setting.
    */
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a4d2de1..a2ef475 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4762,4 +4762,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       return maxAllowedTimeForTaskReadErrorSec;
     }
   }
+
+  @VisibleForTesting
+  public LegacySpeculator getSpeculator() { return speculator; }
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index c132fb1..3e7c2c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,12 +63,12 @@ public class LegacySpeculator {
   private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
   private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
 
-  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
-  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+  private final long soonestRetryAfterNoSpeculate;
+  private final long soonestRetryAfterSpeculate;
 
-  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
-  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
-  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+  private final double proportionRunningTasksSpeculatable;
+  private final double proportionTotalTasksSpeculatable;
+  private final int  minimumAllowedSpeculativeTasks;
   private static final int VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION = 1;
 
   private static final Logger LOG = LoggerFactory.getLogger(LegacySpeculator.class);
@@ -93,6 +94,21 @@ public class LegacySpeculator {
   private final Clock clock;
   private long nextSpeculateTime = Long.MIN_VALUE;
 
+  @VisibleForTesting
+  public int getMinimumAllowedSpeculativeTasks() { return minimumAllowedSpeculativeTasks;}
+
+  @VisibleForTesting
+  public double getProportionTotalTasksSpeculatable() { return proportionTotalTasksSpeculatable;}
+
+  @VisibleForTesting
+  public double getProportionRunningTasksSpeculatable() { return proportionRunningTasksSpeculatable;}
+
+  @VisibleForTesting
+  public long getSoonestRetryAfterNoSpeculate() { return soonestRetryAfterNoSpeculate;}
+
+  @VisibleForTesting
+  public long getSoonestRetryAfterSpeculate() { return soonestRetryAfterSpeculate;}
+
   public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
     this(conf, context.getClock(), vertex);
   }
@@ -120,6 +136,21 @@ public class LegacySpeculator {
     taskTimeout = conf.getLong(
             TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
             TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT);
+    soonestRetryAfterNoSpeculate = conf.getLong(
+            TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE,
+            TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE_DEFAULT);
+    soonestRetryAfterSpeculate = conf.getLong(
+            TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE,
+            TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE_DEFAULT);
+    proportionRunningTasksSpeculatable = conf.getDouble(
+            TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE,
+            TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE_DEFAULT);
+    proportionTotalTasksSpeculatable = conf.getDouble(
+            TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE,
+            TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE_DEFAULT);
+    minimumAllowedSpeculativeTasks = conf.getInt(
+            TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+            TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS_DEFAULT);
   }
 
 /*   *************************************************************    */
@@ -133,8 +164,8 @@ public class LegacySpeculator {
     
     int speculations = maybeScheduleASpeculation();
     long mininumRecomp
-        = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
-                           : SOONEST_RETRY_AFTER_NO_SPECULATE;
+        = speculations > 0 ? soonestRetryAfterSpeculate
+                           : soonestRetryAfterNoSpeculate;
 
     long wait = Math.max(mininumRecomp,
           clock.getTime() - now);
@@ -358,8 +389,8 @@ public class LegacySpeculator {
     Map<TezTaskID, Task> tasks = vertex.getTasks();
 
     int numberAllowedSpeculativeTasks
-        = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
-                         PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+        = (int) Math.max(minimumAllowedSpeculativeTasks,
+                         proportionTotalTasksSpeculatable * tasks.size());
 
     TezTaskID bestTaskID = null;
     long bestSpeculationValue = -1L;
@@ -388,7 +419,7 @@ public class LegacySpeculator {
     }
     numberAllowedSpeculativeTasks
         = (int) Math.max(numberAllowedSpeculativeTasks,
-                         PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+                         proportionRunningTasksSpeculatable * numberRunningTasks);
 
     // If we found a speculation target, fire it off
     if (bestTaskID != null
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 1df5af4..e1aa448 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -44,6 +44,8 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
+import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -148,6 +150,13 @@ public class TestSpeculation {
   }
 
   public void testBasicSpeculation(boolean withProgress) throws Exception {
+
+    defaultConf.setInt(TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS, 20);
+    defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE, 0.2);
+    defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE, 0.25);
+    defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE, 2000);
+    defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE, 10000);
+
     DAG dag = DAG.create("test");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
     dag.addVertex(vA);
@@ -185,6 +194,14 @@ public class TestSpeculation {
       Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
           .getValue());
     }
+
+    LegacySpeculator speculator = ((VertexImpl) dagImpl.getVertex(vA.getName())).getSpeculator();
+    Assert.assertEquals(20, speculator.getMinimumAllowedSpeculativeTasks());
+    Assert.assertEquals(.2, speculator.getProportionTotalTasksSpeculatable(), 0);
+    Assert.assertEquals(.25, speculator.getProportionRunningTasksSpeculatable(), 0);
+    Assert.assertEquals(2000, speculator.getSoonestRetryAfterNoSpeculate());
+    Assert.assertEquals(10000, speculator.getSoonestRetryAfterSpeculate());
+
     tezClient.stop();
   }
   
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 9ae58c0..d9b0930 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -161,6 +161,16 @@ public class DeprecatedKeys {
     registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
 
     registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, TezConfiguration.TEZ_USER_CLASSPATH_FIRST);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.RETRY_AFTER_NO_SPECULATE, TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.RETRY_AFTER_SPECULATE, TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE);
+
+    registerMRToRuntimeKeyTranslation(MRJobConfig.MINIMUM_ALLOWED_TASKS, TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS);
   }
   
   private static void addDeprecatedKeys() {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 334a7db..cd6fd44 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -85,6 +85,15 @@ public interface MRJobConfig {
 
   public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
 
+  public static final String SPECULATIVECAP_RUNNING_TASKS = "mapreduce.job.speculative.speculative-cap-running-tasks";
+
+  public static final String RETRY_AFTER_NO_SPECULATE = "mapreduce.job.speculative.retry-after-no-speculate";
+
+  public static final String RETRY_AFTER_SPECULATE = "mapreduce.job.speculative.retry-after-speculate";
+
+  public static final String MINIMUM_ALLOWED_TASKS = "mapreduce.job.speculative.minimum-allowed-tasks";
+
+  public static final String SPECULATIVECAP_TOTAL_TASKS = "mapreduce.job.speculative.speculative-cap-total-tasks";
 
   public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";