You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive post; its hyperlink was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2015/01/26 04:39:24 UTC

hadoop git commit: YARN-3024. LocalizerRunner should give DIE action when all resources are localized. Contributed by Chengbing Liu

Repository: hadoop
Updated Branches:
  refs/heads/trunk 35f64962f -> 0d6bd6210


YARN-3024. LocalizerRunner should give DIE action when all resources are
localized. Contributed by Chengbing Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d6bd621
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d6bd621
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d6bd621

Branch: refs/heads/trunk
Commit: 0d6bd62102f94c55d59f7a0a86a684e99d746127
Parents: 35f6496
Author: Xuan <xg...@apache.org>
Authored: Sun Jan 25 19:37:57 2015 -0800
Committer: Xuan <xg...@apache.org>
Committed: Sun Jan 25 19:37:57 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../localizer/ResourceLocalizationService.java  | 99 ++++++++------------
 .../TestResourceLocalizationService.java        | 71 +++++++++-----
 3 files changed, 91 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6bd621/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7263c6f..0808678 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -200,6 +200,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2800. Remove MemoryNodeLabelsStore and add a way to enable/disable
     node labels feature. (Wangda Tan via ozawa)
 
+    YARN-3024. LocalizerRunner should give DIE action when all resources are
+    localized. (Chengbing Liu via xgong)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6bd621/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 5440980..2f4fa5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -763,7 +763,7 @@ public class ResourceLocalizationService extends CompositeService
        */
 
       if (rsrc.tryAcquire()) {
-        if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
+        if (rsrc.getState() == ResourceState.DOWNLOADING) {
           LocalResource resource = request.getResource().getRequest();
           try {
             Path publicRootPath =
@@ -895,7 +895,7 @@ public class ResourceLocalizationService extends CompositeService
          LocalizedResource nRsrc = evt.getResource();
          // Resource download should take place ONLY if resource is in
          // Downloading state
-         if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+         if (nRsrc.getState() != ResourceState.DOWNLOADING) {
            i.remove();
            continue;
          }
@@ -906,7 +906,7 @@ public class ResourceLocalizationService extends CompositeService
           * 2) Resource is still in DOWNLOADING state
           */
          if (nRsrc.tryAcquire()) {
-           if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+           if (nRsrc.getState() == ResourceState.DOWNLOADING) {
              LocalResourceRequest nextRsrc = nRsrc.getRequest();
              LocalResource next =
                  recordFactory.newRecordInstance(LocalResource.class);
@@ -936,41 +936,9 @@ public class ResourceLocalizationService extends CompositeService
       String user = context.getUser();
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
-      // The localizer has just spawned. Start giving it resources for
-      // remote-fetching.
-      if (remoteResourceStatuses.isEmpty()) {
-        LocalResource next = findNextResource();
-        if (next != null) {
-          response.setLocalizerAction(LocalizerAction.LIVE);
-          try {
-            ArrayList<ResourceLocalizationSpec> rsrcs =
-                new ArrayList<ResourceLocalizationSpec>();
-            ResourceLocalizationSpec rsrc =
-                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                  getPathForLocalization(next));
-            rsrcs.add(rsrc);
-            response.setResourceSpecs(rsrcs);
-          } catch (IOException e) {
-            LOG.error("local path for PRIVATE localization could not be found."
-                + "Disks might have failed.", e);
-          } catch (URISyntaxException e) {
-            // TODO fail? Already translated several times...
-          }
-        } else if (pending.isEmpty()) {
-          // TODO: Synchronization
-          response.setLocalizerAction(LocalizerAction.DIE);
-        } else {
-          response.setLocalizerAction(LocalizerAction.LIVE);
-        }
-        return response;
-      }
-      ArrayList<ResourceLocalizationSpec> rsrcs =
-          new ArrayList<ResourceLocalizationSpec>();
-       /*
-        * TODO : It doesn't support multiple downloads per ContainerLocalizer
-        * at the same time. We need to think whether we should support this.
-        */
 
+      LocalizerAction action = LocalizerAction.LIVE;
+      // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
@@ -999,30 +967,8 @@ public class ResourceLocalizationService extends CompositeService
             // list
             assoc.getResource().unlock();
             scheduled.remove(req);
-            
-            if (pending.isEmpty()) {
-              // TODO: Synchronization
-              response.setLocalizerAction(LocalizerAction.DIE);
-              break;
-            }
-            response.setLocalizerAction(LocalizerAction.LIVE);
-            LocalResource next = findNextResource();
-            if (next != null) {
-              try {
-                ResourceLocalizationSpec resource =
-                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                      getPathForLocalization(next));
-                rsrcs.add(resource);
-              } catch (IOException e) {
-                LOG.error("local path for PRIVATE localization could not be " +
-                  "found. Disks might have failed.", e);
-              } catch (URISyntaxException e) {
-                  //TODO fail? Already translated several times...
-              }
-            }
             break;
           case FETCH_PENDING:
-            response.setLocalizerAction(LocalizerAction.LIVE);
             break;
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
@@ -1036,17 +982,48 @@ public class ResourceLocalizationService extends CompositeService
             // list
             assoc.getResource().unlock();
             scheduled.remove(req);
-            
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            response.setLocalizerAction(LocalizerAction.DIE);
+            action = LocalizerAction.DIE;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
+      if (action == LocalizerAction.DIE) {
+        response.setLocalizerAction(action);
+        return response;
+      }
+
+      // Give the localizer resources for remote-fetching.
+      List<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
+
+      /*
+       * TODO : It doesn't support multiple downloads per ContainerLocalizer
+       * at the same time. We need to think whether we should support this.
+       */
+      LocalResource next = findNextResource();
+      if (next != null) {
+        try {
+          ResourceLocalizationSpec resource =
+              NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                getPathForLocalization(next));
+          rsrcs.add(resource);
+        } catch (IOException e) {
+          LOG.error("local path for PRIVATE localization could not be " +
+            "found. Disks might have failed.", e);
+        } catch (URISyntaxException e) {
+            //TODO fail? Already translated several times...
+        }
+      } else if (pending.isEmpty()) {
+        // TODO: Synchronization
+        action = LocalizerAction.DIE;
+      }
+
+      response.setLocalizerAction(action);
       response.setResourceSpecs(rsrcs);
       return response;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6bd621/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index f968bb9..9ed18dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -827,10 +827,16 @@ public class TestResourceLocalizationService {
       do {
         resource2 = getPrivateMockedResource(r);
       } while (resource2 == null || resource2.equals(resource1));
+      LocalResource resource3 = null;
+      do {
+        resource3 = getPrivateMockedResource(r);
+      } while (resource3 == null || resource3.equals(resource1)
+          || resource3.equals(resource2));
       // above call to make sure we don't get identical resources.
       
       final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
       final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         new HashMap<LocalResourceVisibility, 
                     Collection<LocalResourceRequest>>();
@@ -838,6 +844,7 @@ public class TestResourceLocalizationService {
           new ArrayList<LocalResourceRequest>();
       privateResourceList.add(req1);
       privateResourceList.add(req2);
+      privateResourceList.add(req3);
       rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
@@ -852,30 +859,47 @@ public class TestResourceLocalizationService {
       Path localizationTokenPath = tokenPathCaptor.getValue();
 
       // heartbeat from localizer
-      LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
-      LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class);
       LocalizerStatus stat = mock(LocalizerStatus.class);
       when(stat.getLocalizerId()).thenReturn(ctnrStr);
-      when(rsrcStat1.getResource()).thenReturn(resource1);
-      when(rsrcStat2.getResource()).thenReturn(resource2);
-      when(rsrcStat1.getLocalSize()).thenReturn(4344L);
-      when(rsrcStat2.getLocalSize()).thenReturn(2342L);
+      when(rsrc1success.getResource()).thenReturn(resource1);
+      when(rsrc2pending.getResource()).thenReturn(resource2);
+      when(rsrc2success.getResource()).thenReturn(resource2);
+      when(rsrc3success.getResource()).thenReturn(resource3);
+      when(rsrc1success.getLocalSize()).thenReturn(4344L);
+      when(rsrc2success.getLocalSize()).thenReturn(2342L);
+      when(rsrc3success.getLocalSize()).thenReturn(5345L);
       URL locPath = getPath("/cache/private/blah");
-      when(rsrcStat1.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat2.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
-      when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc1success.getLocalPath()).thenReturn(locPath);
+      when(rsrc2success.getLocalPath()).thenReturn(locPath);
+      when(rsrc3success.getLocalPath()).thenReturn(locPath);
+      when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
+      when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+
+      // Four heartbeats with sending:
+      // 1 - empty
+      // 2 - resource1 FETCH_SUCCESS
+      // 3 - resource2 FETCH_PENDING
+      // 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS
+      List<LocalResourceStatus> rsrcs4 = new ArrayList<LocalResourceStatus>();
+      rsrcs4.add(rsrc2success);
+      rsrcs4.add(rsrc3success);
       when(stat.getResources())
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
-        .thenReturn(Collections.singletonList(rsrcStat1))
-        .thenReturn(Collections.singletonList(rsrcStat2))
-        .thenReturn(Collections.<LocalResourceStatus>emptyList());
+        .thenReturn(Collections.singletonList(rsrc1success))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(rsrcs4);
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
           ContainerLocalizer.FILECACHE;
-      
-      // get first resource
+
+      // First heartbeat
       LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(1, response.getResourceSpecs().size());
@@ -888,7 +912,7 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
         localPath + Path.SEPARATOR + "10"));
 
-      // get second resource
+      // Second heartbeat
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(1, response.getResourceSpecs().size());
@@ -902,16 +926,21 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
         localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
 
-      // empty rsrc
+      // Third heartbeat
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(0, response.getResourceSpecs().size());
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs()
+          .get(0).getResource()));
+      localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      assertTrue(localizedPath.getFile().endsWith(
+          localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
       // get shutdown
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 
-
       dispatcher.await();
       // verify container notification
       ArgumentMatcher<ContainerEvent> matchesContainerLoc =
@@ -923,8 +952,8 @@ public class TestResourceLocalizationService {
               && c.getContainerId() == evt.getContainerID();
           }
         };
-      // total 2 resource localzation calls. one for each resource.
-      verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+      // total 3 resource localzation calls. one for each resource.
+      verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
         
       // Verify deletion of localization token.
       verify(delService).delete((String)isNull(), eq(localizationTokenPath));