You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/23 06:37:46 UTC

svn commit: r1470812 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yarn/hadoop-yarn-server/hado...

Author: vinodkv
Date: Tue Apr 23 04:37:45 2013
New Revision: 1470812

URL: http://svn.apache.org/r1470812
Log:
YARN-583. Moved application level local resources to be localized under the filecache sub-directory under application directory. Contributed by Omkar Vinit Joshi.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr 23 04:37:45 2013
@@ -167,6 +167,10 @@ Release 2.0.5-beta - UNRELEASED
     YARN-542. Changed the default global AM max-attempts value to be not one.
     (Zhijie Shen via vinodkv)
 
+    YARN-583. Moved application level local resources to be localized under the
+    filecache sub-directory under application directory. (Omkar Vinit Joshi via
+    vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Apr 23 04:37:45 2013
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -46,7 +47,6 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +65,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -481,18 +482,15 @@ public class ResourceLocalizationService
   }
 
   private String getUserFileCachePath(String user) {
-    String path =
-        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
-            + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
-    return path;
+    return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
+      ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE));
+
   }
 
-  private String getUserAppCachePath(String user, String appId) {
-    String path =
-        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
-            + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
-            + Path.SEPARATOR + appId;
-    return path;
+  private String getAppFileCachePath(String user, String appId) {
+    return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
+      ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+      ContainerLocalizer.FILECACHE));
   }
   
   @VisibleForTesting
@@ -942,7 +940,7 @@ public class ResourceLocalizationService
       if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
         cacheDirectory = getUserFileCachePath(user);
       } else {// APPLICATION ONLY
-        cacheDirectory = getUserAppCachePath(user, appId.toString());
+        cacheDirectory = getAppFileCachePath(user, appId.toString());
       }
       Path dirPath =
           dirsHandler.getLocalPathForWrite(cacheDirectory,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1470812&r1=1470811&r2=1470812&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Apr 23 04:37:45 2013
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -72,6 +73,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -848,6 +851,163 @@ public class TestResourceLocalizationSer
     }
   }
 
+  @Test(timeout = 10000)
+  @SuppressWarnings("unchecked")
+  public void testLocalResourcePath() throws Exception {
+
+    // test the local path where application and user cache files will be
+    // localized.
+
+    DrainDispatcher dispatcher1 = null;
+    try {
+      dispatcher1 = new DrainDispatcher();
+      String user = "testuser";
+      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+      // mocked Resource Localization Service
+      Configuration conf = new Configuration();
+      AbstractFileSystem spylfs =
+          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+      // We don't want files to be created
+      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+        anyBoolean());
+
+      // creating one local directory
+      List<Path> localDirs = new ArrayList<Path>();
+      String[] sDirs = new String[1];
+      for (int i = 0; i < 1; ++i) {
+        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+        sDirs[i] = localDirs.get(i).toString();
+      }
+      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+      // setting log directory.
+      String logDir =
+          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+      LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
+      localDirHandler.init(conf);
+      // Registering event handlers
+      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+      dispatcher1.register(ApplicationEventType.class, applicationBus);
+      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+      dispatcher1.register(ContainerEventType.class, containerBus);
+
+      ContainerExecutor exec = mock(ContainerExecutor.class);
+      DeletionService delService = mock(DeletionService.class);
+      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+      // initializing directory handler.
+      dirsHandler.init(conf);
+
+      dispatcher1.init(conf);
+      dispatcher1.start();
+
+      ResourceLocalizationService rls =
+          new ResourceLocalizationService(dispatcher1, exec, delService,
+            localDirHandler);
+      dispatcher1.register(LocalizationEventType.class, rls);
+      rls.init(conf);
+
+      rls.handle(createApplicationLocalizationEvent(user, appId));
+
+      // We need to pre-populate the LocalizerRunner as the
+      // Resource Localization Service code internally starts them which
+      // definitely we don't want.
+
+      // creating new container and populating corresponding localizer runner
+
+      // Container - 1
+      Container container1 = createMockContainer(user, 1);
+      String localizerId1 = container1.getContainerID().toString();
+      rls.getPrivateLocalizers().put(
+        localizerId1,
+        rls.new LocalizerRunner(new LocalizerContext(user, container1
+          .getContainerID(), null), localizerId1));
+
+      // Creating two requests for container
+      // 1) Private resource
+      // 2) Application resource
+      LocalResourceRequest reqPriv =
+          new LocalResourceRequest(new Path("file:///tmp1"), 123L,
+            LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+      List<LocalResourceRequest> privList =
+          new ArrayList<LocalResourceRequest>();
+      privList.add(reqPriv);
+
+      LocalResourceRequest reqApp =
+          new LocalResourceRequest(new Path("file:///tmp2"), 123L,
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
+      List<LocalResourceRequest> appList =
+          new ArrayList<LocalResourceRequest>();
+      appList.add(reqApp);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privList);
+
+      dispatcher1.getEventHandler().handle(
+        new ContainerLocalizationRequestEvent(container1, rsrcs));
+
+      // Now waiting for resource download to start. Here actual will not start
+      // Only the resources will be populated into pending list.
+      Assert
+        .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500));
+
+      // Validating user and application cache paths
+
+      String userCachePath =
+          StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
+            .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
+            ContainerLocalizer.FILECACHE));
+      String userAppCachePath =
+          StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
+            .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
+            ContainerLocalizer.APPCACHE, appId.toString(),
+            ContainerLocalizer.FILECACHE));
+
+      // Now the Application and private resources may come in any order
+      // for download.
+      // For User cahce :
+      // returned destinationPath = user cache path + random number
+      // For App cache :
+      // returned destinationPath = user app cache path + random number
+
+      int returnedResources = 0;
+      boolean appRsrc = false, privRsrc = false;
+      while (returnedResources < 2) {
+        LocalizerHeartbeatResponse response =
+            rls.heartbeat(createLocalizerStatus(localizerId1));
+        for (ResourceLocalizationSpec resourceSpec : response
+          .getResourceSpecs()) {
+          returnedResources++;
+          Path destinationDirectory =
+              new Path(resourceSpec.getDestinationDirectory().getFile());
+          if (resourceSpec.getResource().getVisibility() ==
+              LocalResourceVisibility.APPLICATION) {
+            appRsrc = true;
+            Assert.assertEquals(userAppCachePath, destinationDirectory
+              .getParent().toUri().toString());
+          } else if (resourceSpec.getResource().getVisibility() == 
+              LocalResourceVisibility.PRIVATE) {
+            privRsrc = true;
+            Assert.assertEquals(userCachePath, destinationDirectory.getParent()
+              .toUri().toString());
+          } else {
+            throw new Exception("Unexpected resource recevied.");
+          }
+        }
+      }
+      // We should receive both the resources (Application and Private)
+      Assert.assertTrue(appRsrc && privRsrc);
+    } finally {
+      if (dispatcher1 != null) {
+        dispatcher1.stop();
+      }
+    }
+  }
+
   private LocalizerStatus createLocalizerStatusForFailedResource(
       String localizerId, LocalResourceRequest req) {
     LocalizerStatus status = createLocalizerStatus(localizerId);