You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2019/08/09 18:13:53 UTC
[hadoop] branch trunk updated: YARN-9527. Prevent rogue Localizer
Runner from downloading same file repeatedly. Contributed by Jim Brennan
This is an automated email from the ASF dual-hosted git repository.
eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ff0453 YARN-9527. Prevent rogue Localizer Runner from downloading same file repeatedly. Contributed by Jim Brennan
6ff0453 is described below
commit 6ff0453edeeb0ed7bc9a7d3fb6dfa7048104238b
Author: Eric Yang <ey...@apache.org>
AuthorDate: Fri Aug 9 14:12:17 2019 -0400
YARN-9527. Prevent rogue Localizer Runner from downloading same file repeatedly.
Contributed by Jim Brennan
---
.../localizer/ResourceLocalizationService.java | 144 +++++++------
.../localizer/TestResourceLocalizationService.java | 232 ++++++++++++++++++++-
2 files changed, 316 insertions(+), 60 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 0494c2d..3e4af2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.FSDownload;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@@ -722,6 +723,8 @@ public class ResourceLocalizationService extends CompositeService
private final PublicLocalizer publicLocalizer;
private final Map<String,LocalizerRunner> privLocalizers;
+ private final Map<String, String> recentlyCleanedLocalizers;
+ private final int maxRecentlyCleaned = 128;
LocalizerTracker(Configuration conf) {
this(conf, new HashMap<String,LocalizerRunner>());
@@ -732,6 +735,8 @@ public class ResourceLocalizationService extends CompositeService
super(LocalizerTracker.class.getName());
this.publicLocalizer = new PublicLocalizer(conf);
this.privLocalizers = privLocalizers;
+ this.recentlyCleanedLocalizers =
+ new LRUCacheHashMap<String, String>(maxRecentlyCleaned, false);
}
@Override
@@ -783,14 +788,24 @@ public class ResourceLocalizationService extends CompositeService
synchronized (privLocalizers) {
LocalizerRunner localizer = privLocalizers.get(locId);
if (localizer != null && localizer.killContainerLocalizer.get()) {
- // Old localizer thread has been stopped, remove it and creates
+ // Old localizer thread has been stopped, remove it and create
// a new localizer thread.
LOG.info("New " + event.getType() + " localize request for "
+ locId + ", remove old private localizer.");
- cleanupPrivLocalizers(locId);
+ privLocalizers.remove(locId);
+ localizer.interrupt();
localizer = null;
}
if (null == localizer) {
+ // Don't create a new localizer if this one has been recently
+ // cleaned up - this can happen if localization requests come
+ // in after cleanupPrivLocalizers has been called.
+ if (recentlyCleanedLocalizers.containsKey(locId)) {
+ LOG.info(
+ "Skipping localization request for recently cleaned " +
+ "localizer " + locId + " resource:" + req.getResource());
+ break;
+ }
LOG.info("Created localizer for " + locId);
localizer = new LocalizerRunner(req.getContext(), locId);
privLocalizers.put(locId, localizer);
@@ -808,6 +823,7 @@ public class ResourceLocalizationService extends CompositeService
public void cleanupPrivLocalizers(String locId) {
synchronized (privLocalizers) {
LocalizerRunner localizer = privLocalizers.get(locId);
+ recentlyCleanedLocalizers.put(locId, locId);
if (null == localizer) {
return; // ignore; already gone
}
@@ -1047,44 +1063,74 @@ public class ResourceLocalizationService extends CompositeService
*
* @return the next resource to be localized
*/
- private LocalResource findNextResource() {
+ private ResourceLocalizationSpec findNextResource(
+ String user, ApplicationId applicationId) {
synchronized (pending) {
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
i.hasNext();) {
- LocalizerResourceRequestEvent evt = i.next();
- LocalizedResource nRsrc = evt.getResource();
- // Resource download should take place ONLY if resource is in
- // Downloading state
- if (nRsrc.getState() != ResourceState.DOWNLOADING) {
- i.remove();
- continue;
- }
- /*
- * Multiple containers will try to download the same resource. So the
- * resource download should start only if
- * 1) We can acquire a non blocking semaphore lock on resource
- * 2) Resource is still in DOWNLOADING state
- */
- if (nRsrc.tryAcquire()) {
- if (nRsrc.getState() == ResourceState.DOWNLOADING) {
- LocalResourceRequest nextRsrc = nRsrc.getRequest();
- LocalResource next =
- recordFactory.newRecordInstance(LocalResource.class);
- next.setResource(URL.fromPath(nextRsrc
- .getPath()));
- next.setTimestamp(nextRsrc.getTimestamp());
- next.setType(nextRsrc.getType());
- next.setVisibility(evt.getVisibility());
- next.setPattern(evt.getPattern());
- scheduled.put(nextRsrc, evt);
- return next;
- } else {
- // Need to release acquired lock
- nRsrc.unlock();
- }
- }
- }
- return null;
+ LocalizerResourceRequestEvent evt = i.next();
+ LocalizedResource nRsrc = evt.getResource();
+ // Resource download should take place ONLY if resource is in
+ // Downloading state
+ if (nRsrc.getState() != ResourceState.DOWNLOADING) {
+ i.remove();
+ continue;
+ }
+ /*
+ * Multiple containers will try to download the same resource. So the
+ * resource download should start only if
+ * 1) We can acquire a non blocking semaphore lock on resource
+ * 2) Resource is still in DOWNLOADING state
+ */
+ if (nRsrc.tryAcquire()) {
+ if (nRsrc.getState() == ResourceState.DOWNLOADING) {
+ LocalResourceRequest nextRsrc = nRsrc.getRequest();
+ LocalResource next =
+ recordFactory.newRecordInstance(LocalResource.class);
+ next.setResource(URL.fromPath(nextRsrc.getPath()));
+ next.setTimestamp(nextRsrc.getTimestamp());
+ next.setType(nextRsrc.getType());
+ next.setVisibility(evt.getVisibility());
+ next.setPattern(evt.getPattern());
+ ResourceLocalizationSpec nextSpec = null;
+ try {
+ LocalResourcesTracker tracker = getLocalResourcesTracker(
+ next.getVisibility(), user, applicationId);
+ if (tracker != null) {
+ Path localPath = getPathForLocalization(next, tracker);
+ if (localPath != null) {
+ nextSpec = NodeManagerBuilderUtils.
+ newResourceLocalizationSpec(next, localPath);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be " +
+ "found. Disks might have failed.", e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Incorrect path for PRIVATE localization."
+ + next.getResource().getFile(), e);
+ } catch (URISyntaxException e) {
+ LOG.error(
+ "Got exception in parsing URL of LocalResource:"
+ + next.getResource(), e);
+ }
+ if (nextSpec != null) {
+ scheduled.put(nextRsrc, evt);
+ return nextSpec;
+ } else {
+ // We failed to get a path for this, don't try to localize this
+ // resource again.
+ nRsrc.unlock();
+ i.remove();
+ continue;
+ }
+ } else {
+ // Need to release acquired lock
+ nRsrc.unlock();
+ }
+ }
+ }
+ return null;
}
}
@@ -1170,29 +1216,9 @@ public class ResourceLocalizationService extends CompositeService
* TODO : It doesn't support multiple downloads per ContainerLocalizer
* at the same time. We need to think whether we should support this.
*/
- LocalResource next = findNextResource();
+ ResourceLocalizationSpec next = findNextResource(user, applicationId);
if (next != null) {
- try {
- LocalResourcesTracker tracker = getLocalResourcesTracker(
- next.getVisibility(), user, applicationId);
- if (tracker != null) {
- Path localPath = getPathForLocalization(next, tracker);
- if (localPath != null) {
- rsrcs.add(NodeManagerBuilderUtils.newResourceLocalizationSpec(
- next, localPath));
- }
- }
- } catch (IOException e) {
- LOG.error("local path for PRIVATE localization could not be " +
- "found. Disks might have failed.", e);
- } catch (IllegalArgumentException e) {
- LOG.error("Incorrect path for PRIVATE localization."
- + next.getResource().getFile(), e);
- } catch (URISyntaxException e) {
- LOG.error(
- "Got exception in parsing URL of LocalResource:"
- + next.getResource(), e);
- }
+ rsrcs.add(next);
}
response.setLocalizerAction(LocalizerAction.LIVE);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 8871bf6..cb877c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -140,6 +140,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -1108,9 +1110,21 @@ public class TestResourceLocalizationService {
Thread.yield();
}
}
+ private void yieldForLocalizers(int num) { // sleeps up to num x 100ms
+ for (int i = 0; i < num && !Thread.currentThread().isInterrupted(); i++) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep interrupt status; loop exits
+ }
+ }
+ }
private void setStopLocalization() {
stopLocalization = true;
}
+ private int getNumLocalizers() {
+ return numLocalizers.get(); // presumably # of localizers this executor started — TODO confirm
+ }
}
@Test(timeout = 20000)
@@ -1137,7 +1151,8 @@ public class TestResourceLocalizationService {
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
- doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+ doReturn(lfs).when(spyService).
+ getLocalFileContext(isA(Configuration.class));
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
FsPermission nmPermission =
@@ -1184,6 +1199,78 @@ public class TestResourceLocalizationService {
}
}
+ @Test
+ public void testResourceLocalizationReqsAfterContainerKill()
+ throws Exception {
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[1];
+ localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+ sDirs[0] = localDirs.get(0).toString();
+
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+ DummyExecutor exec = new DummyExecutor();
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
+ DeletionService delServiceReal = new DeletionService(exec);
+ DeletionService delService = spy(delServiceReal);
+ delService.init(new Configuration());
+ delService.start();
+
+ DrainDispatcher dispatcher = getDispatcher(conf);
+ ResourceLocalizationService rawService = new ResourceLocalizationService(
+ dispatcher, exec, delService, dirsHandler, nmContext, metrics);
+
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(mockServer).when(spyService).createServer();
+ doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ FsPermission nmPermission =
+ ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+ final Path userDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ContainerLocalizer.USERCACHE);
+ final Path fileDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ContainerLocalizer.FILECACHE);
+ final Path sysDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ResourceLocalizationService.NM_PRIVATE_DIR);
+ final FileStatus fs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ defaultPermission, "", "", new Path(sDirs[0]));
+ final FileStatus nmFs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ nmPermission, "", "", sysDir);
+
+ doAnswer(new Answer<FileStatus>() {
+ @Override
+ public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ if (args.length > 0) {
+ if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+ return fs;
+ }
+ }
+ return nmFs;
+ }
+ }).when(spylfs).getFileStatus(isA(Path.class));
+
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ doLocalizationAfterCleanup(spyService, dispatcher, exec, delService);
+
+ } finally {
+ spyService.stop();
+ dispatcher.stop();
+ delService.stop();
+ }
+ }
+
private DrainDispatcher getDispatcher(Configuration config) {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(config);
@@ -1342,6 +1429,149 @@ public class TestResourceLocalizationService {
assertNull(rsrc3);
}
+ private void doLocalizationAfterCleanup(
+ ResourceLocalizationService spyService,
+ DrainDispatcher dispatcher, DummyExecutor exec,
+ DeletionService delService)
+ throws IOException, URISyntaxException, InterruptedException {
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ String user = "user0";
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ List<LocalResource> resources = initializeLocalizer(appId);
+ LocalResource resource1 = resources.get(0);
+ LocalResource resource2 = resources.get(1);
+ LocalResource resource3 = resources.get(2);
+ final Container c1 = getMockContainer(appId, 42, "user0");
+ final Container c2 = getMockContainer(appId, 43, "user0");
+
+ EventHandler<ApplicationEvent> applicationBus =
+ getApplicationBus(dispatcher);
+ EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
+ initApp(spyService, applicationBus, app, appId, dispatcher);
+
+ // Build the PRIVATE resource requests shared by containers c1 and c2.
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+ final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+ final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ List<LocalResourceRequest> privateResourceList =
+ new ArrayList<LocalResourceRequest>();
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+
+ // Start localization without any resources (so we can simulate the
+ // resource requests being delayed until after cleanup).
+ spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+ dispatcher.await();
+
+ // Kill c1, which leads to cleanup.
+ spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+ dispatcher.await();
+
+ // Now send the resource requests and releases directly to the tracker.
+ privateResourceList.add(req1);
+ privateResourceList.add(req2);
+ privateResourceList.add(req3);
+
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+ LocalizerContext locCtx =
+ new LocalizerContext(user, c1.getContainerId(), c1.getCredentials());
+ LocalResourcesTracker tracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+ user, null);
+ for (LocalResourceRequest req : privateResourceList) {
+ tracker.handle(
+ new ResourceRequestEvent(req, LocalResourceVisibility.PRIVATE,
+ locCtx));
+ }
+ dispatcher.await();
+ for (LocalResourceRequest req : privateResourceList) {
+ tracker.handle(
+ new ResourceReleaseEvent(req, c1.getContainerId()));
+ }
+ dispatcher.await();
+
+ // Now start a second container with the same list of resources.
+ spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs));
+ dispatcher.await();
+
+ // Wait for localizers to begin (there should only be one, for c2).
+ exec.yieldForLocalizers(2);
+ assertThat(exec.getNumLocalizers()).isEqualTo(1);
+
+ LocalizerRunner locC2 =
+ spyService.getLocalizerRunner(c2.getContainerId().toString());
+ LocalizerStatus stat = mockLocalizerStatus(c2, resource1, resource2);
+
+ // First heartbeat, which schedules the first resource.
+ LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+ // Second heartbeat, which reports the first resource as a success.
+ // The second resource is scheduled.
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ final String locPath1 =
+ response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+ // Third heartbeat, which reports the second resource as pending.
+ // The third resource is scheduled.
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ final String locPath2 =
+ response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+ // Container c2 is killed, which leads to cleanup.
+ spyService.handle(new ContainerLocalizationCleanupEvent(c2, rsrcs));
+
+ // This heartbeat tells the container localizer to die, since the
+ // localizer runner has stopped.
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+ exec.setStopLocalization();
+ dispatcher.await();
+
+ // Verify the container notification (compare IDs by value, not reference).
+ ArgumentMatcher<ContainerEvent> successContainerLoc =
+ evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+ && c2.getContainerId().equals(evt.getContainerID());
+ // Only one resource gets localized for container c2.
+ verify(containerBus).handle(argThat(successContainerLoc));
+
+ Set<Path> paths =
+ Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
+ new Path(locPath2), new Path(locPath2 + "_tmp"));
+ // Wait for the localizer runner thread for container c2 to finish.
+ while (locC2.getState() != Thread.State.TERMINATED) {
+ Thread.sleep(50);
+ }
+ // Verify that the downloading resources were submitted for deletion.
+ verify(delService, times(3)).delete(argThat(new FileDeletionMatcher(
+ delService, user, null, new ArrayList<>(paths))));
+
+ // Container c2 was killed, but this resource was localized before the
+ // kill, hence it's not removed despite its ref count being 0.
+ LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
+ assertNotNull(rsrc1);
+ assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED);
+ assertThat(rsrc1.getRefCount()).isEqualTo(0);
+
+ // Containers c1 and c2 were killed before these finished downloading,
+ // so they should no longer be there.
+ LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
+ assertNull(rsrc2);
+ LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
+ assertNull(rsrc3);
+
+ // Double-check that we never created a localizer for c1.
+ assertThat(exec.getNumLocalizers()).isEqualTo(1);
+ }
+
private LocalizerStatus mockLocalizerStatus(Container c1,
LocalResource resource1, LocalResource resource2) {
final String containerIdStr = c1.getContainerId().toString();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org