You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/10 17:18:59 UTC

svn commit: r1101502 [2/2] - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/ yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ yarn/ya...

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue May 10 15:18:58 2011
@@ -54,6 +54,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.NetUtils;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -102,16 +104,16 @@ public class ResourceLocalizationService
   public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
 
   private Server server;
-  private InetSocketAddress locAddr;
+  private InetSocketAddress localizationServerAddress;
   private List<Path> logDirs;
   private List<Path> localDirs;
   private List<Path> sysDirs;
   private final ContainerExecutor exec;
   protected final Dispatcher dispatcher;
   private final DeletionService delService;
-  private LocalizerTracker localizers;
-  private final RecordFactory recordFactory =
-    RecordFactoryProvider.getRecordFactory(null);
+  private LocalizerTracker localizerTracker;
+  private RecordFactory recordFactory;
+  private final LocalDirAllocator localDirsSelector;
 
   //private final LocalResourcesTracker publicRsrc;
   private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
@@ -125,6 +127,7 @@ public class ResourceLocalizationService
     this.exec = exec;
     this.dispatcher = dispatcher;
     this.delService = delService;
+    this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -137,6 +140,7 @@ public class ResourceLocalizationService
 
   @Override
   public void init(Configuration conf) {
+    this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     try {
       // TODO queue deletions here, rather than NM init?
       FileContext lfs = getLocalFileContext(conf);
@@ -172,16 +176,16 @@ public class ResourceLocalizationService
     localDirs = Collections.unmodifiableList(localDirs);
     logDirs = Collections.unmodifiableList(logDirs);
     sysDirs = Collections.unmodifiableList(sysDirs);
-    locAddr = NetUtils.createSocketAddr(
+    localizationServerAddress = NetUtils.createSocketAddr(
       conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
-    localizers = new LocalizerTracker();
-    dispatcher.register(LocalizerEventType.class, localizers);
+    localizerTracker = new LocalizerTracker();
+    dispatcher.register(LocalizerEventType.class, localizerTracker);
     super.init(conf);
   }
 
   @Override
   public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
-    return localizers.processHeartbeat(status);
+    return localizerTracker.processHeartbeat(status);
   }
 
   @Override
@@ -202,8 +206,8 @@ public class ResourceLocalizationService
           LocalizerSecurityInfo.class, SecurityInfo.class);
       secretManager = new LocalizerTokenSecretManager();
     }
-    return rpc.getServer(
-        LocalizationProtocol.class, this, locAddr, conf, secretManager);
+    return rpc.getServer(LocalizationProtocol.class, this,
+        localizationServerAddress, conf, secretManager);
   }
 
   @Override
@@ -211,8 +215,8 @@ public class ResourceLocalizationService
     if (server != null) {
       server.close();
     }
-    if (localizers != null) {
-      localizers.stop();
+    if (localizerTracker != null) {
+      localizerTracker.stop();
     }
     super.stop();
   }
@@ -233,7 +237,7 @@ public class ResourceLocalizationService
       if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
           new LocalResourcesTrackerImpl(dispatcher))) {
         LOG.warn("Initializing application " + app + " already present");
-        assert false;
+        assert false; // TODO: FIXME assert doesn't help
       }
       // 1) Signal container init
       dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
@@ -261,6 +265,8 @@ public class ResourceLocalizationService
           appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
         break;
       }
+      // We get a separate event for each visibility, so all the resources
+      // in this event are of the same visibility.
       for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
         tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
       }
@@ -270,24 +276,25 @@ public class ResourceLocalizationService
         ((ContainerLocalizationEvent)event).getContainer();
 
       // Delete the container directories
-      userName = container.getUser();;
+      userName = container.getUser();
       String containerIDStr = container.toString();
       appIDStr =
         ConverterUtils.toString(container.getContainerID().getAppId());
       for (Path localDir : localDirs) {
+
+        // Delete the user-owned container-dir
         Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        Path userdir =
-            new Path(usersdir, userName);
+        Path userdir = new Path(usersdir, userName);
         Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
         Path appDir = new Path(allAppsdir, appIDStr);
-        Path containerDir =
-            new Path(appDir, containerIDStr);
-        delService.delete(userName, containerDir, null);
+        Path containerDir = new Path(appDir, containerIDStr);
+        delService.delete(userName, containerDir, new Path[] {});
 
+        // Delete the nmPrivate container-dir
         Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
         Path appSysDir = new Path(sysDir, appIDStr);
         Path containerSysDir = new Path(appSysDir, containerIDStr);
-        delService.delete(null, containerSysDir, null);
+        delService.delete(null, containerSysDir,  new Path[] {});
       }
 
       dispatcher.getEventHandler().handle(new ContainerEvent(
@@ -298,24 +305,28 @@ public class ResourceLocalizationService
 
       Application application =
           ((ApplicationLocalizationEvent) event).getApplication();
-      if (null == appRsrc.remove(application)) {
+      LocalResourcesTracker appLocalRsrcsTracker = appRsrc.remove(application);
+      if (null == appLocalRsrcsTracker) {
         LOG.warn("Removing uninitialized application " + application);
       }
+      // TODO: What to do with appLocalRsrcsTracker?
 
       // Delete the application directories
       userName = application.getUser();
       appIDStr = application.toString();
       for (Path localDir : localDirs) {
+
+        // Delete the user-owned app-dir
         Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        Path userdir =
-            new Path(usersdir, userName);
+        Path userdir = new Path(usersdir, userName);
         Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
         Path appDir = new Path(allAppsdir, appIDStr);
-        delService.delete(userName, appDir, null);
+        delService.delete(userName, appDir, new Path[] {});
 
+        // Delete the nmPrivate app-dir
         Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
         Path appSysDir = new Path(sysDir, appIDStr);
-        delService.delete(null, appSysDir, null);
+        delService.delete(null, appSysDir, new Path[] {});
       }
 
       // TODO: decrement reference counts of all resources associated with this
@@ -328,109 +339,114 @@ public class ResourceLocalizationService
     }
   }
 
+  /**
+   * Sub-component handling the spawning of {@link ContainerLocalizer}s
+   * 
+   */
   class LocalizerTracker implements EventHandler<LocalizerEvent> {
 
-    private final Map<String,LocalizerRunner> trackers;
+    private final Map<String,LocalizerRunner> localizerRunners;
 
     LocalizerTracker() {
       this(new HashMap<String,LocalizerRunner>());
     }
 
     LocalizerTracker(Map<String,LocalizerRunner> trackers) {
-      this.trackers = trackers;
+      this.localizerRunners = trackers;
     }
 
     public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
       String locId = status.getLocalizerId();
-      synchronized (trackers) {
-        LocalizerRunner localizer = trackers.get(locId);
-        if (null == localizer) {
+      synchronized (localizerRunners) {
+        LocalizerRunner localizerRunner = localizerRunners.get(locId);
+        if (null == localizerRunner) {
           // TODO process resources anyway
+          LOG.info("Unknown localizer with localizerId " + locId
+              + " is sending heartbeat. Ordering it to DIE");
           LocalizerHeartbeatResponse response =
             recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
           response.setLocalizerAction(LocalizerAction.DIE);
           return response;
         }
-        return localizer.update(status.getResources());
+        return localizerRunner.update(status.getResources());
       }
     }
 
     public void stop() {
-      for (LocalizerRunner localizer : trackers.values()) {
+      for (LocalizerRunner localizer : localizerRunners.values()) {
         localizer.interrupt();
       }
     }
 
     @Override
     public void handle(LocalizerEvent event) {
-      synchronized (trackers) {
-        String locId = event.getLocalizerId();
-        LocalizerRunner localizer = trackers.get(locId);
+      synchronized (localizerRunners) {
+        String localizerId = event.getLocalizerId();
+        LocalizerRunner localizerRunner = localizerRunners.get(localizerId);
         switch(event.getType()) {
           case REQUEST_RESOURCE_LOCALIZATION:
             // 0) find running localizer or start new thread
             LocalizerResourceRequestEvent req =
               (LocalizerResourceRequestEvent)event;
-            if (null == localizer) {
-              LOG.info("Created localizer for " + req.getLocalizerId());
-              // TODO: ROUND_ROBIN below.
-              localizer = new LocalizerRunner(req.getContext(),
-                  sysDirs.get(0), req.getLocalizerId(), logDirs.get(0));
-              trackers.put(locId, localizer);
-              localizer.start();
+            if (null == localizerRunner) {
+              LOG.info("Created localizerRunner for " + req.getLocalizerId());
+            localizerRunner =
+                new LocalizerRunner(req.getContext(), req.getLocalizerId());
+              localizerRunners.put(localizerId, localizerRunner);
+              localizerRunner.start();
             }
             // 1) propagate event
-            localizer.addResource(req);
+            localizerRunner.addResource(req);
             break;
           case ABORT_LOCALIZATION:
+            // TODO: Who calls this?
             // 0) find running localizer, interrupt and remove
-            if (null == localizer) {
+            if (null == localizerRunner) {
               return; // ignore; already gone
             }
-            trackers.remove(locId);
-            localizer.interrupt();
+            localizerRunners.remove(localizerId);
+            localizerRunner.interrupt();
             break;
         }
       }
     }
   }
 
+  /**
+   * Runs the {@link ContainerLocalizer} itself in a separate process with
+   * access to user's credentials. One {@link LocalizerRunner} per localizerId.
+   * 
+   */
   class LocalizerRunner extends Thread {
 
     final LocalizerContext context;
     final String localizerId;
-    final Path nmPrivate;
-    final Path rootLogDir;
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     final List<LocalizerResourceRequestEvent> pending;
 
     private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
-    LocalizerRunner(LocalizerContext context, Path nmPrivate,
-        String localizerId, Path logDir) {
-      this(context, nmPrivate, localizerId, logDir,
-          new ArrayList<LocalizerResourceRequestEvent>(),
-          new HashMap<LocalResourceRequest,LocalizerResourceRequestEvent>());
-    }
-
-    LocalizerRunner(LocalizerContext context, Path nmPrivate,
-        String localizerId, Path logDir,
-        List<LocalizerResourceRequestEvent> pending,
-        Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled) {
-      this.nmPrivate = nmPrivate;
+    LocalizerRunner(LocalizerContext context, String localizerId) {
       this.context = context;
       this.localizerId = localizerId;
-      this.rootLogDir = logDir;
-      this.pending = pending;
-      this.scheduled = scheduled;
+      this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+      this.scheduled =
+          new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
     }
 
     public void addResource(LocalizerResourceRequestEvent request) {
+      // TODO: Synchronization
       pending.add(request);
     }
 
-    LocalResource findNextResource() {
+    /**
+     * Find next resource to be given to a spawned localizer.
+     * 
+     * @return the next pending resource, or null if there is none to hand out
+     */
+    private LocalResource findNextResource() {
+      // TODO: Synchronization
       for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
            i.hasNext();) {
         LocalizerResourceRequestEvent evt = i.next();
@@ -457,16 +473,19 @@ public class ResourceLocalizationService
 
     // TODO this sucks. Fix it later
     LocalizerHeartbeatResponse update(
-        List<LocalResourceStatus> stats) {
+        List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
 
-      if (stats.isEmpty()) {
+      // The localizer has just spawned. Start giving it resources for
+      // remote-fetching.
+      if (remoteResourceStatuses.isEmpty()) {
         LocalResource next = findNextResource();
         if (next != null) {
           response.setLocalizerAction(LocalizerAction.LIVE);
           response.addResource(next);
         } else if (pending.isEmpty()) {
+          // TODO: Synchronization
           response.setLocalizerAction(LocalizerAction.DIE);
         } else {
           response.setLocalizerAction(LocalizerAction.LIVE);
@@ -474,7 +493,7 @@ public class ResourceLocalizationService
         return response;
       }
 
-      for (LocalResourceStatus stat : stats) {
+      for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
         try {
@@ -498,6 +517,7 @@ public class ResourceLocalizationService
                     stat.getLocalSize()));
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
+              // TODO: Synchronization
               response.setLocalizerAction(LocalizerAction.DIE);
               break;
             }
@@ -534,20 +554,28 @@ public class ResourceLocalizationService
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       try {
+        // Use LocalDirAllocator to get nmPrivateDir
+        Path nmPrivateCTokensPath =
+            localDirsSelector.getLocalPathForWrite(
+                NM_PRIVATE_DIR
+                    + Path.SEPARATOR
+                    + String.format(ContainerLocalizer.TOKEN_FILE_FMT,
+                        localizerId), getConfig());
         // 0) init queue, etc.
         // 1) write credentials to private dir
         DataOutputStream tokenOut = null;
         try {
           Credentials credentials = context.getCredentials();
-          Path cTokens = new Path(nmPrivate, String.format(
-                ContainerLocalizer.TOKEN_FILE_FMT, localizerId));
           FileContext lfs = getLocalFileContext(getConfig());
-          tokenOut = lfs.create(cTokens, EnumSet.of(CREATE, OVERWRITE));
+          tokenOut =
+              lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
           LOG.info("Writing credentials to the nmPrivate file "
-              + cTokens.toString() + ". Credentials list: ");
-          for (Token<? extends TokenIdentifier> tk :
-              credentials.getAllTokens()) {
-            LOG.info(tk.getService() + " : " + tk.encodeToUrlString());
+              + nmPrivateCTokensPath.toString() + ". Credentials list: ");
+          if (LOG.isDebugEnabled()) {
+            for (Token<? extends TokenIdentifier> tk : credentials
+                .getAllTokens()) {
+              LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
+            }
           }
           credentials.writeTokenStorageToStream(tokenOut);
         } finally {
@@ -556,9 +584,10 @@ public class ResourceLocalizationService
           }
         }
         // 2) exec initApplication and wait
-        exec.startLocalizer(nmPrivate, locAddr, context.getUser(),
+        exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+            context.getUser(),
             ConverterUtils.toString(context.getContainerId().getAppId()),
-            localizerId, rootLogDir, localDirs);
+            localizerId, localDirs);
       } catch (Exception e) {
         // 3) on error, report failure to Container and signal ABORT
         // 3.1) notify resource of failed localization

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Tue May 10 15:18:58 2011
@@ -23,13 +23,20 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 
-public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent {
+public class ContainerLocalizationRequestEvent extends
+    ContainerLocalizationEvent {
 
   private final LocalResourceVisibility vis;
   private final Collection<LocalResourceRequest> reqs;
 
-  public ContainerLocalizationRequestEvent(Container c, Collection<LocalResourceRequest> reqs,
-      LocalResourceVisibility vis) {
+  /**
+   * Event requesting the localization of the reqs all with visibility vis
+   * @param c
+   * @param reqs
+   * @param vis
+   */
+  public ContainerLocalizationRequestEvent(Container c,
+      Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
     super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
     this.vis = vis;
     this.reqs = reqs;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java Tue May 10 15:18:58 2011
@@ -29,7 +29,7 @@ public class ResourceEvent extends Abstr
     this.rsrc = rsrc;
   }
 
-  public LocalResourceRequest getLocalResource() {
+  public LocalResourceRequest getLocalResourceRequest() {
     return rsrc;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Tue May 10 15:18:58 2011
@@ -78,31 +78,41 @@ public class AggregatedLogFormat {
 
   public static class LogValue {
 
-    private final File containerLogDir;
+    private final String[] rootLogDirs;
+    private final ContainerId containerId;
 
-    public LogValue(File containerLogDir) {
-      this.containerLogDir = containerLogDir;
+    public LogValue(String[] rootLogDirs, ContainerId containerId) {
+      this.rootLogDirs = rootLogDirs;
+      this.containerId = containerId;
     }
 
     public void write(DataOutputStream out) throws IOException {
-      if (!this.containerLogDir.isDirectory()) {
-        return; // ContainerDir may have been deleted by the user.
-      }
+      for (String rootLogDir : this.rootLogDirs) {
+        File appLogDir =
+            new File(rootLogDir, ConverterUtils.toString(this.containerId
+                .getAppId()));
+        File containerLogDir =
+            new File(appLogDir, ConverterUtils.toString(this.containerId));
+
+        if (!containerLogDir.isDirectory()) {
+          continue; // ContainerDir may have been deleted by the user.
+        }
 
-      for (File logFile : this.containerLogDir.listFiles()) {
+        for (File logFile : containerLogDir.listFiles()) {
 
-        // Write the logFile Type
-        out.writeUTF(logFile.getName());
+          // Write the logFile Type
+          out.writeUTF(logFile.getName());
 
-        // Write the log length as UTF so that it is printable
-        out.writeUTF(String.valueOf(logFile.length()));
+          // Write the log length as UTF so that it is printable
+          out.writeUTF(String.valueOf(logFile.length()));
 
-        // Write the log itself
-        FileInputStream in = new FileInputStream(logFile);
-        byte[] buf = new byte[65535];
-        int len = 0;
-        while ((len = in.read(buf)) != -1) {
-          out.write(buf, 0, len);
+          // Write the log itself
+          FileInputStream in = new FileInputStream(logFile);
+          byte[] buf = new byte[65535];
+          int len = 0;
+          while ((len = in.read(buf)) != -1) {
+            out.write(buf, 0, len);
+          }
         }
       }
     }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue May 10 15:18:58 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,12 +43,12 @@ public class AppLogAggregatorImpl implem
       .getLog(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
 
-  private final ApplicationId applicationId;
+  private final String applicationId;
   private boolean logAggregationDisabled = false;
   private final Configuration conf;
   private final DeletionService delService;
   private final UserGroupInformation userUgi;
-  private final File localAppLogDir;
+  private final String[] rootLogDirs;
   private final Path remoteNodeLogFileForApp;
   private final ContainerLogsRetentionPolicy retentionPolicy;
 
@@ -62,22 +60,18 @@ public class AppLogAggregatorImpl implem
 
   public AppLogAggregatorImpl(DeletionService deletionService,
       Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
-      File localAppLogDir, Path remoteNodeLogFileForApp,
+      String[] localRootLogDirs, Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy) {
     this.conf = conf;
     this.delService = deletionService;
-    this.applicationId = appId;
+    this.applicationId = ConverterUtils.toString(appId);
     this.userUgi = userUgi;
-    this.localAppLogDir = localAppLogDir;
+    this.rootLogDirs = localRootLogDirs;
     this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
     this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
   }
 
-  private File getLocalContainerLogDir(ContainerId containerId) {
-    return new File(this.localAppLogDir, ConverterUtils.toString(containerId));
-  }
-
   private void uploadLogsForContainer(ContainerId containerId) {
 
     if (this.logAggregationDisabled) {
@@ -99,11 +93,9 @@ public class AppLogAggregatorImpl implem
       }
     }
 
-    File containerLogDir = getLocalContainerLogDir(containerId);
-    LOG.info("Uploading logs for container " + containerId + " from "
-        + containerLogDir);
+    LOG.info("Uploading logs for container " + containerId);
     LogKey logKey = new LogKey(containerId);
-    LogValue logValue = new LogValue(containerLogDir);
+    LogValue logValue = new LogValue(this.rootLogDirs, containerId);
     try {
       this.writer.append(logKey, logValue);
     } catch (IOException e) {
@@ -135,9 +127,12 @@ public class AppLogAggregatorImpl implem
       uploadLogsForContainer(containerId);
     }
 
-    // Remove the local app-log-dir
-    this.delService.delete(this.userUgi.getShortUserName(), new Path(
-        this.localAppLogDir.getAbsolutePath()), new Path[] {});
+    // Remove the local app-log-dirs
+    for (String rootLogDir : this.rootLogDirs) {
+      File localAppLogDir = new File(rootLogDir, this.applicationId);
+      this.delService.delete(this.userUgi.getShortUserName(), new Path(
+          localAppLogDir.getAbsolutePath()), new Path[] {});
+    }
 
     if (this.writer != null) {
       this.writer.closeWriter();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue May 10 15:18:58 2011
@@ -21,9 +21,6 @@ package org.apache.hadoop.yarn.server.no
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -33,10 +30,8 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -62,7 +57,7 @@ public class LogAggregationService exten
 
   private final DeletionService deletionService;
 
-  private File localRootLogDir;
+  private String[] localRootLogDirs;
   Path remoteRootLogDir;
   private String nodeFile;
 
@@ -86,8 +81,8 @@ public class LogAggregationService exten
   }
 
   public synchronized void init(Configuration conf) {
-    this.localRootLogDir =
-        new File(conf.get(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR));
+    this.localRootLogDirs =
+        conf.getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
     this.remoteRootLogDir =
         new Path(conf.get(NMConfig.REMOTE_USER_LOG_DIR,
             NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
@@ -119,10 +114,6 @@ public class LogAggregationService exten
     return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
   }
 
-  File getLocalAppLogDir(ApplicationId appId) {
-    return new File(this.localRootLogDir, ConverterUtils.toString(appId));
-  }
-
   @Override
   public synchronized void stop() {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
@@ -148,7 +139,7 @@ public class LogAggregationService exten
     // New application
     AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
-            userUgi, getLocalAppLogDir(appId),
+            userUgi, this.localRootLogDirs,
             getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnException("Duplicate initApp for " + appId);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Tue May 10 15:18:58 2011
@@ -24,14 +24,19 @@ import static org.apache.hadoop.yarn.ser
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -52,12 +57,14 @@ public class ContainerLogsPage extends N
       NMWebParams {
 
     private final Configuration conf;
+    private final LocalDirAllocator logsSelector;
     private final Context nmContext;
     private final RecordFactory recordFactory;
 
     @Inject
     public ContainersLogsBlock(Configuration conf, Context context) {
       this.conf = conf;
+      this.logsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
       this.nmContext = context;
       this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     }
@@ -81,10 +88,19 @@ public class ContainerLogsPage extends N
           container.getContainerState())) {
 
         if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
-          // TODO: Get the following from logs' owning component.
-          File containerLogsDir =
-              getContainerLogDir(this.conf, containerId);
-          File logFile = new File(containerLogsDir, $(CONTAINER_LOG_TYPE));
+          File logFile = null;
+          try {
+            logFile =
+                new File(this.logsSelector
+                    .getLocalPathToRead(
+                        ConverterUtils.toString(containerId.getAppId())
+                            + Path.SEPARATOR + $(CONTAINER_ID)
+                            + Path.SEPARATOR
+                            + $(CONTAINER_LOG_TYPE), this.conf).toUri()
+                    .getPath());
+          } catch (Exception e) {
+            div.h1("Cannot find this log on the local disk.")._();
+          }
           div.h1(logFile.getName());
           long start =
               $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
@@ -130,19 +146,20 @@ public class ContainerLogsPage extends N
           div._();
         } else {
           // Just print out the log-types
-          File containerLogsDir =
-              getContainerLogDir(this.conf, containerId);
-          // TODO: No nested dir structure. Fix MR userlogs.
-          for (File logFile : containerLogsDir.listFiles()) {
-            div
-              .p()
-                .a(
-                    url($(NM_HTTP_URL), "yarn", "containerlogs",
-                        $(CONTAINER_ID),
-                        logFile.getName(), "?start=-4076"),
-                    logFile.getName() + " : Total file length is " 
-                    + logFile.length() + " bytes.")
-              ._();
+          List<File> containerLogsDirs =
+              getContainerLogDirs(this.conf, containerId);
+          for (File containerLogsDir : containerLogsDirs) {
+            for (File logFile : containerLogsDir.listFiles()) {
+              div
+                  .p()
+                  .a(
+                      url($(NM_HTTP_URL), "yarn", "containerlogs",
+                          $(CONTAINER_ID),
+                          logFile.getName(), "?start=-4076"),
+                      logFile.getName() + " : Total file length is "
+                          + logFile.length() + " bytes.")
+                  ._();
+            }
           }
           div._();
         }
@@ -151,16 +168,18 @@ public class ContainerLogsPage extends N
       }
     }
 
-    static File
-        getContainerLogDir(Configuration conf, ContainerId containerId) {
+    static List<File>
+        getContainerLogDirs(Configuration conf, ContainerId containerId) {
       String[] logDirs =
           conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
-      File logDir = new File(logDirs[0]); // TODO: In case of ROUND_ROBIN
-      String appIdStr = ConverterUtils.toString(containerId.getAppId());
-      File appLogDir = new File(logDir, appIdStr);
-      String containerIdStr = ConverterUtils.toString(containerId);
-      File containerLogDir = new File(appLogDir, containerIdStr);
-      return containerLogDir;
+      List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
+      for (String logDir : logDirs) {
+        String appIdStr = ConverterUtils.toString(containerId.getAppId());
+        File appLogDir = new File(logDir, appIdStr);
+        String containerIdStr = ConverterUtils.toString(containerId);
+        containerLogDirs.add(new File(appLogDir, containerIdStr));
+      }
+      return containerLogDirs;
     }
     
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Tue May 10 15:18:58 2011
@@ -96,9 +96,11 @@ public class TestDeletionService {
     System.out.println("SEED: " + seed);
     List<Path> dirs = buildDirs(r, base, 20);
     createDirs(new Path("."), dirs);
-    DeletionService del =
-      new DeletionService(new FakeDefaultContainerExecutor());
-    del.init(new Configuration());
+    FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+    Configuration conf = new Configuration();
+    exec.setConf(conf);
+    DeletionService del = new DeletionService(exec);
+    del.init(conf);
     del.start();
     try {
       for (Path p : dirs) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Tue May 10 15:18:58 2011
@@ -77,6 +77,7 @@ public class TestEventFlow {
     conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
 
     ContainerExecutor exec = new DefaultContainerExecutor();
+    exec.setConf(conf);
     DeletionService del = new DeletionService(exec);
     Dispatcher dispatcher = new AsyncDispatcher();
     NodeHealthCheckerService healthChecker = null;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Tue May 10 15:18:58 2011
@@ -90,7 +90,7 @@ public abstract class BaseContainerManag
 
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext();
-  protected ContainerExecutor exec = new DefaultContainerExecutor();
+  protected ContainerExecutor exec;
   protected DeletionService delSrvc;
   protected String user = "nobody";
 
@@ -111,7 +111,9 @@ public abstract class BaseContainerManag
   protected ContainerManagerImpl containerManager = null;
 
   protected ContainerExecutor createContainerExecutor() {
-    return new DefaultContainerExecutor();
+    DefaultContainerExecutor exec = new DefaultContainerExecutor();
+    exec.setConf(conf);
+    return exec;
   }
 
   @Before

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Tue May 10 15:18:58 2011
@@ -90,15 +90,13 @@ public class TestContainerLocalizer {
     final String appId = "app_RM_0";
     final String cId = "container_0";
     final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
-    final Path logDir = lfs.makeQualified(new Path(basedir, "logs"));
     final List<Path> localDirs = new ArrayList<Path>();
     for (int i = 0; i < 4; ++i) {
       localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
     }
     RecordFactory mockRF = getMockLocalizerRecordFactory();
     ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
-        appId, cId, logDir, localDirs,
-        new HashMap<LocalResource,Future<Path>>(), mockRF);
+        appId, cId, localDirs, mockRF);
     ContainerLocalizer localizer = spy(concreteLoc);
 
     // return credential stream instead of opening local file
@@ -166,29 +164,25 @@ public class TestContainerLocalizer {
     // run localization
     assertEquals(0, localizer.runLocalization(nmAddr));
 
-    // verify created cache, application dirs
+    // verify created cache
     for (Path p : localDirs) {
       Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
       Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
       // $x/usercache/$user/filecache
-      verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(true));
+      verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
       Path appDir =
         new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
       // $x/usercache/$user/appcache/$appId/filecache
       Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
-      verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(true));
+      verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
       // $x/usercache/$user/appcache/$appId/output
       Path appOutput = new Path(appDir, ContainerLocalizer.OUTPUTDIR);
-      verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(true));
+      verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(false));
     }
 
     // verify tokens read at expected location
     verify(spylfs).open(tokenPath);
 
-    // verify log dir creation
-    verify(spylfs).mkdir(eq(new Path(logDir, appId)),
-        isA(FsPermission.class), anyBoolean());
-
     // verify downloaded resources reported to NM
     verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
     verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue May 10 15:18:58 2011
@@ -99,7 +99,8 @@ public class TestLogAggregationService e
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
-    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    File app1LogDir =
+        new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogAggregatorAppStartedEvent(
@@ -141,7 +142,8 @@ public class TestLogAggregationService e
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
-    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    File app1LogDir =
+      new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogAggregatorAppStartedEvent(
@@ -173,7 +175,8 @@ public class TestLogAggregationService e
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
-    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    File app1LogDir =
+      new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogAggregatorAppStartedEvent(
@@ -189,7 +192,8 @@ public class TestLogAggregationService e
 
     ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
 
-    File app2LogDir = logAggregationService.getLocalAppLogDir(application2);
+    File app2LogDir =
+      new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
     logAggregationService.handle(new LogAggregatorAppStartedEvent(
         application2, this.user, null,
@@ -209,7 +213,8 @@ public class TestLogAggregationService e
 
     ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
 
-    File app3LogDir = logAggregationService.getLocalAppLogDir(application3);
+    File app3LogDir =
+      new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
     logAggregationService.handle(new LogAggregatorAppStartedEvent(
         application3, this.user, null,

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue May 10 15:18:58 2011
@@ -122,8 +122,8 @@ public class TestNMWebServer {
         throws IOException {
     // ContainerLogDir should be created
     File containerLogDir =
-        ContainerLogsPage.ContainersLogsBlock.getContainerLogDir(conf,
-            containerId);
+        ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(conf,
+            containerId).get(0);
     containerLogDir.mkdirs();
     for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
       Writer writer = new FileWriter(new File(containerLogDir, fileType));