You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/11/10 17:30:09 UTC

tez git commit: TEZ-3491. Tez job can hang due to container priority inversion (jlowe) (cherry picked from commit a93dbf0b2f143930ac8c5d51e48927989c0781a0)

Repository: tez
Updated Branches:
  refs/heads/branch-0.8 5280491d4 -> aff0edb87


TEZ-3491. Tez job can hang due to container priority inversion (jlowe)
(cherry picked from commit a93dbf0b2f143930ac8c5d51e48927989c0781a0)

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aff0edb8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aff0edb8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aff0edb8

Branch: refs/heads/branch-0.8
Commit: aff0edb87dd61529f9cc1c580eee0fc042604aaf
Parents: 5280491
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 10 17:29:47 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 10 17:29:47 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/app/rm/YarnTaskSchedulerService.java    |  46 ++++---
 .../tez/dag/app/rm/TestTaskScheduler.java       | 128 +++++++++++++++++++
 3 files changed, 157 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db48f91..a21554b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3491. Tez job can hang due to container priority inversion.
   TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
   TEZ-3493. DAG submit timeout cannot be set to a month
   TEZ-3505. Move license to the file header for TezBytesWritableSerialization

http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 5087d0d..41d380a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -474,7 +474,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
         }
 
         // container neither allocated nor released
-        LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+        if (delayedContainer != null) {
+          LOG.info("Delayed container {} completed", containerStatus.getContainerId());
+          maybeRescheduleContainerAtPriority(delayedContainer.getContainer().getPriority());
+        } else {
+          LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+        }
       }
     }
 
@@ -1314,23 +1319,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
           // to us anymore. So we need to ask for this again. If there is no
           // outstanding request at that priority then its fine to not ask again.
           // See TEZ-915 for more details
-          for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
-            Object task = entry.getKey();
-            CookieContainerRequest request = entry.getValue();
-            if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
-              LOG.info("Resending request for task again: " + task);
-              deallocateTask(task, true, null, null);
-              allocateTask(task, request.getCapability(), 
-                  (request.getNodes() == null ? null : 
-                    request.getNodes().toArray(new String[request.getNodes().size()])), 
-                    (request.getRacks() == null ? null : 
-                      request.getRacks().toArray(new String[request.getRacks().size()])), 
-                    request.getPriority(), 
-                    request.getCookie().getContainerSignature(),
-                    request.getCookie().getAppCookie());
-              break;
-            }
-          }
+          maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority());
+
           // come back and free more new containers if needed
           continue;
         }
@@ -1427,6 +1417,26 @@ public class YarnTaskSchedulerService extends TaskScheduler
     return true;
   }
 
+  private void maybeRescheduleContainerAtPriority(Priority priority) {
+    for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
+      Object task = entry.getKey();
+      CookieContainerRequest request = entry.getValue();
+      if (request.getPriority().equals(priority)) {
+        LOG.info("Resending request for task again: " + task);
+        deallocateTask(task, true, null, null);
+        allocateTask(task, request.getCapability(),
+            (request.getNodes() == null ? null :
+              request.getNodes().toArray(new String[request.getNodes().size()])),
+              (request.getRacks() == null ? null :
+                request.getRacks().toArray(new String[request.getRacks().size()])),
+                request.getPriority(),
+                request.getCookie().getContainerSignature(),
+                request.getCookie().getAppCookie());
+        break;
+      }
+    }
+  }
+
   private boolean fitsIn(Resource toFit, Resource resource) {
     // YARN-893 prevents using correct library code
     //return Resources.fitsIn(toFit, resource);

http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index a3e5ff5..5c8daeb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.io.IOExceptionWithCause;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -2192,6 +2193,133 @@ public class TestTaskScheduler {
     Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContainerExpired() throws Exception {
+    RackResolver.init(new YarnConfiguration());
+
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+                                                  mock(TezAMRMClientAsync.class);
+
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+
+    Configuration conf = new Configuration();
+    // to match all in the same pass
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    // to release immediately after deallocate
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+    conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+    TaskSchedulerWithDrainableContext scheduler =
+        new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+    scheduler.initialize();
+    drainableAppCallback.drain();
+
+    RegisterApplicationMasterResponse mockRegResponse =
+                                mock(RegisterApplicationMasterResponse.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMaximumResourceCapability()).
+                                                   thenReturn(mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    when(mockRMClient.
+          registerApplicationMaster(anyString(), anyInt(), anyString())).
+                                                   thenReturn(mockRegResponse);
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getAvailableResources()).
+                                              thenReturn(mockClusterResource);
+
+    scheduler.start();
+    drainableAppCallback.drain();
+
+    Object mockTask1 = mock(Object.class);
+    when(mockTask1.toString()).thenReturn("task1");
+    Object mockCookie1 = mock(Object.class);
+    Resource mockCapability = mock(Resource.class);
+    String[] hosts = {"host1", "host5"};
+    String[] racks = {"/default-rack", "/default-rack"};
+    final Priority mockPriority1 = Priority.newInstance(1);
+    final Priority mockPriority2 = Priority.newInstance(2);
+    Object mockTask2 = mock(Object.class);
+    when(mockTask2.toString()).thenReturn("task2");
+    Object mockCookie2 = mock(Object.class);
+    ArgumentCaptor<CookieContainerRequest> requestCaptor =
+        ArgumentCaptor.forClass(CookieContainerRequest.class);
+
+    scheduler.allocateTask(mockTask2, mockCapability, hosts,
+        racks, mockPriority2, null, mockCookie2);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request2 = requestCaptor.getValue();
+
+    scheduler.allocateTask(mockTask1, mockCapability, hosts,
+        racks, mockPriority1, null, mockCookie1);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(2)).
+                                addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request1 = requestCaptor.getValue();
+
+    List<Container> containers = new ArrayList<Container>();
+    // sending only the lower priority container to make sure it's not matched
+    Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+    when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+    when(mockContainer2.toString()).thenReturn("container2");
+    ContainerId mockCId2 = mock(ContainerId.class);
+    when(mockContainer2.getId()).thenReturn(mockCId2);
+    when(mockCId2.toString()).thenReturn("container2");
+    containers.add(mockContainer2);
+    ArrayList<CookieContainerRequest> hostContainers =
+                             new ArrayList<CookieContainerRequest>();
+    hostContainers.add(request1);
+    final List<ArrayList<CookieContainerRequest>> hostList =
+                        new LinkedList<ArrayList<CookieContainerRequest>>();
+    hostList.add(hostContainers);
+
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+            (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return hostList;
+          }
+
+        });
+    when(mockRMClient.getTopPriority()).thenReturn(mockPriority1);
+
+    scheduler.onContainersAllocated(containers);
+
+    List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+    ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+    when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+    statuses.add(mockStatus2);
+    scheduler.onContainersCompleted(statuses);
+
+    verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class));
+    verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest resubmitRequest = requestCaptor.getValue();
+    assertEquals(request2.getCookie().getTask(), resubmitRequest.getCookie().getTask());
+    assertEquals(request2.getCookie().getAppCookie(), resubmitRequest.getCookie().getAppCookie());
+    assertEquals(request2.getCookie().getContainerSignature(), resubmitRequest.getCookie().getContainerSignature());
+    assertEquals(request2.getCapability(), resubmitRequest.getCapability());
+    assertEquals(request2.getPriority(), resubmitRequest.getPriority());
+
+    // verify container is not re-requested when nothing at that priority
+    assertFalse(scheduler.deallocateTask(mockTask2, true, null, null));
+    scheduler.onContainersAllocated(containers);
+    scheduler.onContainersCompleted(statuses);
+    verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class));
+    verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture());
+  }
+
   private Container createContainer(int id, String host, Resource resource,
       Priority priority) {
     ContainerId containerID = ContainerId.newInstance(