You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/11/10 17:30:09 UTC
tez git commit: TEZ-3491. Tez job can hang due to container priority
inversion (jlowe) (cherry picked from commit
a93dbf0b2f143930ac8c5d51e48927989c0781a0)
Repository: tez
Updated Branches:
refs/heads/branch-0.8 5280491d4 -> aff0edb87
TEZ-3491. Tez job can hang due to container priority inversion (jlowe)
(cherry picked from commit a93dbf0b2f143930ac8c5d51e48927989c0781a0)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aff0edb8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aff0edb8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aff0edb8
Branch: refs/heads/branch-0.8
Commit: aff0edb87dd61529f9cc1c580eee0fc042604aaf
Parents: 5280491
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 10 17:29:47 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 10 17:29:47 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/rm/YarnTaskSchedulerService.java | 46 ++++---
.../tez/dag/app/rm/TestTaskScheduler.java | 128 +++++++++++++++++++
3 files changed, 157 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db48f91..a21554b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3491. Tez job can hang due to container priority inversion.
TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
TEZ-3493. DAG submit timeout cannot be set to a month
TEZ-3505. Move license to the file header for TezBytesWritableSerialization
http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 5087d0d..41d380a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -474,7 +474,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
}
// container neither allocated nor released
- LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+ if (delayedContainer != null) {
+ LOG.info("Delayed container {} completed", containerStatus.getContainerId());
+ maybeRescheduleContainerAtPriority(delayedContainer.getContainer().getPriority());
+ } else {
+ LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+ }
}
}
@@ -1314,23 +1319,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
// to us anymore. So we need to ask for this again. If there is no
// outstanding request at that priority then it's fine to not ask again.
// See TEZ-915 for more details
- for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
- Object task = entry.getKey();
- CookieContainerRequest request = entry.getValue();
- if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
- LOG.info("Resending request for task again: " + task);
- deallocateTask(task, true, null, null);
- allocateTask(task, request.getCapability(),
- (request.getNodes() == null ? null :
- request.getNodes().toArray(new String[request.getNodes().size()])),
- (request.getRacks() == null ? null :
- request.getRacks().toArray(new String[request.getRacks().size()])),
- request.getPriority(),
- request.getCookie().getContainerSignature(),
- request.getCookie().getAppCookie());
- break;
- }
- }
+ maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority());
+
// come back and free more new containers if needed
continue;
}
@@ -1427,6 +1417,26 @@ public class YarnTaskSchedulerService extends TaskScheduler
return true;
}
+ /**
+  * Resends one outstanding task request at the given priority, if there is one.
+  *
+  * Walks {@code taskRequests} and, for the first entry whose request priority
+  * equals {@code priority}, deallocates the task and allocates it again with the
+  * same capability, nodes, racks, priority, container signature and app cookie.
+  * Does nothing when no outstanding request has that priority.
+  *
+  * @param priority the priority whose outstanding request should be re-issued
+  */
+ private void maybeRescheduleContainerAtPriority(Priority priority) {
+   for (Map.Entry<Object, CookieContainerRequest> pending : taskRequests.entrySet()) {
+     CookieContainerRequest pendingRequest = pending.getValue();
+     if (!pendingRequest.getPriority().equals(priority)) {
+       continue;
+     }
+
+     Object pendingTask = pending.getKey();
+     LOG.info("Resending request for task again: " + pendingTask);
+     deallocateTask(pendingTask, true, null, null);
+
+     // a request without node/rack constraints is re-issued without them as well
+     String[] nodes = null;
+     if (pendingRequest.getNodes() != null) {
+       nodes = pendingRequest.getNodes().toArray(
+           new String[pendingRequest.getNodes().size()]);
+     }
+     String[] racks = null;
+     if (pendingRequest.getRacks() != null) {
+       racks = pendingRequest.getRacks().toArray(
+           new String[pendingRequest.getRacks().size()]);
+     }
+
+     allocateTask(pendingTask, pendingRequest.getCapability(), nodes, racks,
+         pendingRequest.getPriority(),
+         pendingRequest.getCookie().getContainerSignature(),
+         pendingRequest.getCookie().getAppCookie());
+
+     // only a single request is resent per call
+     // NOTE(review): presumably deallocate/allocate touch taskRequests, so iteration
+     // must stop here - confirm against those methods
+     return;
+   }
+ }
+
private boolean fitsIn(Resource toFit, Resource resource) {
// YARN-893 prevents using correct library code
//return Resources.fitsIn(toFit, resource);
http://git-wip-us.apache.org/repos/asf/tez/blob/aff0edb8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index a3e5ff5..5c8daeb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.io.IOExceptionWithCause;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -2192,6 +2193,133 @@ public class TestTaskScheduler {
Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testContainerExpired() throws Exception {
+ RackResolver.init(new YarnConfiguration());
+ /*
+  * Scenario: two tasks are requested at priorities 2 and 1, then only a priority-2
+  * container is handed to the scheduler while the mocked top priority is 1. No task
+  * may be assigned to it, and once that container is reported completed the
+  * priority-2 request must be sent to the RM client again.
+  */
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
+ mock(TezAMRMClientAsync.class);
+ // values handed to setupMockTaskSchedulerContext below
+ String appHost = "host";
+ int appPort = 0;
+ String appUrl = "url";
+
+ Configuration conf = new Configuration();
+ // to match all in the same pass
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ // to release immediately after deallocate
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+ conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+ // build the scheduler around a drainable wrapper of the mocked context
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+ TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+ scheduler.initialize();
+ drainableAppCallback.drain();
+ // stub the mocked RM client: registration response and available resources
+ RegisterApplicationMasterResponse mockRegResponse =
+ mock(RegisterApplicationMasterResponse.class);
+ Resource mockMaxResource = mock(Resource.class);
+ Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+ when(mockRegResponse.getMaximumResourceCapability()).
+ thenReturn(mockMaxResource);
+ when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+ when(mockRMClient.
+ registerApplicationMaster(anyString(), anyInt(), anyString())).
+ thenReturn(mockRegResponse);
+ Resource mockClusterResource = mock(Resource.class);
+ when(mockRMClient.getAvailableResources()).
+ thenReturn(mockClusterResource);
+
+ scheduler.start();
+ drainableAppCallback.drain();
+ // task2 is requested at priority 2, task1 at priority 1; both share capability, hosts and racks
+ Object mockTask1 = mock(Object.class);
+ when(mockTask1.toString()).thenReturn("task1");
+ Object mockCookie1 = mock(Object.class);
+ Resource mockCapability = mock(Resource.class);
+ String[] hosts = {"host1", "host5"};
+ String[] racks = {"/default-rack", "/default-rack"};
+ final Priority mockPriority1 = Priority.newInstance(1);
+ final Priority mockPriority2 = Priority.newInstance(2);
+ Object mockTask2 = mock(Object.class);
+ when(mockTask2.toString()).thenReturn("task2");
+ Object mockCookie2 = mock(Object.class);
+ ArgumentCaptor<CookieContainerRequest> requestCaptor =
+ ArgumentCaptor.forClass(CookieContainerRequest.class);
+ // request task2 (priority 2) first and capture the request passed to the RM client
+ scheduler.allocateTask(mockTask2, mockCapability, hosts,
+ racks, mockPriority2, null, mockCookie2);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(1)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request2 = requestCaptor.getValue();
+ // then request task1 (priority 1); this is the second addContainerRequest call
+ scheduler.allocateTask(mockTask1, mockCapability, hosts,
+ racks, mockPriority1, null, mockCookie1);
+ drainableAppCallback.drain();
+ verify(mockRMClient, times(2)).
+ addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest request1 = requestCaptor.getValue();
+
+ List<Container> containers = new ArrayList<Container>();
+ // sending only lower priority container to make sure it's not matched
+ Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+ when(mockContainer2.getPriority()).thenReturn(mockPriority2);
+ when(mockContainer2.toString()).thenReturn("container2");
+ ContainerId mockCId2 = mock(ContainerId.class);
+ when(mockContainer2.getId()).thenReturn(mockCId2);
+ when(mockCId2.toString()).thenReturn("container2");
+ containers.add(mockContainer2);
+ ArrayList<CookieContainerRequest> hostContainers =
+ new ArrayList<CookieContainerRequest>();
+ hostContainers.add(request1);
+ final List<ArrayList<CookieContainerRequest>> hostList =
+ new LinkedList<ArrayList<CookieContainerRequest>>();
+ hostList.add(hostContainers);
+ // the mocked RM client reports request1 as the match on host1 and priority 1 as the top priority
+ when(
+ mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
+ (Resource) any())).thenAnswer(
+ new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+ @Override
+ public List<? extends Collection<CookieContainerRequest>> answer(
+ InvocationOnMock invocation) throws Throwable {
+ return hostList;
+ }
+
+ });
+ when(mockRMClient.getTopPriority()).thenReturn(mockPriority1);
+ // deliver the priority-2 container while the top priority is 1
+ scheduler.onContainersAllocated(containers);
+ // report that same container as completed
+ List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+ ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+ when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+ statuses.add(mockStatus2);
+ scheduler.onContainersCompleted(statuses);
+ // no task was assigned, and a third request equal to request2 was sent to the RM client
+ verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class));
+ verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture());
+ CookieContainerRequest resubmitRequest = requestCaptor.getValue();
+ assertEquals(request2.getCookie().getTask(), resubmitRequest.getCookie().getTask());
+ assertEquals(request2.getCookie().getAppCookie(), resubmitRequest.getCookie().getAppCookie());
+ assertEquals(request2.getCookie().getContainerSignature(), resubmitRequest.getCookie().getContainerSignature());
+ assertEquals(request2.getCapability(), resubmitRequest.getCapability());
+ assertEquals(request2.getPriority(), resubmitRequest.getPriority());
+
+ // verify container is not re-requested when nothing at that priority
+ assertFalse(scheduler.deallocateTask(mockTask2, true, null, null));
+ scheduler.onContainersAllocated(containers);
+ scheduler.onContainersCompleted(statuses);
+ verify(mockApp, times(0)).taskAllocated(any(), any(), any(Container.class));
+ verify(mockRMClient, times(3)).addContainerRequest(requestCaptor.capture());
+ }
+
private Container createContainer(int id, String host, Resource resource,
Priority priority) {
ContainerId containerID = ContainerId.newInstance(