You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/06/16 19:57:57 UTC

tez git commit: TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master 7f699f18f -> cc33410d8


TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc33410d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc33410d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc33410d

Branch: refs/heads/master
Commit: cc33410d8c9992fb70e489fc1f61748143fb08c2
Parents: 7f699f1
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Jun 16 19:55:09 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Jun 16 19:55:09 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../app/dag/impl/DAGSchedulerNaturalOrder.java  |  2 +-
 .../DAGSchedulerNaturalOrderControlled.java     |  2 +-
 .../tez/dag/app/dag/impl/TestDAGScheduler.java  | 24 +++++++++++++++-----
 4 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cc33410d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b9c6b0e..1e1803d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
   TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
@@ -60,6 +61,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
   TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
@@ -510,6 +512,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements
   TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870

http://git-wip-us.apache.org/repos/asf/tez/blob/cc33410d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 4246ad0..3a16f46 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -49,7 +49,7 @@ public class DAGSchedulerNaturalOrder extends DAGScheduler {
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
 
     // natural priority. Handles failures and retries.
-    int priorityLowLimit = (vertexDistanceFromRoot + 1) * 3;
+    int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
     int priorityHighLimit = priorityLowLimit - 2;
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/cc33410d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 0802dce..34cc92f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -79,7 +79,7 @@ public class DAGSchedulerNaturalOrderControlled extends DAGScheduler {
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
 
     // natural priority. Handles failures and retries.
-    int priorityLowLimit = (vertexDistanceFromRoot + 1) * 3;
+    int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
     int priorityHighLimit = priorityLowLimit - 2;
 
     TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(

http://git-wip-us.apache.org/repos/asf/tez/blob/cc33410d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index a28f367..f2fd933 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -57,8 +57,15 @@ public class TestDAGScheduler {
     Vertex mockVertex = mock(Vertex.class);
     TaskAttempt mockAttempt = mock(TaskAttempt.class);
     when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+    when(mockDag.getTotalVertices()).thenReturn(4);
     when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1)
         .thenReturn(2);
+    TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+    TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
+    TezVertexID vId2 = TezVertexID.fromString("vertex_1436907267600_195589_1_02");
+    TezVertexID vId3 = TezVertexID.fromString("vertex_1436907267600_195589_1_03");
+    when(mockVertex.getVertexId()).thenReturn(vId0).thenReturn(vId1)
+        .thenReturn(vId2).thenReturn(vId3);
     
     DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate(
         DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);    
@@ -66,20 +73,24 @@ public class TestDAGScheduler {
     DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
         mockEventHandler);
     scheduler.scheduleTaskEx(event);
-    Assert.assertEquals(1, mockEventHandler.event.getPriorityHighLimit());
-    Assert.assertEquals(3, mockEventHandler.event.getPriorityLowLimit());
+    Assert.assertEquals(10, mockEventHandler.event.getPriorityHighLimit());
+    Assert.assertEquals(12, mockEventHandler.event.getPriorityLowLimit());
+    scheduler.scheduleTaskEx(event);
+    Assert.assertEquals(25, mockEventHandler.event.getPriorityHighLimit());
+    Assert.assertEquals(27, mockEventHandler.event.getPriorityLowLimit());
     scheduler.scheduleTaskEx(event);
-    Assert.assertEquals(4, mockEventHandler.event.getPriorityHighLimit());
-    Assert.assertEquals(6, mockEventHandler.event.getPriorityLowLimit());
+    Assert.assertEquals(40, mockEventHandler.event.getPriorityHighLimit());
+    Assert.assertEquals(42, mockEventHandler.event.getPriorityLowLimit());
     scheduler.scheduleTaskEx(event);
-    Assert.assertEquals(7, mockEventHandler.event.getPriorityHighLimit());
-    Assert.assertEquals(9, mockEventHandler.event.getPriorityLowLimit());
+    Assert.assertEquals(43, mockEventHandler.event.getPriorityHighLimit());
+    Assert.assertEquals(45, mockEventHandler.event.getPriorityLowLimit());
   }
   
   @Test(timeout=5000)
   public void testConcurrencyLimit() {
     MockEventHandler mockEventHandler = new MockEventHandler();
     DAG mockDag = mock(DAG.class);
+    when(mockDag.getTotalVertices()).thenReturn(2);
     TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
     TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
     TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
@@ -90,6 +101,7 @@ public class TestDAGScheduler {
     Vertex mockVertex = mock(Vertex.class);
     when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
     when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+    when(mockVertex.getVertexId()).thenReturn(vId0);
     
     DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
         mockEventHandler);