You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/08 20:39:44 UTC

tez git commit: TEZ-1788. Allow vertex level disabling of speculation (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 92e8927a2 -> c6c9f6ecd


TEZ-1788. Allow vertex level disabling of speculation (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6c9f6ec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6c9f6ec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6c9f6ec

Branch: refs/heads/master
Commit: c6c9f6ecd62e2d670a85a626ad054bbb68b0f4f7
Parents: 92e8927
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Oct 8 11:39:27 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Oct 8 11:39:27 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/TestSpeculation.java | 60 +++++++++++++++++++-
 2 files changed, 60 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cd6400..8e5da31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1788. Allow vertex level disabling of speculation
   TEZ-2868. Fix setting Caller Context in Tez Examples.
   TEZ-2860. NPE in DAGClientImpl.
   TEZ-2855. Fix a potential NPE while routing VertexManager events.

http://git-wip-us.apache.org/repos/asf/tez/blob/c6c9f6ec/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 3413762..9a39fac 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -26,6 +26,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
@@ -35,6 +42,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -57,6 +65,8 @@ public class TestSpeculation {
       defaultConf.set("fs.defaultFS", "file:///");
       defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
       defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+      defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1);
+      defaultConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1);
       localFs = FileSystem.getLocal(defaultConf);
       String stagingDir = "target" + Path.SEPARATOR + TestSpeculation.class.getName() + "-tmpDir";
       defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
@@ -103,6 +113,7 @@ public class TestSpeculation {
     TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
 
     mockLauncher.updateProgress(withProgress);
+    // cause speculation trigger
     mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
 
     mockLauncher.startScheduling(true);
@@ -116,7 +127,7 @@ public class TestSpeculation {
     Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, 
         killedAttempt.getTerminationCause());
     if (withProgress) {
-      // without progress updates occasionally more than 1 task specualates
+      // without progress updates occasionally more than 1 task speculates
       Assert.assertEquals(1, task.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
           .getValue());
       Assert.assertEquals(1, dagImpl.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
@@ -137,6 +148,53 @@ public class TestSpeculation {
   public void testBasicSpeculationWithoutProgress() throws Exception {
     testBasicSpeculation(false);
   }
+  
+  @Test (timeout=10000)
+  public void testBasicSpeculationPerVertexConf() throws Exception {
+    DAG dag = DAG.create("test");
+    String vNameNoSpec = "A";
+    String vNameSpec = "B";
+    Vertex vA = Vertex.create(vNameNoSpec, ProcessorDescriptor.create("Proc.class"), 5);
+    Vertex vB = Vertex.create(vNameSpec, ProcessorDescriptor.create("Proc.class"), 5);
+    vA.setConf(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, "false");
+    dag.addVertex(vA);
+    dag.addVertex(vB);
+    // min/max src fraction is set to 1. So vertices will run sequentially
+    dag.addEdge(
+        Edge.create(vA, vB,
+            EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+                SchedulingType.SEQUENTIAL, OutputDescriptor.create("O"),
+                InputDescriptor.create("I"))));
+
+    MockTezClient tezClient = createTezSession();
+    
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
+    TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
+    // original attempt is killed and speculative one is successful
+    TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0),
+        0);
+    TezTaskAttemptID noSpecTaId = TezTaskAttemptID
+        .getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
+
+    // cause speculation trigger for both
+    mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+    mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
+
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    org.apache.tez.dag.app.dag.Vertex vSpec = dagImpl.getVertex(vertexId);
+    org.apache.tez.dag.app.dag.Vertex vNoSpec = dagImpl.getVertex(vertexIdNoSpec);
+    // speculation for vB but not for vA (speculation is disabled on vA)
+    Assert.assertTrue(vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+        .getValue() > 0);
+    Assert.assertEquals(0, vNoSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS)
+        .getValue());
+
+    tezClient.stop();
+  }
 
   @Test (timeout=10000)
   public void testBasicSpeculationNotUseful() throws Exception {