You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by at...@apache.org on 2012/08/24 22:38:21 UTC

svn commit: r1377092 - in /hadoop/common/branches/HDFS-3077/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/co...

Author: atm
Date: Fri Aug 24 20:38:08 2012
New Revision: 1377092

URL: http://svn.apache.org/viewvc?rev=1377092&view=rev
Log:
Merge trunk into HDFS-3077 branch.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
      - copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
      - copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
      - copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/   (props changed)
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
    hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt Fri Aug 24 20:38:08 2012
@@ -20,6 +20,9 @@ Branch-2 ( Unreleased changes )
 
   BUG FIXES
 
+    MAPREDUCE-2374. "Text File Busy" errors launching MR tasks. (Andy Isaacson
+    via atm)
+
 Release 2.1.0-alpha - Unreleased 
 
   INCOMPATIBLE CHANGES
@@ -34,6 +37,9 @@ Release 2.1.0-alpha - Unreleased 
 
     YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy) 
 
+    YARN-22. Fix ContainerLogs to work if the log-dir is specified as a URI.
+    (Mayank Bansal via sseth)
+
 Release 0.23.3 - Unreleased 
 
   INCOMPATIBLE CHANGES
@@ -47,3 +53,9 @@ Release 0.23.3 - Unreleased 
     YARN-14. Symlinks to peer distributed cache files no longer work 
     (Jason Lowe via bobby) 
 
+    YARN-25. remove old aggregated logs  (Robert Evans via tgraves)
+
+    YARN-27. Failed refreshQueues due to misconfiguration prevents further 
+    refreshing of queues (Arun Murthy via tgraves)
+
+    MAPREDUCE-4323. NM leaks filesystems (Jason Lowe via jeagles)

Propchange: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Aug 24 20:38:08 2012
@@ -1 +1,4 @@
 target
+.classpath
+.project
+.settings

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Fri Aug 24 20:38:08 2012
@@ -353,6 +353,14 @@ public class YarnConfiguration extends C
       + "log-aggregation-enable";
   public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
   
+  /** 
+   * How long to wait before deleting aggregated logs, -1 disables.
+   * Be careful set this too small and you will spam the name node.
+   */
+  public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX
+      + "log-aggregation.retain-seconds";
+  public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
+  
   /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if Log
    * aggregation is disabled

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java Fri Aug 24 20:38:08 2012
@@ -410,7 +410,7 @@ public class ProcfsBasedProcessTree {
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
       // The process vanished in the interim!
-      LOG.warn("The process " + pinfo.getPid()
+      LOG.info("The process " + pinfo.getPid()
           + " may have finished in the interim.");
       return ret;
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java Fri Aug 24 20:38:08 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.yarn.webapp.log;
 
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java Fri Aug 24 20:38:08 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.yarn.webapp.log;
 
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Fri Aug 24 20:38:08 2012
@@ -367,6 +367,13 @@
     <name>yarn.log-aggregation-enable</name>
     <value>false</value>
   </property>
+
+  <property>
+    <description>How long to keep aggregation logs before deleting them.  -1 disables. 
+    Be careful set this too small and you will spam the name node.</description>
+    <name>yarn.log-aggregation.retain-seconds</name>
+    <value>-1</value>
+  </property> 
   
   <property>
     <description>Time in seconds to retain user logs. Only applicable if

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Fri Aug 24 20:38:08 2012
@@ -169,7 +169,7 @@ public class DefaultContainerExecutor ex
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
 
       // Setup command to run
-      String[] command = {"bash", "-c",
+      String[] command = {"bash",
           wrapperScriptDst.toUri().getPath().toString()};
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(
@@ -211,7 +211,6 @@ public class DefaultContainerExecutor ex
     sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
     sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
     sb.append(" /bin/bash ");
-    sb.append("-c ");
     sb.append("\"");
     sb.append(launchScriptDst);
     sb.append("\"\n");

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Fri Aug 24 20:38:08 2012
@@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -176,10 +177,14 @@ public class ContainerLocalizer {
       e.printStackTrace(System.out);
       return -1;
     } finally {
-      if (exec != null) {
-        exec.shutdownNow();
+      try {
+        if (exec != null) {
+          exec.shutdownNow();
+        }
+        LocalDirAllocator.removeContext(appCacheDirContextName);
+      } finally {
+        closeFileSystems(ugi);
       }
-      LocalDirAllocator.removeContext(appCacheDirContextName);
     }
   }
 
@@ -215,7 +220,15 @@ public class ContainerLocalizer {
     TimeUnit.SECONDS.sleep(duration);
   }
 
-  private void localizeFiles(LocalizationProtocol nodemanager,
+  protected void closeFileSystems(UserGroupInformation ugi) {
+    try {
+      FileSystem.closeAllForUGI(ugi);
+    } catch (IOException e) {
+      LOG.warn("Failed to close filesystems: ", e);
+    }
+  }
+
+  protected void localizeFiles(LocalizationProtocol nodemanager,
       CompletionService<Path> cs, UserGroupInformation ugi)
       throws IOException {
     while (true) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Aug 24 20:38:08 2012
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.service.AbstractService;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LogAggregationService extends AbstractService implements
@@ -70,7 +69,7 @@ public class LogAggregationService exten
 
   /*
    * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
-   * Group to which NMOwner belongs> App dirs will be created as 750,
+   * Group to which NMOwner belongs> App dirs will be created as 770,
    * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
    * access / modify the files.
    * <NMGroup> should obviously be a limited access group.
@@ -85,7 +84,7 @@ public class LogAggregationService exten
    * Permissions for the Application directory.
    */
   private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
-      .createImmutable((short) 0750);
+      .createImmutable((short) 0770);
 
   private final Context context;
   private final DeletionService deletionService;
@@ -203,7 +202,7 @@ public class LogAggregationService exten
     fs.setPermission(path, new FsPermission(fsPerm));
   }
 
-  private void createAppDir(final String user, final ApplicationId appId,
+  protected void createAppDir(final String user, final ApplicationId appId,
       UserGroupInformation userUgi) {
     try {
       userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -286,13 +285,12 @@ public class LogAggregationService exten
     this.dispatcher.getEventHandler().handle(eventResponse);
   }
 
-  @VisibleForTesting
-  public void initAppAggregator(final ApplicationId appId, String user,
+  protected void initAppAggregator(final ApplicationId appId, String user,
       Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
       Map<ApplicationAccessType, String> appAcls) {
 
     // Get user's FileSystem credentials
-    UserGroupInformation userUgi =
+    final UserGroupInformation userUgi =
         UserGroupInformation.createRemoteUser(user);
     if (credentials != null) {
       for (Token<? extends TokenIdentifier> token : credentials
@@ -301,9 +299,6 @@ public class LogAggregationService exten
       }
     }
 
-    // Create the app dir
-    createAppDir(user, appId, userUgi);
-
     // New application
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
@@ -313,6 +308,14 @@ public class LogAggregationService exten
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnException("Duplicate initApp for " + appId);
     }
+    // wait until check for existing aggregator to create dirs
+    try {
+      // Create the app dir
+      createAppDir(user, appId, userUgi);
+    } catch (YarnException e) {
+      closeFileSystems(userUgi);
+      throw e;
+    }
 
 
     // TODO Get the user configuration for the list of containers that need log
@@ -325,12 +328,21 @@ public class LogAggregationService exten
           appLogAggregator.run();
         } finally {
           appLogAggregators.remove(appId);
+          closeFileSystems(userUgi);
         }
       }
     };
     this.threadPool.execute(aggregatorWrapper);
   }
 
+  protected void closeFileSystems(final UserGroupInformation userUgi) {
+    try {
+      FileSystem.closeAllForUGI(userUgi);
+    } catch (IOException e) {
+      LOG.warn("Failed to close filesystems: ", e);
+    }
+  }
+
   // for testing only
   @Private
   int getNumAggregators() {

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Fri Aug 24 20:38:08 2012
@@ -29,6 +29,8 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 import org.apache.hadoop.yarn.webapp.SubView;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.mortbay.log.Log;
 
 import com.google.inject.Inject;
 
@@ -198,12 +201,14 @@ public class ContainerLogsPage extends N
       if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
         File logFile = null;
         try {
-          logFile =
-              new File(this.dirsHandler.getLogPathToRead(
-                  ContainerLaunch.getRelativeContainerLogDir(
+          URI logPathURI = new URI(this.dirsHandler.getLogPathToRead(
+              ContainerLaunch.getRelativeContainerLogDir(
                   applicationId.toString(), containerId.toString())
-                  + Path.SEPARATOR + $(CONTAINER_LOG_TYPE))
-                  .toUri().getPath());
+                  + Path.SEPARATOR + $(CONTAINER_LOG_TYPE)).toString());
+          logFile = new File(logPathURI.getPath());
+        } catch (URISyntaxException e) {
+          html.h1("Cannot find this log on the local disk.");
+          return;
         } catch (Exception e) {
           html.h1("Cannot find this log on the local disk.");
           return;
@@ -278,14 +283,16 @@ public class ContainerLogsPage extends N
         boolean foundLogFile = false;
         for (File containerLogsDir : containerLogsDirs) {
           File[] logFiles = containerLogsDir.listFiles();
-          Arrays.sort(logFiles);
-          for (File logFile : logFiles) {
-            foundLogFile = true;
-            html.p()
-                .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER), 
-                    logFile.getName(), "?start=-4096"),
-                    logFile.getName() + " : Total file length is "
-                        + logFile.length() + " bytes.")._();
+          if (logFiles != null) {
+            Arrays.sort(logFiles);
+            for (File logFile : logFiles) {
+              foundLogFile = true;
+              html.p()
+                  .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
+                      logFile.getName(), "?start=-4096"),
+                      logFile.getName() + " : Total file length is "
+                          + logFile.length() + " bytes.")._();
+            }
           }
         }
         if (!foundLogFile) {
@@ -297,13 +304,17 @@ public class ContainerLogsPage extends N
     }
 
     static List<File> getContainerLogDirs(ContainerId containerId,
-            LocalDirsHandlerService dirsHandler) {
+        LocalDirsHandlerService dirsHandler) {
       List<String> logDirs = dirsHandler.getLogDirs();
       List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
       for (String logDir : logDirs) {
-        String appIdStr = 
-            ConverterUtils.toString(
-                containerId.getApplicationAttemptId().getApplicationId());
+        try {
+          logDir = new URI(logDir).getPath();
+        } catch (URISyntaxException e) {
+          Log.warn(e.getMessage());
+        }
+        String appIdStr = ConverterUtils.toString(containerId
+            .getApplicationAttemptId().getApplicationId());
         File appLogDir = new File(logDir, appIdStr);
         String containerIdStr = ConverterUtils.toString(containerId);
         containerLogDirs.add(new File(appLogDir, containerIdStr));

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Fri Aug 24 20:38:08 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.argThat;
@@ -27,6 +28,7 @@ import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -57,6 +59,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -76,47 +79,28 @@ public class TestContainerLocalizer {
   static final Path basedir =
       new Path("target", TestContainerLocalizer.class.getName());
 
+  static final String appUser = "yak";
+  static final String appId = "app_RM_0";
+  static final String containerId = "container_0";
+  static final InetSocketAddress nmAddr =
+      new InetSocketAddress("foobar", 8040);
+
+  private AbstractFileSystem spylfs;
+  private Random random;
+  private List<Path> localDirs;
+  private Path tokenPath;
+  private LocalizationProtocol nmProxy;
+
   @Test
-  @SuppressWarnings("unchecked") // mocked generics
   public void testContainerLocalizerMain() throws Exception {
-    Configuration conf = new Configuration();
-    AbstractFileSystem spylfs =
-      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
-    // don't actually create dirs
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
-    FileContext lfs = FileContext.getFileContext(spylfs, conf);
-    final String user = "yak";
-    final String appId = "app_RM_0";
-    final String cId = "container_0";
-    final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 8040);
-    final List<Path> localDirs = new ArrayList<Path>();
-    for (int i = 0; i < 4; ++i) {
-      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
-    }
-    RecordFactory mockRF = getMockLocalizerRecordFactory();
-    ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
-        appId, cId, localDirs, mockRF);
-    ContainerLocalizer localizer = spy(concreteLoc);
-
-    // return credential stream instead of opening local file
-    final Random r = new Random();
-    long seed = r.nextLong();
-    r.setSeed(seed);
-    System.out.println("SEED: " + seed);
-    DataInputBuffer appTokens = createFakeCredentials(r, 10);
-    Path tokenPath =
-      lfs.makeQualified(new Path(
-            String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, cId)));
-    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
-        ).when(spylfs).open(tokenPath);
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
 
     // mock heartbeat responses from NM
-    LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
-    LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
-    LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+    LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+    LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+    LocalResource rsrcC = getMockRsrc(random,
+        LocalResourceVisibility.APPLICATION);
+    LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
     when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcA)))
@@ -130,6 +114,7 @@ public class TestContainerLocalizer {
             Collections.<LocalResource>emptyList()))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
             null));
+
     doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
         localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
         isA(UserGroupInformation.class));
@@ -142,33 +127,13 @@ public class TestContainerLocalizer {
     doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
         localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
         isA(UserGroupInformation.class));
-    doReturn(nmProxy).when(localizer).getProxy(nmAddr);
-    doNothing().when(localizer).sleep(anyInt());
-
-    // return result instantly for deterministic test
-    ExecutorService syncExec = mock(ExecutorService.class);
-    CompletionService<Path> cs = mock(CompletionService.class);
-    when(cs.submit(isA(Callable.class)))
-      .thenAnswer(new Answer<Future<Path>>() {
-          @Override
-          public Future<Path> answer(InvocationOnMock invoc)
-              throws Throwable {
-            Future<Path> done = mock(Future.class);
-            when(done.isDone()).thenReturn(true);
-            FakeDownload d = (FakeDownload) invoc.getArguments()[0];
-            when(done.get()).thenReturn(d.call());
-            return done;
-          }
-        });
-    doReturn(syncExec).when(localizer).createDownloadThreadPool();
-    doReturn(cs).when(localizer).createCompletionService(syncExec);
 
     // run localization
     assertEquals(0, localizer.runLocalization(nmAddr));
 
     // verify created cache
     for (Path p : localDirs) {
-      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
       Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
       // $x/usercache/$user/filecache
       verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
@@ -194,11 +159,91 @@ public class TestContainerLocalizer {
           @Override
           public boolean matches(Object o) {
             LocalizerStatus status = (LocalizerStatus) o;
-            return !cId.equals(status.getLocalizerId());
+            return !containerId.equals(status.getLocalizerId());
           }
         }));
   }
 
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testContainerLocalizerClosesFilesystems() throws Exception {
+    // success path: localizeFiles is a no-op; filesystems closed exactly once
+    ContainerLocalizer localizer = setupContainerLocalizerForTest();
+    doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
+        any(CompletionService.class), any(UserGroupInformation.class));
+    verify(localizer, never()).closeFileSystems(
+        any(UserGroupInformation.class));
+    localizer.runLocalization(nmAddr);
+    verify(localizer).closeFileSystems(any(UserGroupInformation.class));
+
+    // failure path: localizeFiles throws; filesystems must still be closed
+    localizer = setupContainerLocalizerForTest();
+    doThrow(new YarnException("Forced Failure")).when(localizer).localizeFiles(
+        any(LocalizationProtocol.class), any(CompletionService.class),
+        any(UserGroupInformation.class));
+    verify(localizer, never()).closeFileSystems(
+        any(UserGroupInformation.class));
+    localizer.runLocalization(nmAddr); // test expects no exception to escape
+    verify(localizer).closeFileSystems(any(UserGroupInformation.class));
+  }
+
+  @SuppressWarnings("unchecked") // mocked generics
+  private ContainerLocalizer setupContainerLocalizerForTest()
+      throws Exception {
+    spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    // stub mkdir so no directories are actually created on disk
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+    // four qualified local dirs under basedir, and a spied localizer on them
+    Configuration conf = new Configuration();
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    localDirs = new ArrayList<Path>();
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+    }
+    RecordFactory mockRF = getMockLocalizerRecordFactory();
+    ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
+        appId, containerId, localDirs, mockRF);
+    ContainerLocalizer localizer = spy(concreteLoc);
+
+    // serve fake credentials when the token file path is opened
+    random = new Random();
+    long seed = random.nextLong();
+    System.out.println("SEED: " + seed); // printed to reproduce failures
+    random.setSeed(seed);
+    DataInputBuffer appTokens = createFakeCredentials(random, 10);
+    tokenPath =
+      lfs.makeQualified(new Path(
+            String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
+                containerId)));
+    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+        ).when(spylfs).open(tokenPath);
+    // mocked NM proxy is returned for nmAddr; the localizer's sleep() is a no-op
+    nmProxy = mock(LocalizationProtocol.class);
+    doReturn(nmProxy).when(localizer).getProxy(nmAddr);
+    doNothing().when(localizer).sleep(anyInt());
+
+    // run each submitted download inline and hand back an already-done Future
+    ExecutorService syncExec = mock(ExecutorService.class);
+    CompletionService<Path> cs = mock(CompletionService.class);
+    when(cs.submit(isA(Callable.class)))
+      .thenAnswer(new Answer<Future<Path>>() {
+          @Override
+          public Future<Path> answer(InvocationOnMock invoc)
+              throws Throwable {
+            Future<Path> done = mock(Future.class);
+            when(done.isDone()).thenReturn(true);
+            FakeDownload d = (FakeDownload) invoc.getArguments()[0];
+            when(done.get()).thenReturn(d.call());
+            return done;
+          }
+        });
+    doReturn(syncExec).when(localizer).createDownloadThreadPool();
+    doReturn(cs).when(localizer).createCompletionService(syncExec);
+
+    return localizer;
+  }
+
   static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
     final LocalResource rsrc;
     HBMatches(LocalResource rsrc) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Aug 24 20:38:08 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -430,7 +431,7 @@ public class TestResourceLocalizationSer
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
-          anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
       final LocalResource resource = getPrivateMockedResource(r);
       final LocalResourceRequest req = new LocalResourceRequest(resource);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Fri Aug 24 20:38:08 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.UnsupportedF
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -126,9 +127,9 @@ public class TestLogAggregationService e
     EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
     dispatcher.register(ApplicationEventType.class, appEventHandler);
     
-    LogAggregationService logAggregationService =
+    LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
-                                  super.dirsHandler);
+                                  super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
@@ -156,7 +157,9 @@ public class TestLogAggregationService e
         application1));
 
     logAggregationService.stop();
-
+    // ensure filesystems were closed
+    verify(logAggregationService).closeFileSystems(
+        any(UserGroupInformation.class));
     
     String containerIdStr = ConverterUtils.toString(container11);
     File containerLogDir = new File(app1LogDir, containerIdStr);
@@ -380,7 +383,60 @@ public class TestLogAggregationService e
   
   @Test
   @SuppressWarnings("unchecked")
-  public void testLogAggregationFailsWithoutKillingNM() throws Exception {
+  public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+    // initAppAggregator is forced to throw; the NM must survive the failure
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+        localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+                                  super.dirsHandler));
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    // NOTE(review): (int)Math.random() is always 0, so the id is not random
+    ApplicationId appId = BuilderUtils.newApplicationId(
+        System.currentTimeMillis(), (int)Math.random());
+    doThrow(new YarnException("KABOOM!"))
+      .when(logAggregationService).initAppAggregator(
+          eq(appId), eq(user), any(Credentials.class),
+          any(ContainerLogsRetentionPolicy.class), anyMap());
+
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
+        this.acls));
+
+    dispatcher.await();
+    ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+        new ApplicationFinishEvent(appId,
+            "Application failed to init aggregation: KABOOM!")
+    };
+    checkEvents(appEventHandler, expectedEvents, false,
+        "getType", "getApplicationID", "getDiagnostic");
+    // init failed before any filesystem was created, so nothing to close
+    verify(logAggregationService, never()).closeFileSystems(
+        any(UserGroupInformation.class));
+
+    // verify trying to collect logs for containers/apps we don't know about
+    // doesn't blow up and tear down the NM
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+        BuilderUtils.newContainerId(4, 1, 1, 1), 0));
+    dispatcher.await();
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
+        BuilderUtils.newApplicationId(1, 5)));
+    dispatcher.await();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLogAggregationCreateDirsFailsWithoutKillingNM()
+      throws Exception {
     
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -399,10 +455,8 @@ public class TestLogAggregationService e
     ApplicationId appId = BuilderUtils.newApplicationId(
         System.currentTimeMillis(), (int)Math.random());
     doThrow(new YarnException("KABOOM!"))
-      .when(logAggregationService).initAppAggregator(
-          eq(appId), eq(user), any(Credentials.class),
-          any(ContainerLogsRetentionPolicy.class), anyMap());
-    
+      .when(logAggregationService).createAppDir(any(String.class),
+          any(ApplicationId.class), any(UserGroupInformation.class));
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null,
         ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));        
@@ -413,6 +467,9 @@ public class TestLogAggregationService e
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
+    // filesystems may have been instantiated
+    verify(logAggregationService).closeFileSystems(
+        any(UserGroupInformation.class));
 
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
@@ -423,7 +480,7 @@ public class TestLogAggregationService e
         BuilderUtils.newApplicationId(1, 5)));
     dispatcher.await();
   }
-  
+
   private void writeContainerLogs(File appLogDir, ContainerId containerId)
       throws IOException {
     // ContainerLogDir should be created

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Aug 24 20:38:08 2012
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -120,14 +121,41 @@ public class QueueMetrics implements Met
                     enableUserMetrics, conf);
   }
 
-  public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
+  /**
+   * Clears the static QueueMetrics cache; intended for unit tests only.
+   */
+  @Private
+  public synchronized static void clearQueueMetrics() {
+    queueMetrics.clear();
+  }
+  
+  /**
+   * Cache of QueueMetrics keyed by queue name, to avoid re-registrations.
+   */
+  private static Map<String, QueueMetrics> queueMetrics =
+      new HashMap<String, QueueMetrics>();
+  // forQueue: on a cache hit, ms/parent/enableUserMetrics/conf are ignored.
+  public synchronized 
+  static QueueMetrics forQueue(MetricsSystem ms, String queueName,
                                       Queue parent, boolean enableUserMetrics,
 				      Configuration conf) {
-    QueueMetrics metrics = 
-      new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf
-		       ).tag(QUEUE_INFO, queueName);
-    return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
-        "Metrics for queue: " + queueName, metrics);
+    QueueMetrics metrics = queueMetrics.get(queueName);
+    if (metrics == null) {
+      metrics =
+          new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
+          tag(QUEUE_INFO, queueName);
+      
+      // Register once with the MetricsSystem; cache what register() returns
+      if (ms != null) {
+        metrics = 
+            ms.register(
+                sourceName(queueName).toString(), 
+                "Metrics for queue: " + queueName, metrics);
+      }
+      queueMetrics.put(queueName, metrics);
+    }
+
+    return metrics;
   }
 
   public synchronized QueueMetrics getUserMetrics(String userName) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Fri Aug 24 20:38:08 2012
@@ -38,14 +38,22 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestQueueMetrics {
   static final int GB = 1024; // MB
   private static final Configuration conf = new Configuration();
 
-  final MetricsSystem ms = new MetricsSystemImpl();
+  private MetricsSystem ms;
 
+  @Before
+  public void setUp() {
+    ms = new MetricsSystemImpl(); // fresh metrics system for every test
+    QueueMetrics.clearQueueMetrics(); // drop queue metrics cached statically
+  }
+  
   @Test public void testDefaultSingleQueueMetrics() {
     String queueName = "single";
     String user = "alice";
@@ -226,6 +234,37 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 0, 0, 1, 0, 0);
     checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
   }
+  
+  @Test 
+  public void testMetricsCache() {
+    // Started, dedicated metrics system so forQueue actually registers.
+    MetricsSystem cacheMs = new MetricsSystemImpl("cache");
+    cacheMs.start();
+    try {
+      String p1 = "root1";
+      String leafQueueName = "root1.leaf";
+
+      QueueMetrics p1Metrics =
+          QueueMetrics.forQueue(cacheMs, p1, null, true, conf);
+      Queue parentQueue1 = make(stub(Queue.class).returning(p1Metrics).
+          from.getMetrics());
+      QueueMetrics metrics = QueueMetrics.forQueue(
+          cacheMs, leafQueueName, parentQueue1, true, conf);
+
+      Assert.assertNotNull("leaf QueueMetrics shouldn't be null", metrics);
+
+      // Asking for the same queue again must hit the cache: no second
+      // registration with the metrics system, and the same instance back.
+      QueueMetrics alterMetrics = QueueMetrics.forQueue(
+          cacheMs, leafQueueName, parentQueue1, true, conf);
+
+      Assert.assertSame("forQueue should return the cached QueueMetrics",
+          metrics, alterMetrics);
+    } finally {
+      cacheMs.shutdown();
+    }
+  }
+
 
   public static void checkApps(MetricsSource source, int submitted, int pending,
       int running, int completed, int failed, int killed) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Aug 24 20:38:08 2012
@@ -98,7 +98,8 @@ public class TestLeafQueue {
     csConf = 
         new CapacitySchedulerConfiguration();
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
-    setupQueueConfiguration(csConf);
+    final String newRoot = "root" + System.currentTimeMillis();
+    setupQueueConfiguration(csConf, newRoot);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
     
@@ -112,7 +113,8 @@ public class TestLeafQueue {
     when(csContext.getClusterResources()).
         thenReturn(Resources.createResource(100 * 16 * GB));
     root = 
-        CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
+        CapacityScheduler.parseQueue(csContext, csConf, null, 
+            CapacitySchedulerConfiguration.ROOT, 
             queues, queues, 
             CapacityScheduler.queueComparator, 
             CapacityScheduler.applicationComparator, 
@@ -126,25 +128,33 @@ public class TestLeafQueue {
   private static final String C = "c";
   private static final String C1 = "c1";
   private static final String D = "d";
-  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+  private void setupQueueConfiguration(
+      CapacitySchedulerConfiguration conf, 
+      final String newRoot) {
     
     // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
     conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
     
-    final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
+    final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
+    conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
+    conf.setCapacity(Q_newRoot, 100);
+    conf.setMaximumCapacity(Q_newRoot, 100);
+    conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+
+    final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 8.5f);
     conf.setMaximumCapacity(Q_A, 20);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
     
-    final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
+    final String Q_B = Q_newRoot + "." + B;
     conf.setCapacity(Q_B, 80);
     conf.setMaximumCapacity(Q_B, 99);
     conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
 
-    final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
+    final String Q_C = Q_newRoot + "." + C;
     conf.setCapacity(Q_C, 1.5f);
     conf.setMaximumCapacity(Q_C, 10);
     conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
@@ -154,7 +164,7 @@ public class TestLeafQueue {
     final String Q_C1 = Q_C + "." + C1;
     conf.setCapacity(Q_C1, 100);
 
-    final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
+    final String Q_D = Q_newRoot + "." + D;
     conf.setCapacity(Q_D, 10);
     conf.setMaximumCapacity(Q_D, 11);
     conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm Fri Aug 24 20:38:08 2012
@@ -314,6 +314,19 @@ Hadoop MapReduce Next Generation - Clust
 | | | Shuffle service that needs to be set for Map Reduce applications. |
 *-------------------------+-------------------------+------------------------+
 
+        * Configurations for History Server (Needs to be moved elsewhere):
+
+*-------------------------+-------------------------+------------------------+
+|| Parameter              || Value                  || Notes                 |
+*-------------------------+-------------------------+------------------------+
+| <<<yarn.log-aggregation.retain-seconds>>> | | |
+| | <-1> | |
+| | | How long to keep aggregated logs before deleting them. -1 disables deletion. |
+| | | Be careful: setting this too small will spam the NameNode. |
+*-------------------------+-------------------------+------------------------+
+
+
+
       * <<<conf/mapred-site.xml>>>
 
         * Configurations for MapReduce Applications:

Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml Fri Aug 24 20:38:08 2012
@@ -125,6 +125,10 @@
       <artifactId>jersey-server</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.sun.jersey.contribs</groupId>
       <artifactId>jersey-guice</artifactId>
     </dependency>