You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/23 06:37:46 UTC
svn commit: r1470812 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
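hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/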
Author: vinodkv
Date: Tue Apr 23 04:37:45 2013
New Revision: 1470812
URL: http://svn.apache.org/r1470812
Log:
YARN-583. Moved application-level local resources to be localized under the filecache sub-directory under the application directory. Contributed by Omkar Vinit Joshi.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr 23 04:37:45 2013
@@ -167,6 +167,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-542. Changed the default global AM max-attempts value to be not one.
(Zhijie Shen via vinodkv)
+ YARN-583. Moved application level local resources to be localized under the
+ filecache sub-directory under application directory. (Omkar Vinit Joshi via
+ vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Apr 23 04:37:45 2013
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -481,18 +482,15 @@ public class ResourceLocalizationService
}
private String getUserFileCachePath(String user) {
- String path =
- "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
- + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
- return path;
+ return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
+ ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE));
+
}
- private String getUserAppCachePath(String user, String appId) {
- String path =
- "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
- + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
- + Path.SEPARATOR + appId;
- return path;
+ private String getAppFileCachePath(String user, String appId) {
+ return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
+ ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+ ContainerLocalizer.FILECACHE));
}
@VisibleForTesting
@@ -942,7 +940,7 @@ public class ResourceLocalizationService
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
cacheDirectory = getUserFileCachePath(user);
} else {// APPLICATION ONLY
- cacheDirectory = getUserAppCachePath(user, appId.toString());
+ cacheDirectory = getAppFileCachePath(user, appId.toString());
}
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Apr 23 04:37:45 2013
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -72,6 +73,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -848,6 +851,163 @@ public class TestResourceLocalizationSer
}
}
+ @Test(timeout = 10000)
+ @SuppressWarnings("unchecked")
+ public void testLocalResourcePath() throws Exception {
+
+ // test the local path where application and user cache files will be
+ // localized.
+
+ DrainDispatcher dispatcher1 = null;
+ try {
+ dispatcher1 = new DrainDispatcher();
+ String user = "testuser";
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+ // mocked Resource Localization Service
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ // We don't want files to be created
+ doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+ anyBoolean());
+
+ // creating one local directory
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[1];
+ for (int i = 0; i < 1; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ // setting log directory.
+ String logDir =
+ lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+ LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
+ localDirHandler.init(conf);
+ // Registering event handlers
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher1.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher1.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = mock(DeletionService.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ // initializing directory handler.
+ dirsHandler.init(conf);
+
+ dispatcher1.init(conf);
+ dispatcher1.start();
+
+ ResourceLocalizationService rls =
+ new ResourceLocalizationService(dispatcher1, exec, delService,
+ localDirHandler);
+ dispatcher1.register(LocalizationEventType.class, rls);
+ rls.init(conf);
+
+ rls.handle(createApplicationLocalizationEvent(user, appId));
+
+ // We need to pre-populate the LocalizerRunner as the
+ // Resource Localization Service code internally starts them which
+ // definitely we don't want.
+
+ // creating new container and populating corresponding localizer runner
+
+ // Container - 1
+ Container container1 = createMockContainer(user, 1);
+ String localizerId1 = container1.getContainerID().toString();
+ rls.getPrivateLocalizers().put(
+ localizerId1,
+ rls.new LocalizerRunner(new LocalizerContext(user, container1
+ .getContainerID(), null), localizerId1));
+
+ // Creating two requests for container
+ // 1) Private resource
+ // 2) Application resource
+ LocalResourceRequest reqPriv =
+ new LocalResourceRequest(new Path("file:///tmp1"), 123L,
+ LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+ List<LocalResourceRequest> privList =
+ new ArrayList<LocalResourceRequest>();
+ privList.add(reqPriv);
+
+ LocalResourceRequest reqApp =
+ new LocalResourceRequest(new Path("file:///tmp2"), 123L,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
+ List<LocalResourceRequest> appList =
+ new ArrayList<LocalResourceRequest>();
+ appList.add(reqApp);
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+ rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privList);
+
+ dispatcher1.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(container1, rsrcs));
+
+ // Now waiting for resource download to start. Here actual will not start
+ // Only the resources will be populated into pending list.
+ Assert
+ .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500));
+
+ // Validating user and application cache paths
+
+ String userCachePath =
+ StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
+ .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
+ ContainerLocalizer.FILECACHE));
+ String userAppCachePath =
+ StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
+ .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
+ ContainerLocalizer.APPCACHE, appId.toString(),
+ ContainerLocalizer.FILECACHE));
+
+ // Now the Application and private resources may come in any order
+ // for download.
+ // For User cache :
+ // returned destinationPath = user cache path + random number
+ // For App cache :
+ // returned destinationPath = user app cache path + random number
+
+ int returnedResources = 0;
+ boolean appRsrc = false, privRsrc = false;
+ while (returnedResources < 2) {
+ LocalizerHeartbeatResponse response =
+ rls.heartbeat(createLocalizerStatus(localizerId1));
+ for (ResourceLocalizationSpec resourceSpec : response
+ .getResourceSpecs()) {
+ returnedResources++;
+ Path destinationDirectory =
+ new Path(resourceSpec.getDestinationDirectory().getFile());
+ if (resourceSpec.getResource().getVisibility() ==
+ LocalResourceVisibility.APPLICATION) {
+ appRsrc = true;
+ Assert.assertEquals(userAppCachePath, destinationDirectory
+ .getParent().toUri().toString());
+ } else if (resourceSpec.getResource().getVisibility() ==
+ LocalResourceVisibility.PRIVATE) {
+ privRsrc = true;
+ Assert.assertEquals(userCachePath, destinationDirectory.getParent()
+ .toUri().toString());
+ } else {
+ throw new Exception("Unexpected resource recevied.");
+ }
+ }
+ }
+ // We should receive both the resources (Application and Private)
+ Assert.assertTrue(appRsrc && privRsrc);
+ } finally {
+ if (dispatcher1 != null) {
+ dispatcher1.stop();
+ }
+ }
+ }
+
private LocalizerStatus createLocalizerStatusForFailedResource(
String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId);