You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/08 20:39:44 UTC
tez git commit: TEZ-1788. Allow vertex level disabling of speculation
(bikas)
Repository: tez
Updated Branches:
refs/heads/master 92e8927a2 -> c6c9f6ecd
TEZ-1788. Allow vertex level disabling of speculation (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6c9f6ec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6c9f6ec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6c9f6ec
Branch: refs/heads/master
Commit: c6c9f6ecd62e2d670a85a626ad054bbb68b0f4f7
Parents: 92e8927
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Oct 8 11:39:27 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Oct 8 11:39:27 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/dag/app/TestSpeculation.java | 60 +++++++++++++++++++-
2 files changed, 60 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cd6400..8e5da31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1788. Allow vertex level disabling of speculation
TEZ-2868. Fix setting Caller Context in Tez Examples.
TEZ-2860. NPE in DAGClientImpl.
TEZ-2855. Fix a potential NPE while routing VertexManager events.
http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 3413762..9a39fac 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -26,6 +26,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
@@ -35,6 +42,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -57,6 +65,8 @@ public class TestSpeculation {
defaultConf.set("fs.defaultFS", "file:///");
defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+ defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1);
+ defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1);
localFs = FileSystem.getLocal(defaultConf);
String stagingDir = "target" + Path.SEPARATOR + TestSpeculation.class.getName() + "-tmpDir";
defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
@@ -103,6 +113,7 @@ public class TestSpeculation {
TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
mockLauncher.updateProgress(withProgress);
+ // cause speculation trigger
mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
mockLauncher.startScheduling(true);
@@ -116,7 +127,7 @@ public class TestSpeculation {
Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
killedAttempt.getTerminationCause());
if (withProgress) {
- // without progress updates occasionally more than 1 task specualates
+ // without progress updates occasionally more than 1 task speculates
Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
.getValue());
Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
@@ -137,6 +148,53 @@ public class TestSpeculation {
public void testBasicSpeculationWithoutProgress() throws Exception {
testBasicSpeculation(false);
}
+
+ @Test (timeout=10000)
+ public void testBasicSpeculationPerVertexConf() throws Exception {
+ DAG dag = DAG.create("test");
+ String vNameNoSpec = "A";
+ String vNameSpec = "B";
+ Vertex vA = Vertex.create(vNameNoSpec, ProcessorDescriptor.create("Proc.class"), 5);
+ Vertex vB = Vertex.create(vNameSpec, ProcessorDescriptor.create("Proc.class"), 5);
+ vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false");
+ dag.addVertex(vA);
+ dag.addVertex(vB);
+ // min/max src fraction is set to 1. So vertices will run sequentially
+ dag.addEdge(
+ Edge.create(vA, vB,
+ EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("O"),
+ InputDescriptor.create("I"))));
+
+ MockTezClient tezClient = createTezSession();
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
+ TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
+ // original attempt is killed and speculative one is successful
+ TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0),
+ 0);
+ TezTaskAttemptID noSpecTaId = TezTaskAttemptID
+ .getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
+
+ // cause speculation trigger for both
+ mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+ mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
+
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId);
+ org.apache.tez.dag.app.dag.Vertex vNoSpec = dagImpl.getVertex(vertexIdNoSpec);
+ // speculation for vA but not for vB
+ Assert.assertTrue(vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue() > 0);
+ Assert.assertEquals(0, vNoSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+ .getValue());
+
+ tezClient.stop();
+ }
@Test (timeout=10000)
public void testBasicSpeculationNotUseful() throws Exception {