You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2019/08/09 18:13:53 UTC

[hadoop] branch trunk updated: YARN-9527. Prevent rogue Localizer Runner from downloading same file repeatedly. Contributed by Jim Brennan

This is an automated email from the ASF dual-hosted git repository.

eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ff0453  YARN-9527.  Prevent rogue Localizer Runner from downloading same file repeatedly.             Contributed by Jim Brennan
6ff0453 is described below

commit 6ff0453edeeb0ed7bc9a7d3fb6dfa7048104238b
Author: Eric Yang <ey...@apache.org>
AuthorDate: Fri Aug 9 14:12:17 2019 -0400

    YARN-9527.  Prevent rogue Localizer Runner from downloading same file repeatedly.
                Contributed by Jim Brennan
---
 .../localizer/ResourceLocalizationService.java     | 144 +++++++------
 .../localizer/TestResourceLocalizationService.java | 232 ++++++++++++++++++++-
 2 files changed, 316 insertions(+), 60 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 0494c2d..3e4af2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -722,6 +723,8 @@ public class ResourceLocalizationService extends CompositeService
 
     private final PublicLocalizer publicLocalizer;
     private final Map<String,LocalizerRunner> privLocalizers;
+    private final Map<String, String> recentlyCleanedLocalizers;
+    private final int maxRecentlyCleaned = 128;
 
     LocalizerTracker(Configuration conf) {
       this(conf, new HashMap<String,LocalizerRunner>());
@@ -732,6 +735,8 @@ public class ResourceLocalizationService extends CompositeService
       super(LocalizerTracker.class.getName());
       this.publicLocalizer = new PublicLocalizer(conf);
       this.privLocalizers = privLocalizers;
+      this.recentlyCleanedLocalizers =
+          new LRUCacheHashMap<String, String>(maxRecentlyCleaned, false);
     }
     
     @Override
@@ -783,14 +788,24 @@ public class ResourceLocalizationService extends CompositeService
           synchronized (privLocalizers) {
             LocalizerRunner localizer = privLocalizers.get(locId);
             if (localizer != null && localizer.killContainerLocalizer.get()) {
-              // Old localizer thread has been stopped, remove it and creates
+              // Old localizer thread has been stopped, remove it and create
               // a new localizer thread.
               LOG.info("New " + event.getType() + " localize request for "
                   + locId + ", remove old private localizer.");
-              cleanupPrivLocalizers(locId);
+              privLocalizers.remove(locId);
+              localizer.interrupt();
               localizer = null;
             }
             if (null == localizer) {
+              // Don't create a new localizer if this one has been recently
+              // cleaned up - this can happen if localization requests come
+              // in after cleanupPrivLocalizers has been called.
+              if (recentlyCleanedLocalizers.containsKey(locId)) {
+                LOG.info(
+                    "Skipping localization request for recently cleaned " +
+                    "localizer " + locId + " resource:" + req.getResource());
+                break;
+              }
               LOG.info("Created localizer for " + locId);
               localizer = new LocalizerRunner(req.getContext(), locId);
               privLocalizers.put(locId, localizer);
@@ -808,6 +823,7 @@ public class ResourceLocalizationService extends CompositeService
     public void cleanupPrivLocalizers(String locId) {
       synchronized (privLocalizers) {
         LocalizerRunner localizer = privLocalizers.get(locId);
+        recentlyCleanedLocalizers.put(locId, locId);
         if (null == localizer) {
           return; // ignore; already gone
         }
@@ -1047,44 +1063,74 @@ public class ResourceLocalizationService extends CompositeService
      * 
      * @return the next resource to be localized
      */
-    private LocalResource findNextResource() {
+    private ResourceLocalizationSpec findNextResource(
+        String user, ApplicationId applicationId) {
       synchronized (pending) {
         for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
             i.hasNext();) {
-         LocalizerResourceRequestEvent evt = i.next();
-         LocalizedResource nRsrc = evt.getResource();
-         // Resource download should take place ONLY if resource is in
-         // Downloading state
-         if (nRsrc.getState() != ResourceState.DOWNLOADING) {
-           i.remove();
-           continue;
-         }
-         /*
-          * Multiple containers will try to download the same resource. So the
-          * resource download should start only if
-          * 1) We can acquire a non blocking semaphore lock on resource
-          * 2) Resource is still in DOWNLOADING state
-          */
-         if (nRsrc.tryAcquire()) {
-           if (nRsrc.getState() == ResourceState.DOWNLOADING) {
-             LocalResourceRequest nextRsrc = nRsrc.getRequest();
-             LocalResource next =
-                 recordFactory.newRecordInstance(LocalResource.class);
-             next.setResource(URL.fromPath(nextRsrc
-               .getPath()));
-             next.setTimestamp(nextRsrc.getTimestamp());
-             next.setType(nextRsrc.getType());
-             next.setVisibility(evt.getVisibility());
-             next.setPattern(evt.getPattern());
-             scheduled.put(nextRsrc, evt);
-             return next;
-           } else {
-             // Need to release acquired lock
-             nRsrc.unlock();
-           }
-         }
-       }
-       return null;
+          LocalizerResourceRequestEvent evt = i.next();
+          LocalizedResource nRsrc = evt.getResource();
+          // Resource download should take place ONLY if resource is in
+          // Downloading state
+          if (nRsrc.getState() != ResourceState.DOWNLOADING) {
+            i.remove();
+            continue;
+          }
+          /*
+           * Multiple containers will try to download the same resource. So the
+           * resource download should start only if
+           * 1) We can acquire a non blocking semaphore lock on resource
+           * 2) Resource is still in DOWNLOADING state
+           */
+          if (nRsrc.tryAcquire()) {
+            if (nRsrc.getState() == ResourceState.DOWNLOADING) {
+              LocalResourceRequest nextRsrc = nRsrc.getRequest();
+              LocalResource next =
+                  recordFactory.newRecordInstance(LocalResource.class);
+              next.setResource(URL.fromPath(nextRsrc.getPath()));
+              next.setTimestamp(nextRsrc.getTimestamp());
+              next.setType(nextRsrc.getType());
+              next.setVisibility(evt.getVisibility());
+              next.setPattern(evt.getPattern());
+              ResourceLocalizationSpec nextSpec = null;
+              try {
+                LocalResourcesTracker tracker = getLocalResourcesTracker(
+                    next.getVisibility(), user, applicationId);
+                if (tracker != null) {
+                  Path localPath = getPathForLocalization(next, tracker);
+                  if (localPath != null) {
+                    nextSpec = NodeManagerBuilderUtils.
+                        newResourceLocalizationSpec(next, localPath);
+                  }
+                }
+              } catch (IOException e) {
+                LOG.error("local path for PRIVATE localization could not be " +
+                    "found. Disks might have failed.", e);
+              } catch (IllegalArgumentException e) {
+                LOG.error("Incorrect path for PRIVATE localization."
+                    + next.getResource().getFile(), e);
+              } catch (URISyntaxException e) {
+                LOG.error(
+                    "Got exception in parsing URL of LocalResource:"
+                        + next.getResource(), e);
+              }
+              if (nextSpec != null) {
+                scheduled.put(nextRsrc, evt);
+                return nextSpec;
+              } else {
+                // We failed to get a path for this, don't try to localize this
+                // resource again.
+                nRsrc.unlock();
+                i.remove();
+                continue;
+              }
+            } else {
+              // Need to release acquired lock
+              nRsrc.unlock();
+            }
+          }
+        }
+        return null;
       }
     }
 
@@ -1170,29 +1216,9 @@ public class ResourceLocalizationService extends CompositeService
        * TODO : It doesn't support multiple downloads per ContainerLocalizer
        * at the same time. We need to think whether we should support this.
        */
-      LocalResource next = findNextResource();
+      ResourceLocalizationSpec next = findNextResource(user, applicationId);
       if (next != null) {
-        try {
-          LocalResourcesTracker tracker = getLocalResourcesTracker(
-              next.getVisibility(), user, applicationId);
-          if (tracker != null) {
-            Path localPath = getPathForLocalization(next, tracker);
-            if (localPath != null) {
-              rsrcs.add(NodeManagerBuilderUtils.newResourceLocalizationSpec(
-                  next, localPath));
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("local path for PRIVATE localization could not be " +
-            "found. Disks might have failed.", e);
-        } catch (IllegalArgumentException e) {
-          LOG.error("Incorrect path for PRIVATE localization."
-              + next.getResource().getFile(), e);
-        } catch (URISyntaxException e) {
-          LOG.error(
-              "Got exception in parsing URL of LocalResource:"
-                  + next.getResource(), e);
-        }
+        rsrcs.add(next);
       }
 
       response.setLocalizerAction(LocalizerAction.LIVE);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 8871bf6..cb877c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -140,6 +140,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -1108,9 +1110,21 @@ public class TestResourceLocalizationService {
         Thread.yield();
       }
     }
+    private void yieldForLocalizers(int num) {
+      for (int i = 0; i < num; i++) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          continue;
+        }
+      }
+    }
     private void setStopLocalization() {
       stopLocalization = true;
     }
+    private int getNumLocalizers() {
+      return numLocalizers.get();
+    }
   }
 
   @Test(timeout = 20000)
@@ -1137,7 +1151,8 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
-    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    doReturn(lfs).when(spyService).
+        getLocalFileContext(isA(Configuration.class));
     FsPermission defaultPermission =
         FsPermission.getDirDefault().applyUMask(lfs.getUMask());
     FsPermission nmPermission =
@@ -1184,6 +1199,78 @@ public class TestResourceLocalizationService {
     }
   }
 
+  @Test
+  public void testResourceLocalizationReqsAfterContainerKill()
+      throws Exception {
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[1];
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+    DummyExecutor exec = new DummyExecutor();
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
+    delService.init(new Configuration());
+    delService.start();
+
+    DrainDispatcher dispatcher = getDispatcher(conf);
+    ResourceLocalizationService rawService = new ResourceLocalizationService(
+        dispatcher, exec, delService, dirsHandler, nmContext, metrics);
+
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+            ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+            defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+            nmPermission, "", "", sysDir);
+
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      doLocalizationAfterCleanup(spyService, dispatcher, exec, delService);
+
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
   private DrainDispatcher getDispatcher(Configuration config) {
     DrainDispatcher dispatcher = new DrainDispatcher();
     dispatcher.init(config);
@@ -1342,6 +1429,149 @@ public class TestResourceLocalizationService {
     assertNull(rsrc3);
   }
 
+  private void doLocalizationAfterCleanup(
+      ResourceLocalizationService spyService,
+      DrainDispatcher dispatcher, DummyExecutor exec,
+      DeletionService delService)
+      throws IOException, URISyntaxException, InterruptedException {
+    final Application app = mock(Application.class);
+    final ApplicationId appId =
+        BuilderUtils.newApplicationId(314159265358979L, 3);
+    String user = "user0";
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    List<LocalResource> resources = initializeLocalizer(appId);
+    LocalResource resource1 = resources.get(0);
+    LocalResource resource2 = resources.get(1);
+    LocalResource resource3 = resources.get(2);
+    final Container c1 = getMockContainer(appId, 42, "user0");
+    final Container c2 = getMockContainer(appId, 43, "user0");
+
+    EventHandler<ApplicationEvent> applicationBus =
+        getApplicationBus(dispatcher);
+    EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
+    initApp(spyService, applicationBus, app, appId, dispatcher);
+
+    // Send localization requests for container c1 and c2.
+    final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+    final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+    final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+        new HashMap<LocalResourceVisibility,
+            Collection<LocalResourceRequest>>();
+    List<LocalResourceRequest> privateResourceList =
+        new ArrayList<LocalResourceRequest>();
+    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+
+    // Start Localization without any resources (so we can simulate the
+    // resource requests being delayed until after cleanup).
+    spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+    dispatcher.await();
+
+    // Kill c1 which leads to cleanup
+    spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+    dispatcher.await();
+
+    // Now we will send the resource requests and releases directly to tracker
+    privateResourceList.add(req1);
+    privateResourceList.add(req2);
+    privateResourceList.add(req3);
+
+    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+    LocalizerContext locCtx =
+        new LocalizerContext(user, c1.getContainerId(), c1.getCredentials());
+    LocalResourcesTracker tracker =
+        spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+            user, null);
+    for (LocalResourceRequest req : privateResourceList) {
+      tracker.handle(
+          new ResourceRequestEvent(req, LocalResourceVisibility.PRIVATE,
+              locCtx));
+    }
+    dispatcher.await();
+    for (LocalResourceRequest req : privateResourceList) {
+      tracker.handle(
+          new ResourceReleaseEvent(req, c1.getContainerId()));
+    }
+    dispatcher.await();
+
+    // Now start a second container with the same list of resources
+    spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs));
+    dispatcher.await();
+
+    // Wait for localizers to begin (should only be one for container2)
+    exec.yieldForLocalizers(2);
+    assertThat(exec.getNumLocalizers()).isEqualTo(1);
+
+    LocalizerRunner locC2 =
+        spyService.getLocalizerRunner(c2.getContainerId().toString());
+    LocalizerStatus stat = mockLocalizerStatus(c2, resource1, resource2);
+
+    // First heartbeat which schedules first resource.
+    LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+    // Second heartbeat which reports first resource as success.
+    // Second resource is scheduled.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+    final String locPath1 =
+        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+    // Third heartbeat which reports second resource as pending.
+    // Third resource is scheduled.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+    final String locPath2 =
+        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+    // Container c2 is killed which leads to cleanup
+    spyService.handle(new ContainerLocalizationCleanupEvent(c2, rsrcs));
+
+    // This heartbeat will indicate to container localizer to die as localizer
+    // runner has stopped.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+    exec.setStopLocalization();
+    dispatcher.await();
+
+    // verify container notification
+    ArgumentMatcher<ContainerEvent> successContainerLoc =
+        evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+            && c2.getContainerId() == evt.getContainerID();
+    // Only one resource gets localized for container c2.
+    verify(containerBus).handle(argThat(successContainerLoc));
+
+    Set<Path> paths =
+        Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
+            new Path(locPath2), new Path(locPath2 + "_tmp"));
+    // Wait for localizer runner thread for container c2 to finish.
+    while (locC2.getState() != Thread.State.TERMINATED) {
+      Thread.sleep(50);
+    }
+    // Verify if downloading resources were submitted for deletion.
+    verify(delService, times(3)).delete(argThat(new FileDeletionMatcher(
+        delService, user, null, new ArrayList<>(paths))));
+
+    // Container c2 was killed, but this resource was localized before the
+    // kill, so it is not removed despite its ref count being 0.
+    LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
+    assertNotNull(rsrc1);
+    assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED);
+    assertThat(rsrc1.getRefCount()).isEqualTo(0);
+
+    // Containers c1 and c2 were killed before these finished downloading,
+    // so they should no longer be there.
+    LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
+    assertNull(rsrc2);
+    LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
+    assertNull(rsrc3);
+
+    // Double-check that we never created a Localizer for C1
+    assertThat(exec.getNumLocalizers()).isEqualTo(1);
+  }
+
   private LocalizerStatus mockLocalizerStatus(Container c1,
       LocalResource resource1, LocalResource resource2) {
     final String containerIdStr = c1.getContainerId().toString();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org