You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2019/02/26 04:39:00 UTC

[tez] branch master updated: TEZ-4042. Speculative attempts should avoid running on the same node

This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 0093f8b  TEZ-4042. Speculative attempts should avoid running on the same node
0093f8b is described below

commit 0093f8b4d9ea38fcdfeb802fe3206c4b2ae8f077
Author: Ying Han <hy...@outlook.com>
AuthorDate: Mon Feb 25 22:38:01 2019 -0600

    TEZ-4042. Speculative attempts should avoid running on the same node
---
 .../org/apache/tez/dag/app/dag/TaskAttempt.java    |  5 ++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      | 16 +++++++++--
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  | 15 +++++++---
 .../tez/dag/app/rm/DagAwareYarnTaskScheduler.java  | 32 ++++++++++++++++++----
 .../tez/dag/app/rm/YarnTaskSchedulerService.java   | 12 ++++++--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      |  2 +-
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  |  7 ++++-
 .../dag/app/rm/TestDagAwareYarnTaskScheduler.java  | 19 +++++++++++++
 .../apache/tez/dag/app/rm/TestTaskScheduler.java   | 18 ++++++++++++
 9 files changed, 110 insertions(+), 16 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index ba09bd9..d0fec5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag;
 
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -136,4 +137,8 @@ public interface TaskAttempt {
    */
   long getFinishTime();
 
+  /**
+   * @return the set of nodes on which sibling attempts were running.
+   */
+  Set<NodeId> getNodesWithSiblingRunningAttempts();
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3107330..4608052 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -228,6 +228,12 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final boolean leafVertex;
   
   private TezTaskAttemptID creationCausalTA;
+  // Records the set of nodes on which sibling attempts were running at the time
+  // this attempt was scheduled. The set is empty for the original task attempt and
+  // non-empty when the current task attempt is a speculative one, in which case the
+  // scheduler should try to place the speculative attempt on a node other than the
+  // one(s) recorded in this set.
+  private Set<NodeId> nodesWithSiblingRunningAttempts;
   private long creationTime;
   private long scheduledTime;
 
@@ -540,7 +546,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
     this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
         taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
-        vertex, locationHint, taskSpec, null);
+        vertex, locationHint, taskSpec, null, null);
   }
 
   @SuppressWarnings("rawtypes")
@@ -550,7 +556,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
       Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
-      TezTaskAttemptID schedulingCausalTA) {
+      TezTaskAttemptID schedulingCausalTA, Set<NodeId> nodesWithSiblingRunningAttempts) {
 
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
@@ -566,6 +572,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.locationHint = locationHint;
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
+    this.nodesWithSiblingRunningAttempts = nodesWithSiblingRunningAttempts;
     this.creationTime = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
@@ -609,6 +616,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @Override
+  public Set<NodeId> getNodesWithSiblingRunningAttempts() {
+    return nodesWithSiblingRunningAttempts;
+  }
+
+  @Override
   public TaskAttemptReport getReport() {
     TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
     readLock.lock();
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9e1d85f..9e3c5a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +39,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -149,6 +152,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // track the status of TaskAttempt (true mean completed, false mean uncompleted)
   private final Map<Integer, Boolean> taskAttemptStatus = new HashMap<Integer,Boolean>();
 
+  private final Set<NodeId> nodesWithRunningAttempts = Collections
+      .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
+
   private static final SingleArcTransition<TaskImpl , TaskEvent>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
@@ -744,7 +750,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     return new TaskAttemptImpl(attemptId, eventHandler,
         taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
         (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
-        locationHint, taskSpec, schedulingCausalTA);
+        locationHint, taskSpec, schedulingCausalTA, nodesWithRunningAttempts);
   }
 
   @Override
@@ -1009,14 +1015,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
       task.counters.findCounter(TaskCounter.NUM_SPECULATIONS).increment(1);
-      TezTaskAttemptID earliestUnfinishedAttempt = null;
+      TaskAttempt earliestUnfinishedAttempt = null;
       for (TaskAttempt ta : task.attempts.values()) {
         // find the oldest running attempt
         if (!ta.isFinished()) {
-          earliestUnfinishedAttempt = ta.getID();
+          earliestUnfinishedAttempt = ta;
+          task.nodesWithRunningAttempts.add(ta.getNodeId());
         }
       }
-      task.addAndScheduleAttempt(earliestUnfinishedAttempt);
+      task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
     }
   }
 
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
index 1cdc217..3191c48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -398,6 +399,9 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
       for (Collection<TaskRequest> requests : results) {
         if (!requests.isEmpty()) {
           TaskRequest request = requests.iterator().next();
+          if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
+            continue;
+          }
           assignContainer(request, hc, location);
           assignments.add(new Assignment(request, hc.getContainer()));
           return;
@@ -515,6 +519,9 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
         if (requestTracker.isRequestBlocked(request)) {
           LOG.debug("Cannot assign task {} to container {} since vertex {} is a descendant of pending tasks",
               request.getTask(), hc.getId(), request.getVertexIndex());
+        } else if (maybeChangeNode(request, hc.getContainer().getNodeId())) {
+          LOG.debug("Cannot assign task {} to container {} since node {} is running sibling attempts",
+              request.getTask(), hc.getId(), request.getVertexIndex());
         } else {
           assignContainer(request, hc, hc.getId());
           return request;
@@ -543,8 +550,10 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
 
         Object signature = hc.getSignature();
         if (signature == null || signatureMatcher.isSuperSet(signature, request.getContainerSignature())) {
-          assignContainer(request, hc, matchLocation);
-          return request;
+          if (!maybeChangeNode(request, hc.getContainer().getNodeId())) {
+            assignContainer(request, hc, matchLocation);
+            return request;
+          }
         }
       }
     }
@@ -1050,7 +1059,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
     ContainerId affinity = request.getAffinity();
     if (affinity != null) {
       HeldContainer hc = heldContainers.get(affinity);
-      if (hc != null && hc.isAssignable()) {
+      if (hc != null && hc.isAssignable() && !maybeChangeNode(request, hc.getContainer().getNodeId())) {
         assignContainer(request, hc, affinity);
         return hc;
       }
@@ -1099,12 +1108,13 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
         if (eligibleStates.contains(hc.getState())) {
           Object csig = hc.getSignature();
           if (csig == null || signatureMatcher.isSuperSet(csig, request.getContainerSignature())) {
+            boolean needToChangeNode = maybeChangeNode(request, hc.getContainer().getNodeId());
             int numAffinities = hc.getNumAffinities();
-            if (numAffinities == 0) {
+            if (numAffinities == 0 && !needToChangeNode) {
               bestMatch = hc;
               break;
             }
-            if (bestMatch == null || numAffinities < bestMatch.getNumAffinities()) {
+            if ((bestMatch == null || numAffinities < bestMatch.getNumAffinities()) && !needToChangeNode) {
               bestMatch = hc;
             }
           } else {
@@ -1119,6 +1129,18 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
     return bestMatch;
   }
 
+  // Returns true when nodeId is among the nodes reported by the requested task's
+  // getNodesWithSiblingRunningAttempts(); callers then skip the candidate container.
+  private boolean maybeChangeNode(TaskRequest request, NodeId nodeId) {
+    Object candidate = request.getTask();
+    if (!(candidate instanceof TaskAttempt)) {
+      // Tasks that are not TaskAttempt instances carry no sibling-node information.
+      return false;
+    }
+    Set<NodeId> siblingNodes = ((TaskAttempt) candidate).getNodesWithSiblingRunningAttempts();
+    return siblingNodes != null && siblingNodes.contains(nodeId);
+  }
+
   @Override
   public void setShouldUnregister() {
     shouldUnregister = true;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 95cd85b..f128ec9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.common.ContainerSignatureMatcher;
@@ -118,7 +119,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
       new HashMap<ContainerId, HeldContainer>();
   
   Set<Priority> priorityHasAffinity = Sets.newHashSet();
-  
+
   Set<NodeId> blacklistedNodes = Collections
       .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
   
@@ -1533,6 +1534,11 @@ public class YarnTaskSchedulerService extends TaskScheduler
   private boolean canAssignTaskToContainer(
       CookieContainerRequest cookieContainerRequest, Container container) {
     HeldContainer heldContainer = heldContainers.get(container.getId());
+    Object task = getTask(cookieContainerRequest);
+    if (task instanceof TaskAttempt
+        && ((TaskAttempt) task).getNodesWithSiblingRunningAttempts().contains(container.getNodeId())) {
+      return false;
+    }
     if (heldContainer == null || heldContainer.isNew()) { // New container.
       return true;
     } else {
@@ -1784,9 +1790,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
       Container container = entry.getValue();
       // check for blacklisted nodes. There may be race conditions between
       // setting blacklist and receiving allocations
+      CookieContainerRequest request = entry.getKey();
+      Object task = getTask(request);
       if (blacklistedNodes.contains(container.getNodeId())) {
-        CookieContainerRequest request = entry.getKey();
-        Object task = getTask(request);
         LOG.info("Container: " + container.getId() + 
             " allocated on blacklisted node: " + container.getNodeId() + 
             " for task: " + task);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 5038810..ce3e7e5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -2194,7 +2194,7 @@ public class TestTaskAttempt {
           eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
           isRescheduled, resource, containerContext, leafVertex, mockVertex,
-          locationHint, null, null);
+          locationHint, null, null, null);
     }
     
     boolean inputFailedReported = false;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b142bb9..51a4bdf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -1366,7 +1366,7 @@ public class TestTaskImpl {
         Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
       super(attemptId, eventHandler, tal, conf, clock, thh,
           appContext, isRescheduled, resource, containerContext, false, null,
-          locationHint, mockTaskSpec, schedCausalTA);
+          locationHint, mockTaskSpec, schedCausalTA, null);
     }
 
     @Override
@@ -1401,6 +1401,11 @@ public class TestTaskImpl {
     public ContainerId getAssignedContainerID() {
       return mockContainerId;
     }
+
+    @Override
+    public NodeId getNodeId() {
+      return mockNodeId;
+    }
   }
 
   public class ServiceBusyEvent extends TezAbstractEvent<TaskAttemptEventType>
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
index 911f4b1..c979a7a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.AMRMClientAsyncWrapper;
 import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.HeldContainer;
 import org.apache.tez.dag.app.rm.DagAwareYarnTaskScheduler.TaskRequest;
@@ -343,6 +345,23 @@ public class TestDagAwareYarnTaskScheduler {
     verify(mockRMClient, times(8)).addContainerRequest(any(TaskRequest.class));
     assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
 
+    // test speculative node adjustment
+    String speculativeNode = "host8";
+    NodeId speculativeNodeId = mock(NodeId.class);
+    when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
+    TaskAttempt mockTask5 = mock(TaskAttempt.class);
+    when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+    Object mockCookie5 = new Object();
+    scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
+        mockPriority, null, mockCookie5);
+    drainableAppCallback.drain();
+    // no new allocation
+    verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+    // verify container released
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+    // verify request added back
+    verify(mockRMClient, times(9)).addContainerRequest(requestCaptor.capture());
+
     List<NodeReport> mockUpdatedNodes = mock(List.class);
     scheduler.onNodesUpdated(mockUpdatedNodes);
     drainableAppCallback.drain();
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 49f8fe3..21f4c52 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
@@ -346,6 +347,23 @@ public class TestTaskScheduler {
         (CookieContainerRequest) any());
     assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
 
+    // test speculative node adjustment
+    String speculativeNode = "host8";
+    NodeId speculativeNodeId = mock(NodeId.class);
+    when(speculativeNodeId.getHost()).thenReturn(speculativeNode);
+    TaskAttempt mockTask5 = mock(TaskAttempt.class);
+    when(mockTask5.getNodesWithSiblingRunningAttempts()).thenReturn(Sets.newHashSet(speculativeNodeId));
+    Object mockCookie5 = new Object();
+    scheduler.allocateTask(mockTask5, mockCapability, hosts, racks,
+        mockPriority, null, mockCookie5);
+    drainableAppCallback.drain();
+    // no new allocation
+    verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+    // verify container released
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+    // verify request added back
+    verify(mockRMClient, times(9)).addContainerRequest(requestCaptor.capture());
+
     List<NodeReport> mockUpdatedNodes = mock(List.class);
     scheduler.onNodesUpdated(mockUpdatedNodes);
     drainableAppCallback.drain();