You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/28 08:45:07 UTC

svn commit: r1190174 [2/3] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/or...

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Oct 28 06:45:04 2011
@@ -29,8 +29,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -48,7 +50,6 @@ import org.apache.hadoop.yarn.state.Mult
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * The state machine for the representation of an Application
@@ -63,6 +64,7 @@ public class ApplicationImpl implements 
   final ApplicationACLsManager aclsManager;
   private final ReadLock readLock;
   private final WriteLock writeLock;
+  private final Context context;
 
   private static final Log LOG = LogFactory.getLog(Application.class);
 
@@ -71,12 +73,13 @@ public class ApplicationImpl implements 
 
   public ApplicationImpl(Dispatcher dispatcher,
       ApplicationACLsManager aclsManager, String user, ApplicationId appId,
-      Credentials credentials) {
+      Credentials credentials, Context context) {
     this.dispatcher = dispatcher;
     this.user = user.toString();
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = aclsManager;
+    this.context = context;
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -173,7 +176,13 @@ public class ApplicationImpl implements 
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
                new AppCompletelyDoneTransition())
-
+           
+           // Transitions from FINISHED state
+           .addTransition(ApplicationState.FINISHED,
+               ApplicationState.FINISHED,
+               ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
+               new AppLogsAggregatedTransition())
+               
            // create the topology tables
            .installTopology();
 
@@ -239,11 +248,15 @@ public class ApplicationImpl implements 
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
 
+      Map<ApplicationAccessType, String> appAcls =
+          app.getContainers().values().iterator().next().getLaunchContext()
+              .getApplicationACLs();
+
       // Inform the logAggregator
       app.dispatcher.getEventHandler().handle(
-            new LogAggregatorAppStartedEvent(app.appId, app.user,
-                app.credentials,
-                ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
+          new LogAggregatorAppStartedEvent(app.appId, app.user,
+              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+              appAcls)); 
 
       // Start all the containers waiting for ApplicationInit
       for (Container container : app.containers.values()) {
@@ -339,13 +352,20 @@ public class ApplicationImpl implements 
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
 
-      app.aclsManager.removeApplication(app.getAppId());
-
       // Inform the logService
       app.dispatcher.getEventHandler().handle(
           new LogAggregatorAppFinishedEvent(app.appId));
 
-      // TODO: Also make logService write the acls to the aggregated file.
+    }
+  }
+
+  static class AppLogsAggregatedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationId appId = event.getApplicationID();
+      app.context.getApplications().remove(appId);
+      app.aclsManager.removeApplication(appId);
     }
   }
 
@@ -377,6 +397,6 @@ public class ApplicationImpl implements 
 
   @Override
   public String toString() {
-    return ConverterUtils.toString(appId);
+    return appId.toString();
   }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Fri Oct 28 06:45:04 2011
@@ -119,11 +119,11 @@ public class ContainerLaunch implements 
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.
       List<String> newCmds = new ArrayList<String>(command.size());
-      String appIdStr = app.toString();
+      String appIdStr = app.getAppId().toString();
       Path containerLogDir =
-          this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
-              + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf, 
-              false);
+          this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
+              .getRelativeContainerLogDir(appIdStr, containerIdStr),
+              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
       for (String str : command) {
         // TODO: Should we instead work via symlinks without this grammar?
         newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -384,6 +384,11 @@ public class ContainerLaunch implements 
     return processId;
   }
 
+  public static String getRelativeContainerLogDir(String appIdStr,
+      String containerIdStr) {
+    return appIdStr + Path.SEPARATOR + containerIdStr;
+  }
+
   private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
     return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
         + Path.SEPARATOR;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Fri Oct 28 06:45:04 2011
@@ -25,10 +25,16 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.Writer;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +47,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.file.tfile.TFile;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -48,32 +56,50 @@ import org.apache.hadoop.yarn.util.Conve
 public class AggregatedLogFormat {
 
   static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
-
+  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
+  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
+  private static final LogKey VERSION_KEY = new LogKey("VERSION");
+  private static final Map<String, LogKey> RESERVED_KEYS;
+  //Maybe write out the retention policy.
+  //Maybe write out a list of containerLogs skipped by the retention policy.
+  private static final int VERSION = 1;
+
+  static {
+    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
+    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
+    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
+    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
+  }
+  
   public static class LogKey implements Writable {
 
-    private String containerId;
+    private String keyString;
 
     public LogKey() {
 
     }
 
     public LogKey(ContainerId containerId) {
-      this.containerId = ConverterUtils.toString(containerId);
+      this.keyString = containerId.toString();
     }
 
+    public LogKey(String keyString) {
+      this.keyString = keyString;
+    }
+    
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeUTF(this.containerId);
+      out.writeUTF(this.keyString);
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      this.containerId = in.readUTF();
+      this.keyString = in.readUTF();
     }
 
     @Override
     public String toString() {
-      return this.containerId;
+      return this.keyString;
     }
   }
 
@@ -81,6 +107,8 @@ public class AggregatedLogFormat {
 
     private final String[] rootLogDirs;
     private final ContainerId containerId;
+    // TODO Maybe add a version string here. Instead of changing the version of
+    // the entire k-v format
 
     public LogValue(String[] rootLogDirs, ContainerId containerId) {
       this.rootLogDirs = rootLogDirs;
@@ -141,7 +169,8 @@ public class AggregatedLogFormat {
               public FSDataOutputStream run() throws Exception {
                 return FileContext.getFileContext(conf).create(
                     remoteAppLogFile,
-                    EnumSet.of(CreateFlag.CREATE), new Options.CreateOpts[] {});
+                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+                    new Options.CreateOpts[] {});
               }
             });
       } catch (InterruptedException e) {
@@ -154,6 +183,40 @@ public class AggregatedLogFormat {
           new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
               YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
               YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
+      //Write the version string
+      writeVersion();
+    }
+
+    private void writeVersion() throws IOException {
+      DataOutputStream out = this.writer.prepareAppendKey(-1);
+      VERSION_KEY.write(out);
+      out.close();
+      out = this.writer.prepareAppendValue(-1);
+      out.writeInt(VERSION);
+      out.close();
+      this.fsDataOStream.hflush();
+    }
+
+    public void writeApplicationOwner(String user) throws IOException {
+      DataOutputStream out = this.writer.prepareAppendKey(-1);
+      APPLICATION_OWNER_KEY.write(out);
+      out.close();
+      out = this.writer.prepareAppendValue(-1);
+      out.writeUTF(user);
+      out.close();
+    }
+
+    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
+        throws IOException {
+      DataOutputStream out = this.writer.prepareAppendKey(-1);
+      APPLICATION_ACL_KEY.write(out);
+      out.close();
+      out = this.writer.prepareAppendValue(-1);
+      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
+        out.writeUTF(entry.getKey().toString());
+        out.writeUTF(entry.getValue());
+      }
+      out.close();
     }
 
     public void append(LogKey logKey, LogValue logValue) throws IOException {
@@ -184,12 +247,13 @@ public class AggregatedLogFormat {
 
     private final FSDataInputStream fsDataIStream;
     private final TFile.Reader.Scanner scanner;
+    private final TFile.Reader reader;
 
     public LogReader(Configuration conf, Path remoteAppLogFile)
         throws IOException {
       FileContext fileContext = FileContext.getFileContext(conf);
       this.fsDataIStream = fileContext.open(remoteAppLogFile);
-      TFile.Reader reader =
+      reader =
           new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
               remoteAppLogFile).getLen(), conf);
       this.scanner = reader.createScanner();
@@ -198,6 +262,69 @@ public class AggregatedLogFormat {
     private boolean atBeginning = true;
 
     /**
+     * Returns the owner of the application.
+     * 
+     * @return the application owner.
+     * @throws IOException
+     */
+    public String getApplicationOwner() throws IOException {
+      TFile.Reader.Scanner ownerScanner = reader.createScanner();
+      LogKey key = new LogKey();
+      while (!ownerScanner.atEnd()) {
+        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
+        key.readFields(entry.getKeyStream());
+        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
+          DataInputStream valueStream = entry.getValueStream();
+          return valueStream.readUTF();
+        }
+        ownerScanner.advance();
+      }
+      return null;
+    }
+
+    /**
+     * Returns ACLs for the application. An empty map is returned if no ACLs are
+     * found.
+     * 
+     * @return a map of the Application ACLs.
+     * @throws IOException
+     */
+    public Map<ApplicationAccessType, String> getApplicationAcls()
+        throws IOException {
+      // TODO Seek directly to the key once a comparator is specified.
+      TFile.Reader.Scanner aclScanner = reader.createScanner();
+      LogKey key = new LogKey();
+      Map<ApplicationAccessType, String> acls =
+          new HashMap<ApplicationAccessType, String>();
+      while (!aclScanner.atEnd()) {
+        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
+        key.readFields(entry.getKeyStream());
+        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
+          DataInputStream valueStream = entry.getValueStream();
+          while (true) {
+            String appAccessOp = null;
+            String aclString = null;
+            try {
+              appAccessOp = valueStream.readUTF();
+            } catch (EOFException e) {
+              // Valid end of stream.
+              break;
+            }
+            try {
+              aclString = valueStream.readUTF();
+            } catch (EOFException e) {
+              throw new YarnException("Error reading ACLs", e);
+            }
+            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
+          }
+
+        }
+        aclScanner.advance();
+      }
+      return acls;
+    }
+    
+    /**
      * Read the next key and return the value-stream.
      * 
      * @param key
@@ -215,10 +342,99 @@ public class AggregatedLogFormat {
       }
       TFile.Reader.Scanner.Entry entry = this.scanner.entry();
       key.readFields(entry.getKeyStream());
+      // Skip META keys
+      if (RESERVED_KEYS.containsKey(key.toString())) {
+        return next(key);
+      }
       DataInputStream valueStream = entry.getValueStream();
       return valueStream;
     }
 
+    
+    //TODO  Change Log format and interfaces to be containerId specific.
+    // Avoid returning completeValueStreams.
+//    public List<String> getTypesForContainer(DataInputStream valueStream){}
+//    
+//    /**
+//     * @param valueStream
+//     *          The Log stream for the container.
+//     * @param fileType
+//     *          the log type required.
+//     * @return An InputStreamReader for the required log type or null if the
+//     *         type is not found.
+//     * @throws IOException
+//     */
+//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
+//        String fileType) throws IOException {
+//      valueStream.reset();
+//      try {
+//        while (true) {
+//          String ft = valueStream.readUTF();
+//          String fileLengthStr = valueStream.readUTF();
+//          long fileLength = Long.parseLong(fileLengthStr);
+//          if (ft.equals(fileType)) {
+//            BoundedInputStream bis =
+//                new BoundedInputStream(valueStream, fileLength);
+//            return new InputStreamReader(bis);
+//          } else {
+//            long totalSkipped = 0;
+//            long currSkipped = 0;
+//            while (currSkipped != -1 && totalSkipped < fileLength) {
+//              currSkipped = valueStream.skip(fileLength - totalSkipped);
+//              totalSkipped += currSkipped;
+//            }
+//            // TODO Verify skip behaviour.
+//            if (currSkipped == -1) {
+//              return null;
+//            }
+//          }
+//        }
+//      } catch (EOFException e) {
+//        return null;
+//      }
+//    }
+
+    /**
+     * Writes all logs for a single container to the provided writer.
+     * @param valueStream
+     * @param writer
+     * @throws IOException
+     */
+    public static void readAcontainerLogs(DataInputStream valueStream,
+        Writer writer) throws IOException {
+      int bufferSize = 65536;
+      char[] cbuf = new char[bufferSize];
+      String fileType;
+      String fileLengthStr;
+      long fileLength;
+
+      while (true) {
+        try {
+          fileType = valueStream.readUTF();
+        } catch (EOFException e) {
+          // EndOfFile
+          return;
+        }
+        fileLengthStr = valueStream.readUTF();
+        fileLength = Long.parseLong(fileLengthStr);
+        writer.write("\n\nLogType:");
+        writer.write(fileType);
+        writer.write("\nLogLength:");
+        writer.write(fileLengthStr);
+        writer.write("\nLog Contents:\n");
+        // ByteLevel
+        BoundedInputStream bis =
+            new BoundedInputStream(valueStream, fileLength);
+        InputStreamReader reader = new InputStreamReader(bis);
+        int currentRead = 0;
+        int totalRead = 0;
+        while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
+          writer.write(cbuf);
+          totalRead += currentRead;
+        }
+      }
+    }
+
     /**
      * Keep calling this till you get a {@link EOFException} for getting logs of
      * all types for a single container.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Fri Oct 28 06:45:04 2011
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,11 +28,16 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogWriter;
@@ -42,7 +48,10 @@ public class AppLogAggregatorImpl implem
   private static final Log LOG = LogFactory
       .getLog(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
+  private static final String TMP_FILE_SUFFIX = ".tmp";
 
+  private final Dispatcher dispatcher;
+  private final ApplicationId appId;
   private final String applicationId;
   private boolean logAggregationDisabled = false;
   private final Configuration conf;
@@ -50,26 +59,34 @@ public class AppLogAggregatorImpl implem
   private final UserGroupInformation userUgi;
   private final String[] rootLogDirs;
   private final Path remoteNodeLogFileForApp;
+  private final Path remoteNodeTmpLogFileForApp;
   private final ContainerLogsRetentionPolicy retentionPolicy;
 
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+  private final Map<ApplicationAccessType, String> appAcls;
 
   private LogWriter writer = null;
 
-  public AppLogAggregatorImpl(DeletionService deletionService,
-      Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
-      String[] localRootLogDirs, Path remoteNodeLogFileForApp,
-      ContainerLogsRetentionPolicy retentionPolicy) {
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf, ApplicationId appId,
+      UserGroupInformation userUgi, String[] localRootLogDirs,
+      Path remoteNodeLogFileForApp,
+      ContainerLogsRetentionPolicy retentionPolicy,
+      Map<ApplicationAccessType, String> appAcls) {
+    this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
+    this.appId = appId;
     this.applicationId = ConverterUtils.toString(appId);
     this.userUgi = userUgi;
     this.rootLogDirs = localRootLogDirs;
     this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
+    this.appAcls = appAcls;
   }
 
   private void uploadLogsForContainer(ContainerId containerId) {
@@ -80,11 +97,15 @@ public class AppLogAggregatorImpl implem
 
     // Lazy creation of the writer
     if (this.writer == null) {
-      LOG.info("Starting aggregate log-file for app " + this.applicationId);
+      LOG.info("Starting aggregate log-file for app " + this.applicationId
+          + " at " + this.remoteNodeTmpLogFileForApp);
       try {
         this.writer =
-            new LogWriter(this.conf, this.remoteNodeLogFileForApp,
+            new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
                 this.userUgi);
+        //Write ACLs once when and if the writer is created.
+        this.writer.writeApplicationACLs(appAcls);
+        this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
       } catch (IOException e) {
         LOG.error("Cannot create writer for app " + this.applicationId
             + ". Disabling log-aggregation for this app.", e);
@@ -105,8 +126,8 @@ public class AppLogAggregatorImpl implem
   }
 
   @Override
-  public void run() {
-
+  @SuppressWarnings("unchecked")
+  public void run() {    
     ContainerId containerId;
 
     while (!this.appFinishing.get()) {
@@ -141,10 +162,33 @@ public class AppLogAggregatorImpl implem
       this.writer.closeWriter();
       LOG.info("Finished aggregate log-file for app " + this.applicationId);
     }
-
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem remoteFS = FileSystem.get(conf);
+          remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Failed to move temporary log file to final location: ["
+          + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp
+          + "]", e);
+    }
+    
+    this.dispatcher.getEventHandler().handle(
+        new ApplicationEvent(this.appId,
+            ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
+        
     this.appAggregationFinished.set(true);
   }
 
+  private Path getRemoteNodeTmpLogFileForApp() {
+    return new Path(remoteNodeLogFileForApp.getParent(),
+        (remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX));
+  }
+
   private boolean shouldUploadLogs(ContainerId containerId,
       boolean wasContainerSuccessful) {
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Oct 28 06:45:04 2011
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -31,20 +31,26 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -54,20 +60,43 @@ public class LogAggregationService exten
   private static final Log LOG = LogFactory
       .getLog(LogAggregationService.class);
 
+  /*
+   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+   * Group to which NMOwner belongs> App dirs will be created as 750,
+   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+   * access / modify the files.
+   * <NMGroup> should obviously be a limited access group.
+   */
+  /**
+   * Permissions for the top level directory under which app directories will be
+   * created.
+   */
+  private static final FsPermission TLDIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 01777);
+  /**
+   * Permissions for the Application directory.
+   */
+  private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 0750);
+
   private final Context context;
   private final DeletionService deletionService;
+  private final Dispatcher dispatcher;
 
   private String[] localRootLogDirs;
   Path remoteRootLogDir;
-  private String nodeFile;
+  String remoteRootLogDirSuffix;
+  private NodeId nodeId;
+  private boolean isLogAggregationEnabled = false;
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
   private final ExecutorService threadPool;
 
-  public LogAggregationService(Context context,
+  public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService) {
     super(LogAggregationService.class.getName());
+    this.dispatcher = dispatcher;
     this.context = context;
     this.deletionService = deletionService;
     this.appLogAggregators =
@@ -80,10 +109,17 @@ public class LogAggregationService exten
 
   public synchronized void init(Configuration conf) {
     this.localRootLogDirs =
-        conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
+        conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
+            YarnConfiguration.DEFAULT_NM_LOG_DIRS);
     this.remoteRootLogDir =
         new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    this.remoteRootLogDirSuffix =
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+    this.isLogAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+
     super.init(conf);
   }
 
@@ -91,37 +127,231 @@ public class LogAggregationService exten
   public synchronized void start() {
     // NodeId is only available during start, the following cannot be moved
     // anywhere else.
-    this.nodeFile = this.context.getNodeId().toString();
+    this.nodeId = this.context.getNodeId();
+    verifyAndCreateRemoteLogDir(getConfig());
     super.start();
   }
+  
+  @Override
+  public synchronized void stop() {
+    LOG.info(this.getName() + " waiting for pending aggregation during exit");
+    for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
+      appLogAggregator.join();
+    }
+    super.stop();
+  }
 
-  Path getRemoteNodeLogFileForApp(ApplicationId appId) {
-    return getRemoteNodeLogFileForApp(this.remoteRootLogDir, appId,
-        this.nodeFile);
+  /**
+   * Constructs the full filename for an application's log file per node.
+   * @param remoteRootLogDir
+   * @param appId
+   * @param user
+   * @param nodeId
+   * @param suffix
+   * @return the remote log file.
+   */
+  public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
+      ApplicationId appId, String user, NodeId nodeId, String suffix) {
+    return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+        getNodeString(nodeId));
+  }
+
+  /**
+   * Gets the remote app log dir.
+   * @param remoteRootLogDir
+   * @param appId
+   * @param user
+   * @param suffix
+   * @return the remote application specific log dir.
+   */
+  public static Path getRemoteAppLogDir(Path remoteRootLogDir,
+      ApplicationId appId, String user, String suffix) {
+    return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
+        appId.toString());
+  }
+
+  /**
+   * Gets the remote suffixed log dir for the user.
+   * @param remoteRootLogDir
+   * @param user
+   * @param suffix
+   * @return the remote suffixed log dir.
+   */
+  private static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
+      String user, String suffix) {
+    if (suffix == null || suffix.isEmpty()) {
+      return getRemoteLogUserDir(remoteRootLogDir, user);
+    }
+    // TODO Maybe support suffix to be more than a single file.
+    return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
   }
 
-  static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
-      ApplicationId appId, String nodeFile) {
-    return new Path(getRemoteAppLogDir(remoteRootLogDir, appId),
-        nodeFile);
+  // TODO Add a utility method to list available log files. Ignore the
+  // temporary ones.
+  
+  /**
+   * Gets the remote log user dir.
+   * @param remoteRootLogDir
+   * @param user
+   * @return the remote per user log dir.
+   */
+  private static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
+    return new Path(remoteRootLogDir, user);
+  }
+
+  /**
+   * Returns the suffix component of the log dir.
+   * @param conf
+   * @return the suffix which will be appended to the user log dir.
+   */
+  public static String getRemoteNodeLogDirSuffix(Configuration conf) {
+    return conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+  }
+
+  
+  /**
+   * Converts a nodeId to a form used in the app log file name.
+   * @param nodeId
+   * @return the node string to be used to construct the file name.
+   */
+  private static String getNodeString(NodeId nodeId) {
+    return nodeId.toString().replace(":", "_");
+  }
+
+  
+
+
+  
+  private void verifyAndCreateRemoteLogDir(Configuration conf) {
+    // Checking the existance of the TLD
+    FileSystem remoteFS = null;
+    try {
+      remoteFS = FileSystem.get(conf);
+    } catch (IOException e) {
+      throw new YarnException("Unable to get Remote FileSystem isntance", e);
+    }
+    boolean remoteExists = false;
+    try {
+      remoteExists = remoteFS.exists(this.remoteRootLogDir);
+    } catch (IOException e) {
+      throw new YarnException("Failed to check for existance of remoteLogDir ["
+          + this.remoteRootLogDir + "]");
+    }
+    if (remoteExists) {
+      try {
+        FsPermission perms =
+            remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
+        if (!perms.equals(TLDIR_PERMISSIONS)) {
+          LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+              + "] already exist, but with incorrect permissions. "
+              + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+              + "]." + " The cluster may have problems with multiple users.");
+        }
+      } catch (IOException e) {
+        throw new YarnException(
+            "Failed while attempting to check permissions for dir ["
+                + this.remoteRootLogDir + "]");
+      }
+    } else {
+      LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+          + "] does not exist. Attempting to create it.");
+      try {
+        Path qualified =
+            this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+      } catch (IOException e) {
+        throw new YarnException("Failed to create remoteLogDir ["
+            + this.remoteRootLogDir + "]", e);
+      }
+    }
+
   }
 
-  static Path getRemoteAppLogDir(Path remoteRootLogDir,
-      ApplicationId appId) {
-    return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
+  Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
+    return LogAggregationService.getRemoteNodeLogFileForApp(
+        this.remoteRootLogDir, appId, user, this.nodeId,
+        this.remoteRootLogDirSuffix);
   }
 
-  @Override
-  public synchronized void stop() {
-    LOG.info(this.getName() + " waiting for pending aggregation during exit");
-    for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
-      appLogAggregator.join();
+  private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    fs.mkdirs(path, new FsPermission(fsPerm));
+    fs.setPermission(path, new FsPermission(fsPerm));
+  }
+
+  private void createAppDir(final String user, final ApplicationId appId,
+      UserGroupInformation userUgi) {
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          // TODO: Reuse FS for user?
+          FileSystem remoteFS = null;
+          Path userDir = null;
+          Path suffixDir = null;
+          Path appDir = null;
+          try {
+            remoteFS = FileSystem.get(getConfig());
+          } catch (IOException e) {
+            LOG.error("Failed to get remote FileSystem while processing app "
+                + appId, e);
+            throw e;
+          }
+          try {
+            userDir =
+                getRemoteLogUserDir(
+                    LogAggregationService.this.remoteRootLogDir, user);
+            userDir =
+                userDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+            createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+          } catch (IOException e) {
+            LOG.error("Failed to create user dir [" + userDir
+                + "] while processing app " + appId);
+            throw e;
+          }
+          try {
+            suffixDir =
+                getRemoteLogSuffixedDir(
+                    LogAggregationService.this.remoteRootLogDir, user,
+                    LogAggregationService.this.remoteRootLogDirSuffix);
+            suffixDir =
+                suffixDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+            createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+          } catch (IOException e) {
+            LOG.error("Failed to create suffixed user dir [" + suffixDir
+                + "] while processing app " + appId);
+            throw e;
+          }
+          try {
+            appDir =
+                getRemoteAppLogDir(LogAggregationService.this.remoteRootLogDir,
+                    appId, user,
+                    LogAggregationService.this.remoteRootLogDirSuffix);
+            appDir =
+                appDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+            createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+          } catch (IOException e) {
+            LOG.error("Failed to  create application log dir [" + appDir
+                + "] while processing app " + appId);
+            throw e;
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new YarnException(e);
     }
-    super.stop();
   }
 
   private void initApp(final ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy) {
+      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
+      Map<ApplicationAccessType, String> appAcls) {
 
     // Get user's FileSystem credentials
     UserGroupInformation userUgi =
@@ -133,41 +363,27 @@ public class LogAggregationService exten
       }
     }
 
+    // Create the app dir
+    createAppDir(user, appId, userUgi);
+
     // New application
     AppLogAggregator appLogAggregator =
-        new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
-            userUgi, this.localRootLogDirs,
-            getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
+        new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
+            userUgi, this.localRootLogDirs, 
+            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnException("Duplicate initApp for " + appId);
     }
 
-    // Create the app dir
-    try {
-      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          // TODO: Reuse FS for user?
-          FileSystem remoteFS = FileSystem.get(getConfig());
-          remoteFS.mkdirs(getRemoteAppLogDir(
-              LogAggregationService.this.remoteRootLogDir, appId)
-              .makeQualified(remoteFS.getUri(),
-                  remoteFS.getWorkingDirectory()));
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      throw new YarnException(e);
-    }
 
-    // Get the user configuration for the list of containers that need log
+    // TODO Get the user configuration for the list of containers that need log
     // aggregation.
 
     // Schedule the aggregator.
     this.threadPool.execute(appLogAggregator);
   }
 
-  private void stopContainer(ContainerId containerId, String exitCode) {
+  private void stopContainer(ContainerId containerId, int exitCode) {
 
     // A container is complete. Put this containers' logs up for aggregation if
     // this containers' logs are needed.
@@ -179,7 +395,7 @@ public class LogAggregationService exten
     }
     this.appLogAggregators.get(
         containerId.getApplicationAttemptId().getApplicationId())
-        .startContainerLogAggregation(containerId, exitCode.equals("0"));
+        .startContainerLogAggregation(containerId, exitCode == 0);
   }
 
   private void stopApp(ApplicationId appId) {
@@ -196,27 +412,30 @@ public class LogAggregationService exten
 
   @Override
   public void handle(LogAggregatorEvent event) {
-//    switch (event.getType()) {
-//    case APPLICATION_STARTED:
-//      LogAggregatorAppStartedEvent appStartEvent =
-//          (LogAggregatorAppStartedEvent) event;
-//      initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
-//          appStartEvent.getCredentials(),
-//          appStartEvent.getLogRetentionPolicy());
-//      break;
-//    case CONTAINER_FINISHED:
-//      LogAggregatorContainerFinishedEvent containerFinishEvent =
-//          (LogAggregatorContainerFinishedEvent) event;
-//      stopContainer(containerFinishEvent.getContainerId(),
-//          containerFinishEvent.getExitCode());
-//      break;
-//    case APPLICATION_FINISHED:
-//      LogAggregatorAppFinishedEvent appFinishedEvent =
-//          (LogAggregatorAppFinishedEvent) event;
-//      stopApp(appFinishedEvent.getApplicationId());
-//      break;
-//    default:
-//      ; // Ignore
-//    }
+    if (this.isLogAggregationEnabled) {
+      switch (event.getType()) {
+        case APPLICATION_STARTED:
+          LogAggregatorAppStartedEvent appStartEvent =
+              (LogAggregatorAppStartedEvent) event;
+          initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
+              appStartEvent.getCredentials(),
+              appStartEvent.getLogRetentionPolicy(),
+              appStartEvent.getApplicationAcls());
+          break;
+        case CONTAINER_FINISHED:
+          LogAggregatorContainerFinishedEvent containerFinishEvent =
+              (LogAggregatorContainerFinishedEvent) event;
+          stopContainer(containerFinishEvent.getContainerId(),
+              containerFinishEvent.getExitCode());
+          break;
+        case APPLICATION_FINISHED:
+          LogAggregatorAppFinishedEvent appFinishedEvent =
+              (LogAggregatorAppFinishedEvent) event;
+          stopApp(appFinishedEvent.getApplicationId());
+          break;
+        default:
+          ; // Ignore
+      }
+    }
   }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java Fri Oct 28 06:45:04 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -110,7 +111,12 @@ public class LogDumper extends Configure
       AggregatedLogFormat.LogReader reader =
           new AggregatedLogFormat.LogReader(getConf(),
               LogAggregationService.getRemoteNodeLogFileForApp(
-                  remoteRootLogDir, appId, nodeAddress));
+                  remoteRootLogDir,
+                  appId,
+                  UserGroupInformation.getCurrentUser().getShortUserName(),
+                  ConverterUtils.toNodeId(nodeAddress),
+                  getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+                      YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
       return dumpAContainerLogs(containerIdStr, reader, out);
     }
 
@@ -152,16 +158,21 @@ public class LogDumper extends Configure
     Path remoteRootLogDir =
         new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String logDirSuffix =
+        getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+    //TODO Change this to get a list of files from the LAS.
     Path remoteAppLogDir =
-        LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId);
+        LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId, user,
+            logDirSuffix);
     RemoteIterator<FileStatus> nodeFiles =
         FileContext.getFileContext().listStatus(remoteAppLogDir);
     while (nodeFiles.hasNext()) {
       FileStatus thisNodeFile = nodeFiles.next();
       AggregatedLogFormat.LogReader reader =
           new AggregatedLogFormat.LogReader(getConf(),
-              LogAggregationService.getRemoteNodeLogFileForApp(
-                  remoteRootLogDir, appId, thisNodeFile.getPath().getName()));
+              new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
       try {
 
         DataInputStream valueStream;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java Fri Oct 28 06:45:04 2011
@@ -1,25 +1,28 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
 
+import java.util.Map;
+
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
 
 public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
@@ -28,14 +31,17 @@ public class LogAggregatorAppStartedEven
   private final ContainerLogsRetentionPolicy retentionPolicy;
   private final String user;
   private final Credentials credentials;
+  private final Map<ApplicationAccessType, String> appAcls;
 
   public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy) {
+      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
+      Map<ApplicationAccessType, String> appAcls) {
     super(LogAggregatorEventType.APPLICATION_STARTED);
     this.applicationId = appId;
     this.user = user;
     this.credentials = credentials;
     this.retentionPolicy = retentionPolicy;
+    this.appAcls = appAcls;
   }
 
   public ApplicationId getApplicationId() {
@@ -54,4 +60,8 @@ public class LogAggregatorAppStartedEven
     return this.user;
   }
 
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return this.appAcls;
+  }
+
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,175 @@
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class AggregatedLogsBlock extends HtmlBlock {
+
+  private final Configuration conf;
+
+  @Inject
+  AggregatedLogsBlock(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  protected void render(Block html) {
+    ContainerId containerId = verifyAndGetContainerId(html);
+    NodeId nodeId = verifyAndGetNodeId(html);
+    String appOwner = verifyAndGetAppOwner(html);
+    if (containerId == null || nodeId == null || appOwner == null
+        || appOwner.isEmpty()) {
+      return;
+    }
+    
+    ApplicationId applicationId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    String logEntity = $(ENTITY_STRING);
+    if (logEntity == null || logEntity.isEmpty()) {
+      logEntity = containerId.toString();
+    }
+
+    Path remoteRootLogDir =
+        new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    AggregatedLogFormat.LogReader reader = null;
+    try {
+      reader =
+          new AggregatedLogFormat.LogReader(conf,
+              LogAggregationService.getRemoteNodeLogFileForApp(
+                  remoteRootLogDir, applicationId, appOwner, nodeId,
+                  LogAggregationService.getRemoteNodeLogDirSuffix(conf)));
+    } catch (FileNotFoundException e) {
+      // ACLs not available till the log file is opened.
+      html.h1()
+          ._("Logs not available for "
+              + logEntity
+              + ". Aggregation may not be complete, "
+              + "Check back later or try the nodemanager on "
+              + nodeId)._();
+      return;
+    } catch (IOException e) {
+      html.h1()._("Error getting logs for " + logEntity)._();
+      LOG.error("Error getting logs for " + logEntity, e);
+      return;
+    }
+
+    String owner = null;
+    Map<ApplicationAccessType, String> appAcls = null;
+    try {
+      owner = reader.getApplicationOwner();
+      appAcls = reader.getApplicationAcls();
+    } catch (IOException e) {
+      html.h1()._("Error getting logs for " + logEntity)._();
+      LOG.error("Error getting logs for " + logEntity, e);
+      return;
+    }
+    ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
+    aclsManager.addApplication(applicationId, appAcls);
+
+    String remoteUser = request().getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    if (callerUGI != null
+        && !aclsManager.checkAccess(callerUGI, ApplicationAccessType.VIEW_APP,
+            owner, applicationId)) {
+      html.h1()
+          ._("User [" + remoteUser
+              + "] is not authorized to view the logs for " + logEntity)._();
+      return;
+    }
+
+    DataInputStream valueStream;
+    LogKey key = new LogKey();
+    try {
+      valueStream = reader.next(key);
+      while (valueStream != null
+          && !key.toString().equals(containerId.toString())) {
+        valueStream = reader.next(key);
+      }
+      if (valueStream == null) {
+        html.h1()._(
+            "Logs not available for " + logEntity
+                + ". Could be caused by the rentention policy")._();
+        return;
+      }
+      writer().write("<pre>");
+      AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
+      writer().write("</pre>");
+      return;
+    } catch (IOException e) {
+      html.h1()._("Error getting logs for " + logEntity)._();
+      LOG.error("Error getting logs for " + logEntity, e);
+      return;
+    }
+  }
+
+  private ContainerId verifyAndGetContainerId(Block html) {
+    String containerIdStr = $(CONTAINER_ID);
+    if (containerIdStr == null || containerIdStr.isEmpty()) {
+      html.h1()._("Cannot get container logs without a ContainerId")._();
+      return null;
+    }
+    ContainerId containerId = null;
+    try {
+      containerId = ConverterUtils.toContainerId(containerIdStr);
+    } catch (IllegalArgumentException e) {
+      html.h1()
+          ._("Cannot get container logs for invalid containerId: "
+              + containerIdStr)._();
+      return null;
+    }
+    return containerId;
+  }
+
+  private NodeId verifyAndGetNodeId(Block html) {
+    String nodeIdStr = $(NM_NODENAME);
+    if (nodeIdStr == null || nodeIdStr.isEmpty()) {
+      html.h1()._("Cannot get container logs without a NodeId")._();
+      return null;
+    }
+    NodeId nodeId = null;
+    try {
+      nodeId = ConverterUtils.toNodeId(nodeIdStr);
+    } catch (IllegalArgumentException e) {
+      html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
+          ._();
+      return null;
+    }
+    return nodeId;
+  }
+  
+  private String verifyAndGetAppOwner(Block html) {
+    String appOwner = $(APP_OWNER);
+    if (appOwner == null || appOwner.isEmpty()) {
+      html.h1()._("Cannot get container logs without an app owner")._();
+    }
+    return appOwner;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsNavBlock.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+public class AggregatedLogsNavBlock extends HtmlBlock implements NMWebParams {
+
+  @Override
+  protected void render(Block html) {
+    html
+      .div("#nav")
+        .h3()._("Logs")._() // 
+      ._()
+      .div("#themeswitcher")._();
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java?rev=1190174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsPage.java Fri Oct 28 06:45:04 2011
@@ -0,0 +1,44 @@
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
+
+import org.apache.hadoop.yarn.webapp.SubView;
+
+
+public class AggregatedLogsPage extends NMView {
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.yarn.server.nodemanager.webapp.NMView#preHead(org.apache.hadoop.yarn.webapp.hamlet.Hamlet.HTML)
+   */
+  @Override
+  protected void preHead(Page.HTML<_> html) {
+    String logEntity = $(ENTITY_STRING);
+    if (logEntity == null || logEntity.isEmpty()) {
+      logEntity = $(CONTAINER_ID);
+    }
+    if (logEntity == null || logEntity.isEmpty()) {
+      logEntity = "UNKNOWN";
+    }
+    set(TITLE, join("Logs for ", logEntity));
+    set(ACCORDION_ID, "nav");
+    set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+    set(THEMESWITCHER_ID, "themeswitcher");
+  }
+
+  @Override
+  protected Class<? extends SubView> content() {
+    return AggregatedLogsBlock.class;
+  }
+  
+  @Override
+  protected Class<? extends SubView> nav() {
+    return AggregatedLogsNavBlock.class;
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java Fri Oct 28 06:45:04 2011
@@ -91,7 +91,8 @@ public class AllContainersPage extends N
             ._()
             .td()._(container.getContainerState())._()
             .td()
-                .a(url("containerlogs", containerIdStr), "logs")._()
+                .a(url("containerlogs", containerIdStr, container.getUser()),
+                    "logs")._()
           ._();
       }
       tableBody._()._()._();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Fri Oct 28 06:45:04 2011
@@ -18,9 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -30,32 +38,53 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.SubView;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
 import com.google.inject.Inject;
 
 public class ContainerLogsPage extends NMView {
+  
+  public static final String REDIRECT_URL = "redirect.url";
+  
+  @Override protected void preHead(Page.HTML<_> html) {
+    String redirectUrl = $(REDIRECT_URL);
+    if (redirectUrl == null || redirectUrl.isEmpty()) {
+      set(TITLE, join("Logs for ", $(CONTAINER_ID)));
+      html.meta_http("refresh", "10");
+    } else {
+      if (redirectUrl.equals("false")) {
+        set(TITLE, join("Failed redirect for ", $(CONTAINER_ID)));
+        //Error getting redirect url. Fall through.
+      } else {
+        set(TITLE, join("Redirecting to log server for ", $(CONTAINER_ID)));
+        html.meta_http("refresh", "1; url=" + redirectUrl);
+      }
+    }
+    
+    set(ACCORDION_ID, "nav");
+    set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+    set(THEMESWITCHER_ID, "themeswitcher");
+  }
+
   @Override
   protected Class<? extends SubView> content() {
     return ContainersLogsBlock.class;
   }
 
   public static class ContainersLogsBlock extends HtmlBlock implements
-      NMWebParams {
-
+      NMWebParams {    
     private final Configuration conf;
     private final LocalDirAllocator logsSelector;
     private final Context nmContext;
@@ -72,13 +101,19 @@ public class ContainerLogsPage extends N
 
     @Override
     protected void render(Block html) {
-      DIV<Hamlet> div = html.div("#content");
 
+      String redirectUrl = $(REDIRECT_URL);
+      if (redirectUrl !=null && redirectUrl.equals("false")) {
+        html.h1("Failed while trying to construct the redirect url to the log" +
+        		" server. Log Server url may not be configured");
+        //Intentional fallthrough.
+      }
+      
       ContainerId containerId;
       try {
         containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
       } catch (IllegalArgumentException e) {
-        div.h1("Invalid containerId " + $(CONTAINER_ID))._();
+        html.h1("Invalid containerId " + $(CONTAINER_ID));
         return;
       }
 
@@ -88,19 +123,28 @@ public class ContainerLogsPage extends N
           applicationId);
       Container container = this.nmContext.getContainers().get(containerId);
 
-      if (application == null || container == null) {
-        div.h1(
-            "Unknown container. Container is either not yet running or "
+      if (application == null) {
+        html.h1(
+            "Unknown container. Container either has not started or "
                 + "has already completed or "
-                + "doesn't belong to this node at all.")._();
+                + "doesn't belong to this node at all.");
+        return;
+      }
+      if (container == null) {
+        // Container may have alerady completed, but logs not aggregated yet.
+        printLogs(html, containerId, applicationId, application);
         return;
       }
 
       if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
-          ContainerState.LOCALIZING).contains(container.getContainerState())) {
-        div.h1("Container is not yet running. Current state is "
-                + container.getContainerState())
-              ._();
+          ContainerState.LOCALIZED).contains(container.getContainerState())) {
+        html.h1("Container is not yet running. Current state is "
+                + container.getContainerState());
+        return;
+      }
+
+      if (container.getContainerState() == ContainerState.LOCALIZATION_FAILED) {
+        html.h1("Container wasn't started. Localization failed.");
         return;
       }
 
@@ -108,103 +152,144 @@ public class ContainerLogsPage extends N
           ContainerState.EXITED_WITH_FAILURE,
           ContainerState.EXITED_WITH_SUCCESS).contains(
           container.getContainerState())) {
+        printLogs(html, containerId, applicationId, application);
+        return;
+      }
+      if (EnumSet.of(ContainerState.KILLING,
+          ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+          ContainerState.CONTAINER_RESOURCES_CLEANINGUP).contains(
+          container.getContainerState())) {
+        //Container may have generated some logs before being killed.
+        printLogs(html, containerId, applicationId, application);
+        return;
+      }
+      if (container.getContainerState().equals(ContainerState.DONE)) {
+        // Prev state unknown. Logs may be available.
+        printLogs(html, containerId, applicationId, application);
+        return;
+      } else {
+        html.h1("Container is no longer running...");
+        return;
+      }
+    }
 
-        // Check for the authorization.
-        String remoteUser = request().getRemoteUser();
-        UserGroupInformation callerUGI = null;
-        if (remoteUser != null) {
-          callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-        }
-        if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI,
-            ApplicationAccessType.VIEW_APP, application.getUser(),
-                applicationId)) {
-          div.h1(
-              "You (User " + remoteUser
-                  + ") are not authorized to view the logs for application "
-                  + applicationId)._();
+    private void printLogs(Block html, ContainerId containerId,
+        ApplicationId applicationId, Application application) {
+      // Check for the authorization.
+      String remoteUser = request().getRemoteUser();
+      UserGroupInformation callerUGI = null;
+
+      if (remoteUser != null) {
+        callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+      }
+      if (callerUGI != null
+          && !this.aclsManager.checkAccess(callerUGI,
+              ApplicationAccessType.VIEW_APP, application.getUser(),
+              applicationId)) {
+        html.h1(
+            "User [" + remoteUser
+                + "] is not authorized to view the logs for application "
+                + applicationId);
+        return;
+      }
+
+      if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
+        File logFile = null;
+        try {
+          logFile =
+              new File(this.logsSelector
+                  .getLocalPathToRead(
+                      ContainerLaunch.getRelativeContainerLogDir(
+                          applicationId.toString(), containerId.toString())
+                          + Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
+                  .toUri().getPath());
+        } catch (Exception e) {
+          html.h1("Cannot find this log on the local disk.");
           return;
         }
-
-        if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
-          File logFile = null;
-          try {
-            logFile =
-                new File(this.logsSelector
-                    .getLocalPathToRead(
-                        ConverterUtils.toString(
-                            applicationId)
-                            + Path.SEPARATOR + $(CONTAINER_ID)
-                            + Path.SEPARATOR
-                            + $(CONTAINER_LOG_TYPE), this.conf).toUri()
-                    .getPath());
-          } catch (Exception e) {
-            div.h1("Cannot find this log on the local disk.")._();
-          }
-          div.h1(logFile == null ? "Unknown LogFile" : logFile.getName());
-          long start =
-              $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
-          start = start < 0 ? logFile.length() + start : start;
-          start = start < 0 ? 0 : start;
-          long end =
-              $("end").isEmpty() ? logFile.length() : Long
-                  .parseLong($("end"));
-          end = end < 0 ? logFile.length() + end : end;
-          end = end < 0 ? logFile.length() : end;
-          if (start > end) {
-            writer().write("Invalid start and end values!");
-          } else {
+        long start =
+            $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
+        start = start < 0 ? logFile.length() + start : start;
+        start = start < 0 ? 0 : start;
+        long end =
+            $("end").isEmpty() ? logFile.length() : Long.parseLong($("end"));
+        end = end < 0 ? logFile.length() + end : end;
+        end = end < 0 ? logFile.length() : end;
+        if (start > end) {
+          html.h1("Invalid start and end values. Start: [" + start + "]"
+              + ", end[" + end + "]");
+          return;
+        } else {
+          InputStreamReader reader = null;
           try {
             long toRead = end - start;
             if (toRead < logFile.length()) {
-                div._("Showing " + toRead + " bytes. Click ")
-                    .a(url("containerlogs", $(CONTAINER_ID),
-                        logFile.getName()), "here")
-                    ._(" for full log").br()._();
+              html.p()._("Showing " + toRead + " bytes. Click ")
+                  .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER), 
+                      logFile.getName(), "?start=0"), "here").
+                      _(" for full log")._();
             }
             // TODO: Use secure IO Utils to avoid symlink attacks.
-            //TODO Fix findBugs close warning along with IOUtils change
-            FileReader reader = new FileReader(logFile);
-            char[] cbuf = new char[65536];
-            reader.skip(start);
+            // TODO Fix findBugs close warning along with IOUtils change
+            reader = new FileReader(logFile);
+            int bufferSize = 65536;
+            char[] cbuf = new char[bufferSize];
+
+            long skipped = 0;
+            long totalSkipped = 0;
+            while (totalSkipped < start) {
+              skipped = reader.skip(start - totalSkipped);
+              totalSkipped += skipped;
+            }
+
             int len = 0;
-            int totalRead = 0;
+            int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
             writer().write("<pre>");
-            while ((len = reader.read(cbuf, 0, (int) toRead)) > 0
-                && totalRead < (end - start)) {
+
+            while ((len = reader.read(cbuf, 0, currentToRead)) > 0
+                && toRead > 0) {
               writer().write(cbuf, 0, len); // TODO: HTMl Quoting?
-              totalRead += len;
-              toRead = toRead - totalRead;
+              toRead = toRead - len;
+              currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
             }
+
             reader.close();
             writer().write("</pre>");
+
           } catch (IOException e) {
-              writer().write(
-                  "Exception reading log-file "
-                      + StringUtils.stringifyException(e));
-          } 
-        }
-          div._();
-        } else {
-          // Just print out the log-types
-          List<File> containerLogsDirs =
-              getContainerLogDirs(this.conf, containerId);
-          for (File containerLogsDir : containerLogsDirs) {
-            for (File logFile : containerLogsDir.listFiles()) {
-              div
-                  .p()
-                  .a(
-                      url("containerlogs", $(CONTAINER_ID),
-                          logFile.getName(), "?start=-4076"),
-                      logFile.getName() + " : Total file length is "
-                          + logFile.length() + " bytes.")
-                  ._();
+            html.h1("Exception reading log-file. Log file was likely aggregated. "
+                + StringUtils.stringifyException(e));
+          } finally {
+            if (reader != null) {
+              try {
+                reader.close();
+              } catch (IOException e) {
+                // Ignore
+              }
             }
           }
-          div._();
         }
       } else {
-        div.h1("Container is no longer running..")._();
+        // Just print out the log-types
+        List<File> containerLogsDirs =
+            getContainerLogDirs(this.conf, containerId);
+        boolean foundLogFile = false;
+        for (File containerLogsDir : containerLogsDirs) {
+          for (File logFile : containerLogsDir.listFiles()) {
+            foundLogFile = true;
+            html.p()
+                .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER), 
+                    logFile.getName(), "?start=-4096"),
+                    logFile.getName() + " : Total file length is "
+                        + logFile.length() + " bytes.")._();
+          }
+        }
+        if (!foundLogFile) {
+          html.h1("No logs available for container " + containerId.toString());
+          return;
+        }
       }
+      return;
     }
 
     static List<File>
@@ -222,6 +307,5 @@ public class ContainerLogsPage extends N
       }
       return containerLogDirs;
     }
-    
   }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java Fri Oct 28 06:45:04 2011
@@ -89,7 +89,8 @@ public class ContainerPage extends NMVie
         ._("User", container.getUser())
         ._("TotalMemoryNeeded",
             container.getLaunchContext().getResource().getMemory())
-        ._("logs", ujoin("containerlogs", $(CONTAINER_ID)), "Link to logs");
+        ._("logs", ujoin("containerlogs", $(CONTAINER_ID), container.getUser()),
+            "Link to logs");
       html._(InfoBlock.class);
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java?rev=1190174&r1=1190173&r2=1190174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java Fri Oct 28 06:45:04 2011
@@ -21,15 +21,27 @@ package org.apache.hadoop.yarn.server.no
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.Controller;
 
 import com.google.inject.Inject;
 
 public class NMController extends Controller implements NMWebParams {
 
+  private Context nmContext;
+  private Configuration nmConf;
+  
   @Inject
-  public NMController(Configuration nmConf, RequestContext requestContext) {
+  public NMController(Configuration nmConf, RequestContext requestContext,
+      Context nmContext) {
     super(requestContext);
+    this.nmContext = nmContext;
+    this.nmConf = nmConf;
   }
 
   @Override
@@ -63,6 +75,29 @@ public class NMController extends Contro
   }
 
   public void logs() {
+    String containerIdStr = $(CONTAINER_ID);
+    ContainerId containerId = null;
+    try {
+      containerId = ConverterUtils.toContainerId(containerIdStr);
+    } catch (IllegalArgumentException e) {
+      render(ContainerLogsPage.class);
+      return;
+    }
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    Application app = nmContext.getApplications().get(appId);
+    if (app == null) {
+      String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
+      String redirectUrl = null;
+      if (logServerUrl == null || logServerUrl.isEmpty()) {
+        redirectUrl = "false";
+      } else {
+        redirectUrl =
+            url(logServerUrl, nmContext.getNodeId().toString(), containerIdStr,
+                containerIdStr, $(APP_OWNER));
+      }
+      set(ContainerLogsPage.REDIRECT_URL, redirectUrl);
+    }
     render(ContainerLogsPage.class);
   }
 }