You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by je...@apache.org on 2019/02/15 23:12:02 UTC
[tez] branch master updated: TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new ff999d8 TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)
ff999d8 is described below
commit ff999d8156a345503f5e3d680a36d6a95186bf79
Author: Jonathan Eagles <je...@gmail.com>
AuthorDate: Fri Feb 15 17:11:54 2019 -0600
TEZ-3952. Allow Tez task speculation to grant greater customization of certain parameters (Nishant Dash via jeagles)
---
.../org/apache/tez/dag/api/TezConfiguration.java | 52 ++++++++++++++++++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 ++
.../dag/speculation/legacy/LegacySpeculator.java | 51 ++++++++++++++++-----
.../org/apache/tez/dag/app/TestSpeculation.java | 17 +++++++
.../tez/mapreduce/hadoop/DeprecatedKeys.java | 10 +++++
.../apache/tez/mapreduce/hadoop/MRJobConfig.java | 9 ++++
6 files changed, 132 insertions(+), 10 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7c52122..4566600 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -552,6 +552,58 @@ public class TezConfiguration extends Configuration {
public static final long TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT = -1;
/**
+ * Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of
+ * speculation if there is no task speculated in this round.
+ */
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="long")
+ public static final String TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE =
+ TEZ_AM_PREFIX + "soonest.retry.after.no.speculate";
+ public static final long TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE_DEFAULT = 1000L * 1L;
+
+ /**
+ * Long value. Specifies amount of time (in ms) that needs to elapse to do the next round of
+ * speculation if there are tasks speculated in this round.
+ */
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="long")
+ public static final String TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE=
+ TEZ_AM_PREFIX + "soonest.retry.after.speculate";
+ public static final long TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE_DEFAULT = 1000L * 15L;
+
+ /**
+ * Double value. The max percent (0-1) of running tasks that can be speculatively re-executed at any time.
+ */
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="double")
+ public static final String TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE =
+ TEZ_AM_PREFIX + "proportion.running.tasks.speculatable";
+ public static final double TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE_DEFAULT = 0.1;
+
+ /**
+ * Double value. The max percent (0-1) of all tasks that can be speculatively re-executed at any time.
+ */
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="double")
+ public static final String TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE =
+ TEZ_AM_PREFIX + "proportion.total.tasks.speculatable";
+ public static final double TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE_DEFAULT = 0.01;
+
+ /**
+ * Integer value. The minimum allowed tasks that can be speculatively re-executed at any time.
+ */
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS =
+ TEZ_AM_PREFIX + "minimum.allowed.speculative.tasks";
+ public static final int TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS_DEFAULT = 10;
+
+ /**
* Int value. Upper limit on the number of threads user to launch containers in the app
* master. Expert level setting.
*/
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a4d2de1..a2ef475 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4762,4 +4762,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return maxAllowedTimeForTaskReadErrorSec;
}
}
+
+ @VisibleForTesting
+ public LegacySpeculator getSpeculator() { return speculator; }
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index c132fb1..3e7c2c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,12 +63,12 @@ public class LegacySpeculator {
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
- private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
- private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+ private final long soonestRetryAfterNoSpeculate;
+ private final long soonestRetryAfterSpeculate;
- private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
- private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
- private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+ private final double proportionRunningTasksSpeculatable;
+ private final double proportionTotalTasksSpeculatable;
+ private final int minimumAllowedSpeculativeTasks;
private static final int VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION = 1;
private static final Logger LOG = LoggerFactory.getLogger(LegacySpeculator.class);
@@ -93,6 +94,21 @@ public class LegacySpeculator {
private final Clock clock;
private long nextSpeculateTime = Long.MIN_VALUE;
+ @VisibleForTesting
+ public int getMinimumAllowedSpeculativeTasks() { return minimumAllowedSpeculativeTasks;}
+
+ @VisibleForTesting
+ public double getProportionTotalTasksSpeculatable() { return proportionTotalTasksSpeculatable;}
+
+ @VisibleForTesting
+ public double getProportionRunningTasksSpeculatable() { return proportionRunningTasksSpeculatable;}
+
+ @VisibleForTesting
+ public long getSoonestRetryAfterNoSpeculate() { return soonestRetryAfterNoSpeculate;}
+
+ @VisibleForTesting
+ public long getSoonestRetryAfterSpeculate() { return soonestRetryAfterSpeculate;}
+
public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
this(conf, context.getClock(), vertex);
}
@@ -120,6 +136,21 @@ public class LegacySpeculator {
taskTimeout = conf.getLong(
TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT);
+ soonestRetryAfterNoSpeculate = conf.getLong(
+ TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE,
+ TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE_DEFAULT);
+ soonestRetryAfterSpeculate = conf.getLong(
+ TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE,
+ TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE_DEFAULT);
+ proportionRunningTasksSpeculatable = conf.getDouble(
+ TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE,
+ TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE_DEFAULT);
+ proportionTotalTasksSpeculatable = conf.getDouble(
+ TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE,
+ TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE_DEFAULT);
+ minimumAllowedSpeculativeTasks = conf.getInt(
+ TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+ TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS_DEFAULT);
}
/* ************************************************************* */
@@ -133,8 +164,8 @@ public class LegacySpeculator {
int speculations = maybeScheduleASpeculation();
long mininumRecomp
- = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
- : SOONEST_RETRY_AFTER_NO_SPECULATE;
+ = speculations > 0 ? soonestRetryAfterSpeculate
+ : soonestRetryAfterNoSpeculate;
long wait = Math.max(mininumRecomp,
clock.getTime() - now);
@@ -358,8 +389,8 @@ public class LegacySpeculator {
Map<TezTaskID, Task> tasks = vertex.getTasks();
int numberAllowedSpeculativeTasks
- = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
- PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+ = (int) Math.max(minimumAllowedSpeculativeTasks,
+ proportionTotalTasksSpeculatable * tasks.size());
TezTaskID bestTaskID = null;
long bestSpeculationValue = -1L;
@@ -388,7 +419,7 @@ public class LegacySpeculator {
}
numberAllowedSpeculativeTasks
= (int) Math.max(numberAllowedSpeculativeTasks,
- PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+ proportionRunningTasksSpeculatable * numberRunningTasks);
// If we found a speculation target, fire it off
if (bestTaskID != null
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 1df5af4..e1aa448 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -44,6 +44,8 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
+import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -148,6 +150,13 @@ public class TestSpeculation {
}
public void testBasicSpeculation(boolean withProgress) throws Exception {
+
+ defaultConf.setInt(TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS, 20);
+ defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE, 0.2);
+ defaultConf.setDouble(TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE, 0.25);
+ defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE, 2000);
+ defaultConf.setLong(TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE, 10000);
+
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
@@ -185,6 +194,14 @@ public class TestSpeculation {
Assert.assertEquals(1, v.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
}
+
+ LegacySpeculator speculator = ((VertexImpl) dagImpl.getVertex(vA.getName())).getSpeculator();
+ Assert.assertEquals(20, speculator.getMinimumAllowedSpeculativeTasks());
+ Assert.assertEquals(.2, speculator.getProportionTotalTasksSpeculatable(), 0);
+ Assert.assertEquals(.25, speculator.getProportionRunningTasksSpeculatable(), 0);
+ Assert.assertEquals(2000, speculator.getSoonestRetryAfterNoSpeculate());
+ Assert.assertEquals(10000, speculator.getSoonestRetryAfterSpeculate());
+
tezClient.stop();
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 9ae58c0..d9b0930 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -161,6 +161,16 @@ public class DeprecatedKeys {
registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, TezConfiguration.TEZ_USER_CLASSPATH_FIRST);
+
+ registerMRToRuntimeKeyTranslation(MRJobConfig.RETRY_AFTER_NO_SPECULATE, TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE);
+
+ registerMRToRuntimeKeyTranslation(MRJobConfig.RETRY_AFTER_SPECULATE, TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE);
+
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE);
+
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE);
+
+ registerMRToRuntimeKeyTranslation(MRJobConfig.MINIMUM_ALLOWED_TASKS, TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS);
}
private static void addDeprecatedKeys() {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 334a7db..cd6fd44 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -85,6 +85,15 @@ public interface MRJobConfig {
public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+ public static final String SPECULATIVECAP_RUNNING_TASKS = "mapreduce.job.speculative.speculative-cap-running-tasks";
+
+ public static final String RETRY_AFTER_NO_SPECULATE = "mapreduce.job.speculative.retry-after-no-speculate";
+
+ public static final String RETRY_AFTER_SPECULATE = "mapreduce.job.speculative.retry-after-speculate";
+
+ public static final String MINIMUM_ALLOWED_TASKS = "mapreduce.job.speculative.minimum-allowed-tasks";
+
+ public static final String SPECULATIVECAP_TOTAL_TASKS = "mapreduce.job.speculative.speculative-cap-total-tasks";
public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";