You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/02 23:57:57 UTC
hadoop git commit: YARN-3024. LocalizerRunner should give DIE action
when all resources are localized. Contributed by Chengbing Liu
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6.1 8770f1f03 -> 9af5b1dcd
YARN-3024. LocalizerRunner should give DIE action when all resources are
localized. Contributed by Chengbing Liu
(cherry picked from commit 0d6bd62102f94c55d59f7a0a86a684e99d746127)
(cherry picked from commit a7696b3fbfacd98a892bbb3678663658c7b9d2bd)
(cherry picked from commit 9e30232004ab7c3c3bfde3b8b27c37fa7065f6be)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9af5b1dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9af5b1dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9af5b1dc
Branch: refs/heads/branch-2.6.1
Commit: 9af5b1dcd08c41df026bb86129abd42863baccef
Parents: 8770f1f
Author: Xuan <xg...@apache.org>
Authored: Sun Jan 25 19:37:57 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Sep 2 14:52:06 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../localizer/ResourceLocalizationService.java | 105 ++++++++-----------
.../TestResourceLocalizationService.java | 71 +++++++++----
3 files changed, 94 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af5b1dc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1d80c85..e9c5217 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -129,6 +129,9 @@ Release 2.6.1 - UNRELEASED
YARN-3487. CapacityScheduler scheduler lock obtained unnecessarily when
calling getQueue (Jason Lowe via wangda)
+ YARN-3024. LocalizerRunner should give DIE action when all resources are
+ localized. (Chengbing Liu via xgong)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af5b1dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index bcda3c7..08196a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -763,7 +763,7 @@ public class ResourceLocalizationService extends CompositeService
*/
if (rsrc.tryAcquire()) {
- if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
+ if (rsrc.getState() == ResourceState.DOWNLOADING) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicRootPath =
@@ -896,7 +896,7 @@ public class ResourceLocalizationService extends CompositeService
LocalizedResource nRsrc = evt.getResource();
// Resource download should take place ONLY if resource is in
// Downloading state
- if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+ if (nRsrc.getState() != ResourceState.DOWNLOADING) {
i.remove();
continue;
}
@@ -907,7 +907,7 @@ public class ResourceLocalizationService extends CompositeService
* 2) Resource is still in DOWNLOADING state
*/
if (nRsrc.tryAcquire()) {
- if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+ if (nRsrc.getState() == ResourceState.DOWNLOADING) {
LocalResourceRequest nextRsrc = nRsrc.getRequest();
LocalResource next =
recordFactory.newRecordInstance(LocalResource.class);
@@ -937,41 +937,9 @@ public class ResourceLocalizationService extends CompositeService
String user = context.getUser();
ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId();
- // The localizer has just spawned. Start giving it resources for
- // remote-fetching.
- if (remoteResourceStatuses.isEmpty()) {
- LocalResource next = findNextResource();
- if (next != null) {
- response.setLocalizerAction(LocalizerAction.LIVE);
- try {
- ArrayList<ResourceLocalizationSpec> rsrcs =
- new ArrayList<ResourceLocalizationSpec>();
- ResourceLocalizationSpec rsrc =
- NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
- getPathForLocalization(next));
- rsrcs.add(rsrc);
- response.setResourceSpecs(rsrcs);
- } catch (IOException e) {
- LOG.error("local path for PRIVATE localization could not be found."
- + "Disks might have failed.", e);
- } catch (URISyntaxException e) {
- // TODO fail? Already translated several times...
- }
- } else if (pending.isEmpty()) {
- // TODO: Synchronization
- response.setLocalizerAction(LocalizerAction.DIE);
- } else {
- response.setLocalizerAction(LocalizerAction.LIVE);
- }
- return response;
- }
- ArrayList<ResourceLocalizationSpec> rsrcs =
- new ArrayList<ResourceLocalizationSpec>();
- /*
- * TODO : It doesn't support multiple downloads per ContainerLocalizer
- * at the same time. We need to think whether we should support this.
- */
+ LocalizerAction action = LocalizerAction.LIVE;
+ // Update resource statuses.
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
@@ -1000,33 +968,8 @@ public class ResourceLocalizationService extends CompositeService
// list
assoc.getResource().unlock();
scheduled.remove(req);
-
- if (pending.isEmpty()) {
- // TODO: Synchronization
- response.setLocalizerAction(LocalizerAction.DIE);
- break;
- }
- response.setLocalizerAction(LocalizerAction.LIVE);
- LocalResource next = findNextResource();
- if (next != null) {
- try {
- ResourceLocalizationSpec resource =
- NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
- getPathForLocalization(next));
- rsrcs.add(resource);
- } catch (IOException e) {
- LOG.error("local path for PRIVATE localization could not be " +
- "found. Disks might have failed.", e);
- } catch (IllegalArgumentException e) {
- LOG.error("Inorrect path for PRIVATE localization."
- + next.getResource().getFile(), e);
- } catch (URISyntaxException e) {
- //TODO fail? Already translated several times...
- }
- }
break;
case FETCH_PENDING:
- response.setLocalizerAction(LocalizerAction.LIVE);
break;
case FETCH_FAILURE:
final String diagnostics = stat.getException().toString();
@@ -1040,17 +983,51 @@ public class ResourceLocalizationService extends CompositeService
// list
assoc.getResource().unlock();
scheduled.remove(req);
-
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
- response.setLocalizerAction(LocalizerAction.DIE);
+ action = LocalizerAction.DIE;
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent(
req, stat.getException().getMessage()));
break;
}
}
+ if (action == LocalizerAction.DIE) {
+ response.setLocalizerAction(action);
+ return response;
+ }
+
+ // Give the localizer resources for remote-fetching.
+ List<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+
+ /*
+ * TODO : It doesn't support multiple downloads per ContainerLocalizer
+ * at the same time. We need to think whether we should support this.
+ */
+ LocalResource next = findNextResource();
+ if (next != null) {
+ try {
+ ResourceLocalizationSpec resource =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(resource);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be " +
+ "found. Disks might have failed.", e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Inorrect path for PRIVATE localization."
+ + next.getResource().getFile(), e);
+ } catch (URISyntaxException e) {
+ //TODO fail? Already translated several times...
+ }
+ } else if (pending.isEmpty()) {
+ // TODO: Synchronization
+ action = LocalizerAction.DIE;
+ }
+
+ response.setLocalizerAction(action);
response.setResourceSpecs(rsrcs);
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af5b1dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index c549fce..0894dc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -832,10 +832,16 @@ public class TestResourceLocalizationService {
do {
resource2 = getPrivateMockedResource(r);
} while (resource2 == null || resource2.equals(resource1));
+ LocalResource resource3 = null;
+ do {
+ resource3 = getPrivateMockedResource(r);
+ } while (resource3 == null || resource3.equals(resource1)
+ || resource3.equals(resource2));
// above call to make sure we don't get identical resources.
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+ final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
@@ -843,6 +849,7 @@ public class TestResourceLocalizationService {
new ArrayList<LocalResourceRequest>();
privateResourceList.add(req1);
privateResourceList.add(req2);
+ privateResourceList.add(req3);
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
@@ -857,30 +864,47 @@ public class TestResourceLocalizationService {
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
- LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
- LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class);
+ LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr);
- when(rsrcStat1.getResource()).thenReturn(resource1);
- when(rsrcStat2.getResource()).thenReturn(resource2);
- when(rsrcStat1.getLocalSize()).thenReturn(4344L);
- when(rsrcStat2.getLocalSize()).thenReturn(2342L);
+ when(rsrc1success.getResource()).thenReturn(resource1);
+ when(rsrc2pending.getResource()).thenReturn(resource2);
+ when(rsrc2success.getResource()).thenReturn(resource2);
+ when(rsrc3success.getResource()).thenReturn(resource3);
+ when(rsrc1success.getLocalSize()).thenReturn(4344L);
+ when(rsrc2success.getLocalSize()).thenReturn(2342L);
+ when(rsrc3success.getLocalSize()).thenReturn(5345L);
URL locPath = getPath("/cache/private/blah");
- when(rsrcStat1.getLocalPath()).thenReturn(locPath);
- when(rsrcStat2.getLocalPath()).thenReturn(locPath);
- when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
- when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrc1success.getLocalPath()).thenReturn(locPath);
+ when(rsrc2success.getLocalPath()).thenReturn(locPath);
+ when(rsrc3success.getLocalPath()).thenReturn(locPath);
+ when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
+ when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+
+ // Four heartbeats, each sending the following resource statuses:
+ // 1 - empty
+ // 2 - resource1 FETCH_SUCCESS
+ // 3 - resource2 FETCH_PENDING
+ // 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS
+ List<LocalResourceStatus> rsrcs4 = new ArrayList<LocalResourceStatus>();
+ rsrcs4.add(rsrc2success);
+ rsrcs4.add(rsrc3success);
when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList())
- .thenReturn(Collections.singletonList(rsrcStat1))
- .thenReturn(Collections.singletonList(rsrcStat2))
- .thenReturn(Collections.<LocalResourceStatus>emptyList());
+ .thenReturn(Collections.singletonList(rsrc1success))
+ .thenReturn(Collections.singletonList(rsrc2pending))
+ .thenReturn(rsrcs4);
String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR +
ContainerLocalizer.FILECACHE;
-
- // get first resource
+
+ // First heartbeat
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(1, response.getResourceSpecs().size());
@@ -893,7 +917,7 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "10"));
- // get second resource
+ // Second heartbeat
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(1, response.getResourceSpecs().size());
@@ -907,16 +931,21 @@ public class TestResourceLocalizationService {
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
- // empty rsrc
+ // Third heartbeat
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
- assertEquals(0, response.getResourceSpecs().size());
+ assertEquals(1, response.getResourceSpecs().size());
+ assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs()
+ .get(0).getResource()));
+ localizedPath =
+ response.getResourceSpecs().get(0).getDestinationDirectory();
+ assertTrue(localizedPath.getFile().endsWith(
+ localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
-
dispatcher.await();
// verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc =
@@ -928,8 +957,8 @@ public class TestResourceLocalizationService {
&& c.getContainerId() == evt.getContainerID();
}
};
- // total 2 resource localzation calls. one for each resource.
- verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+ // total 3 resource localization calls, one for each resource.
+ verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));