You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2019/03/22 20:56:49 UTC

[tez] branch master updated: TEZ-4045. Task should be accessible from TaskAttempt

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new d5675c3  TEZ-4045. Task should be accessible from TaskAttempt
d5675c3 is described below

commit d5675c332497c1ac1dedefdf91e87476b5c0d7a9
Author: Ying Han <hy...@outlook.com>
AuthorDate: Fri Mar 22 15:56:39 2019 -0500

    TEZ-4045. Task should be accessible from TaskAttempt
    
    Signed-off-by: Jonathan Eagles <je...@apache.org>
---
 .../main/java/org/apache/tez/dag/app/dag/Task.java |  8 ++++++
 .../org/apache/tez/dag/app/dag/TaskAttempt.java    |  9 ++----
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      | 32 ++++++++++------------
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  | 13 +++++++--
 .../tez/dag/app/rm/DagAwareYarnTaskScheduler.java  |  2 +-
 .../tez/dag/app/rm/YarnTaskSchedulerService.java   |  7 +++--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      |  8 ++++--
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  |  4 +--
 .../dag/app/rm/TestDagAwareYarnTaskScheduler.java  |  5 +++-
 .../apache/tez/dag/app/rm/TestTaskScheduler.java   |  5 +++-
 10 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index d1b9b2a..c1fe7f7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -21,7 +21,9 @@ package org.apache.tez.dag.app.dag;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
@@ -73,4 +75,10 @@ public interface Task {
   long getFirstAttemptStartTime();
 
   long getFinishTime();
+
+  /**
+   * @return set of nodes on which previous attempts were running at the time
+   * the latest attempt was scheduled.
+   */
+  Set<NodeId> getNodesWithRunningAttempts();
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index d0fec5c..0fc7013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.app.dag;
 
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -72,7 +71,8 @@ public interface TaskAttempt {
   TezTaskID getTaskID();
   TezVertexID getVertexID();
   TezDAGID getDAGID();
-  
+
+  Task getTask();
   TaskAttemptReport getReport();
   List<String> getDiagnostics();
   TaskAttemptTerminationCause getTerminationCause();
@@ -136,9 +136,4 @@ public interface TaskAttempt {
    *  yet, returns 0.
    */
   long getFinishTime();
-
-  /**
-   * @return the set of nodes on which sibling attempts were running on.
-   */
-  Set<NodeId> getNodesWithSiblingRunningAttempts();
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 4608052..ade7bc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
@@ -198,6 +199,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String nodeRackName;
   
   private final Vertex vertex;
+  private final Task task;
   private final TaskLocationHint locationHint;
   private final TaskSpec taskSpec;
 
@@ -228,12 +230,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final boolean leafVertex;
   
   private TezTaskAttemptID creationCausalTA;
-  // Record the set of nodes on which sibling attempts were running on, at the time of
-  // this attempt being scheduled. This set is empty for original task attempt, and
-  // non-empty when current task attempt is a speculative one, in which case scheduler
-  // should try to schedule the speculative attempt on to a node other than the one(s)
-  // recorded in this set.
-  private Set<NodeId> nodesWithSiblingRunningAttempts;
   private long creationTime;
   private long scheduledTime;
 
@@ -543,10 +539,10 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+      Task task, TaskLocationHint locationHint, TaskSpec taskSpec) {
     this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
         taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
-        vertex, locationHint, taskSpec, null, null);
+        task, locationHint, taskSpec, null);
   }
 
   @SuppressWarnings("rawtypes")
@@ -555,8 +551,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
-      TezTaskAttemptID schedulingCausalTA, Set<NodeId> nodesWithSiblingRunningAttempts) {
+      Task task, TaskLocationHint locationHint, TaskSpec taskSpec,
+      TezTaskAttemptID schedulingCausalTA) {
 
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
@@ -568,11 +564,11 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
-    this.vertex = vertex;
+    this.vertex = task.getVertex();
+    this.task = task;
     this.locationHint = locationHint;
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
-    this.nodesWithSiblingRunningAttempts = nodesWithSiblingRunningAttempts;
     this.creationTime = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
@@ -616,11 +612,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @Override
-  public Set<NodeId> getNodesWithSiblingRunningAttempts() {
-    return nodesWithSiblingRunningAttempts;
-  }
-
-  @Override
   public TaskAttemptReport getReport() {
     TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
     readLock.lock();
@@ -866,7 +857,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       readLock.unlock();
     }
   }
-  
+
+  @Override
+  public Task getTask() {
+    return task;
+  }
+
   Vertex getVertex() {
     return vertex;
   }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9e3c5a8..9289d8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -152,6 +152,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // track the status of TaskAttempt (true mean completed, false mean uncompleted)
   private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
 
+  // The set of nodes with active running attempts at the time the latest attempt for
+  // this task was scheduled. This set is empty when scheduling the original task attempt,
+  // and non-empty when scheduling a speculative attempt, in which case the scheduler should
+  // avoid scheduling the speculative attempt onto the node(s) recorded in this set.
   private final Set<NodeId> nodesWithRunningAttempts = Collections
       .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
 
@@ -584,6 +588,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  @Override
+  public Set<NodeId> getNodesWithRunningAttempts() {
+    return nodesWithRunningAttempts;
+  }
+
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
     readLock.lock();
@@ -749,8 +758,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         baseTaskSpec.getTaskConf());
     return new TaskAttemptImpl(attemptId, eventHandler,
         taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
-        locationHint, taskSpec, schedulingCausalTA, nodesWithRunningAttempts);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, this,
+        locationHint, taskSpec, schedulingCausalTA);
   }
 
   @Override
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
index 3191c48..6a78425 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
@@ -1132,7 +1132,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
   private boolean maybeChangeNode(TaskRequest request, NodeId nodeId) {
     Object task = request.getTask();
     if (task instanceof TaskAttempt) {
-      Set<NodeId> nodesWithSiblingRunningAttempts = ((TaskAttempt) task).getNodesWithSiblingRunningAttempts();
+      Set<NodeId> nodesWithSiblingRunningAttempts = ((TaskAttempt) task).getTask().getNodesWithRunningAttempts();
       if (nodesWithSiblingRunningAttempts != null
           && nodesWithSiblingRunningAttempts.contains(nodeId)) {
         return true;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f128ec9..a327967 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1536,7 +1536,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
     HeldContainer heldContainer = heldContainers.get(container.getId());
     Object task = getTask(cookieContainerRequest);
     if (task instanceof TaskAttempt
-        && ((TaskAttempt) task).getNodesWithSiblingRunningAttempts().contains(container.getNodeId())) {
+        && ((TaskAttempt) task).getTask() != null
+        && ((TaskAttempt) task).getTask().getNodesWithRunningAttempts().contains(container.getNodeId())) {
       return false;
     }
     if (heldContainer == null || heldContainer.isNew()) { // New container.
@@ -1790,9 +1791,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
       Container container = entry.getValue();
       // check for blacklisted nodes. There may be race conditions between
       // setting blacklist and receiving allocations
-      CookieContainerRequest request = entry.getKey();
-      Object task = getTask(request);
       if (blacklistedNodes.contains(container.getNodeId())) {
+        CookieContainerRequest request = entry.getKey();
+        Object task = getTask(request);
         LOG.info("Container: " + container.getId() + 
             " allocated on blacklisted node: " + container.getNodeId() + 
             " for task: " + task);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index ce3e7e5..41cce3b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
@@ -145,6 +146,7 @@ public class TestTaskAttempt {
   TezConfiguration vertexConf = new TezConfiguration();
   TaskLocationHint locationHint;
   Vertex mockVertex;
+  Task mockTask;
   ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
       .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
 
@@ -161,6 +163,8 @@ public class TestTaskAttempt {
         TezConstants.getTezYarnServicePluginName());
 
     createMockVertex(vertexConf);
+    mockTask = mock(Task.class);
+    when(mockTask.getVertex()).thenReturn(mockVertex);
 
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
@@ -2193,8 +2197,8 @@ public class TestTaskAttempt {
       super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
           eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex, mockVertex,
-          locationHint, null, null, null);
+          isRescheduled, resource, containerContext, leafVertex, mockTask,
+          locationHint, null, null);
     }
     
     boolean inputFailedReported = false;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 51a4bdf..81cd675 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -1365,8 +1365,8 @@ public class TestTaskImpl {
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
       super(attemptId, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, null,
-          locationHint, mockTaskSpec, schedCausalTA, null);
+          appContext, isRescheduled, resource, containerContext, false, mockTask,
+          locationHint, mockTaskSpec, schedCausalTA);
     }
 
     @Override
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index c979a7a..ad0cf07 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.AMRMClientAsyncWrapper;
 import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.HeldContainer;
@@ -350,7 +351,9 @@ public class TestDagAwareYarnTaskScheduler {
     NodeId speculativeNodeId = mock(NodeId.class);
     when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
     TaskAttempt mockTask5 = mock(TaskAttempt.class);
-    when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+    Task task = mock(Task.class);
+    when(mockTask5.getTask()).thenReturn(task);
+    when(task.getNodesWithRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
     Object mockCookie5 = new Object();
     scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
         mockPriority, null, mockCookie5);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 21f4c52..965b8d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
@@ -352,7 +353,9 @@ public class TestTaskScheduler {
     NodeId speculativeNodeId = mock(NodeId.class);
     when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
     TaskAttempt mockTask5 = mock(TaskAttempt.class);
-    when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+    Task task = mock(Task.class);
+    when(mockTask5.getTask()).thenReturn(task);
+    when(task.getNodesWithRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
     Object mockCookie5 = new Object();
     scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
         mockPriority, null, mockCookie5);