You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/06/03 02:20:33 UTC
tez git commit: TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master cd068538d -> 631c3e9c6
TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/631c3e9c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/631c3e9c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/631c3e9c
Branch: refs/heads/master
Commit: 631c3e9c627603ac53426d5a0b120b426fb6712c
Parents: cd06853
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jun 2 17:20:20 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jun 2 17:20:20 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/tez/dag/app/AppContext.java | 4 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 ++
.../dag/app/rm/YarnTaskSchedulerService.java | 34 ++++---
.../tez/dag/app/rm/TestContainerReuse.java | 95 ++++++++++++++++++--
5 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1780eac..b4c4967 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
TEZ-2506. TestAysncHttpConnection failing.
TEZ-2503. findbugs version isn't reported properly in test-patch report.
TEZ-2198. Fix sorter spill counts.
@@ -31,6 +32,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
TEZ-2527. Tez UI: Application hangs on entering erroneous RegEx in counter table search box
TEZ-2523. Tez UI: derive applicationId from dag/vertex id instead of relying on json data
TEZ-2505. PipelinedSorter uses Comparator objects concurrently from multiple threads.
@@ -229,6 +231,7 @@ Release 0.6.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
TEZ-2483. Tez should close task if processor fail
Release 0.6.1: 2015-05-18
http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index c005447..e909d80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -108,4 +108,8 @@ public interface AppContext {
String[] getLocalDirs();
String getAMUser();
+
+ /** Whether the AM is in the process of shutting down/completing */
+ boolean isAMInCompletionState();
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3805b6c..b2c77dc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1468,6 +1468,12 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public boolean isAMInCompletionState() {
+ return EnumSet.of(DAGAppMasterState.SUCCEEDED, DAGAppMasterState.KILLED, DAGAppMasterState.FAILED,
+ DAGAppMasterState.ERROR).contains(state);
+ }
+
+ @Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
throw new TezUncheckedException(
http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 44f5484..19902b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -527,14 +527,18 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private synchronized Map<CookieContainerRequest, Container>
assignNewlyAllocatedContainers(Iterable<Container> containers) {
+ boolean amInCompletionState = appContext.isAMInCompletionState();
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
- assignNewContainersWithLocation(containers,
- NODE_LOCAL_ASSIGNER, assignedContainers);
- assignNewContainersWithLocation(containers,
- RACK_LOCAL_ASSIGNER, assignedContainers);
- assignNewContainersWithLocation(containers,
- NON_LOCAL_ASSIGNER, assignedContainers);
+
+ if (!amInCompletionState) {
+ assignNewContainersWithLocation(containers,
+ NODE_LOCAL_ASSIGNER, assignedContainers);
+ assignNewContainersWithLocation(containers,
+ RACK_LOCAL_ASSIGNER, assignedContainers);
+ assignNewContainersWithLocation(containers,
+ NON_LOCAL_ASSIGNER, assignedContainers);
+ }
// Release any unassigned containers given by the RM
releaseUnassignedContainers(containers);
@@ -545,16 +549,19 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private synchronized Map<CookieContainerRequest, Container>
tryAssignReUsedContainers(Iterable<Container> containers) {
+ boolean amInCompletionState = appContext.isAMInCompletionState();
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
// Honor locality and match as many as possible
- assignReUsedContainersWithLocation(containers,
- NODE_LOCAL_ASSIGNER, assignedContainers, true);
- assignReUsedContainersWithLocation(containers,
- RACK_LOCAL_ASSIGNER, assignedContainers, true);
- assignReUsedContainersWithLocation(containers,
- NON_LOCAL_ASSIGNER, assignedContainers, true);
+ if (!amInCompletionState) {
+ assignReUsedContainersWithLocation(containers,
+ NODE_LOCAL_ASSIGNER, assignedContainers, true);
+ assignReUsedContainersWithLocation(containers,
+ RACK_LOCAL_ASSIGNER, assignedContainers, true);
+ assignReUsedContainersWithLocation(containers,
+ NON_LOCAL_ASSIGNER, assignedContainers, true);
+ }
return assignedContainers;
}
@@ -583,6 +590,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
assignDelayedContainer(HeldContainer heldContainer) {
DAGAppMasterState state = appContext.getAMState();
+
boolean isNew = heldContainer.isNew();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign a delayed container"
@@ -658,7 +666,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
heldContainer.getContainer(), currentTime
+ localitySchedulingDelay);
}
- } else if (state.equals(DAGAppMasterState.RUNNING)) {
+ } else if (state.equals(DAGAppMasterState.RUNNING)) {
// clear min held containers since we need to allocate to tasks
if (!sessionMinHeldContainers.isEmpty()) {
// update the expire time of min held containers so that they are
http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 89b77a7..79450a9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -355,8 +355,8 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(lrTa31);
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED));
+ new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
+ TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -846,9 +846,9 @@ public class TestContainerReuse {
Configuration tezConf = new Configuration(new YarnConfiguration());
tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
tezConf.setLong(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
tezConf.setLong(
- TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
+ TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
tezConf.setInt(
TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
@@ -914,7 +914,7 @@ public class TestContainerReuse {
TaskAttempt ta11 = mock(TaskAttempt.class);
doReturn(vertexID1).when(ta11).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
- taID11, ta11, resource1, host1, racks, priority1);
+ taID11, ta11, resource1, host1, racks, priority1);
//Vertex2, Task1, Attempt 1, host1
TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
@@ -945,8 +945,8 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Container should be assigned to task21.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ new AMSchedulerEventTAEnded(ta11, container1.getId(),
+ TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(
@@ -955,8 +955,8 @@ public class TestContainerReuse {
// Task 2 completes.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta21, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ new AMSchedulerEventTAEnded(ta21, container1.getId(),
+ TaskAttemptState.SUCCEEDED));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1294,6 +1294,83 @@ public class TestContainerReuse {
taskSchedulerEventHandler.close();
}
+ @Test(timeout = 10000l)
+ public void testAssignmentOnShutdown()
+ throws IOException, InterruptedException, ExecutionException {
+ LOG.info("Test testAssignmentOnShutdown");
+ Configuration tezConf = new Configuration(new YarnConfiguration());
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+ TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+
+ CapturingEventHandler eventHandler = new CapturingEventHandler();
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+ AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+ TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+ String appUrl = "url";
+ String appMsg = "success";
+ AppFinalStatus finalStatus =
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+
+ doReturn(finalStatus).when(mockApp).getFinalAppStatus();
+
+ AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
+ AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+ mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amContainerMap).when(appContext).getAllContainers();
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+ doReturn(true).when(appContext).isAMInCompletionState();
+ doReturn(dagID).when(appContext).getCurrentDAGID();
+ doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+ TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
+ new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+ new AlwaysMatchesContainerMatcher());
+ TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
+ taskSchedulerEventHandler.init(tezConf);
+ taskSchedulerEventHandler.start();
+
+ TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
+ ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ .getSpyTaskScheduler();
+ TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ AtomicBoolean drainNotifier = new AtomicBoolean(false);
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+ Resource resource1 = Resource.newInstance(1024, 1);
+ String[] host1 = {"host1"};
+ String[] host2 = {"host2"};
+
+ String []racks = {"/default-rack"};
+ Priority priority1 = Priority.newInstance(1);
+
+ TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+ //Vertex 1, Task 1, Attempt 1, host1
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+ TaskAttempt ta11 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1,
+ host1, racks, priority1);
+ taskSchedulerEventHandler.handleEvent(lrEvent1);
+
+ Container container1 = createContainer(1, "host1", resource1, priority1);
+
+ // One container allocated.
+ drainNotifier.set(false);
+ taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+ drainableAppCallback.drain();
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta11),
+ any(Object.class), eq(container1));
+ taskScheduler.close();
+ taskSchedulerEventHandler.close();
+ }
private Container createContainer(int id, String host, Resource resource, Priority priority) {
ContainerId containerID = ContainerId.newInstance(