You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to yarn-commits@hadoop.apache.org by at...@apache.org on 2012/08/24 22:38:21 UTC
svn commit: r1377092 - in
/hadoop/common/branches/HDFS-3077/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/co...
Author: atm
Date: Fri Aug 24 20:38:08 2012
New Revision: 1377092
URL: http://svn.apache.org/viewvc?rev=1377092&view=rev
Log:
Merge trunk into HDFS-3077 branch.
Added:
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
- copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
- copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
- copied unchanged from r1377085, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/ (props changed)
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/CHANGES.txt Fri Aug 24 20:38:08 2012
@@ -20,6 +20,9 @@ Branch-2 ( Unreleased changes )
BUG FIXES
+ MAPREDUCE-2374. "Text File Busy" errors launching MR tasks. (Andy Isaacson
+ via atm)
+
Release 2.1.0-alpha - Unreleased
INCOMPATIBLE CHANGES
@@ -34,6 +37,9 @@ Release 2.1.0-alpha - Unreleased
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
+ YARN-22. Fix ContainerLogs to work if the log-dir is specified as a URI.
+ (Mayank Bansal via sseth)
+
Release 0.23.3 - Unreleased
INCOMPATIBLE CHANGES
@@ -47,3 +53,9 @@ Release 0.23.3 - Unreleased
YARN-14. Symlinks to peer distributed cache files no longer work
(Jason Lowe via bobby)
+ YARN-25. remove old aggregated logs (Robert Evans via tgraves)
+
+ YARN-27. Failed refreshQueues due to misconfiguration prevents further
+ refreshing of queues (Arun Murthy via tgraves)
+
+ MAPREDUCE-4323. NM leaks filesystems (Jason Lowe via jeagles)
Propchange: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Aug 24 20:38:08 2012
@@ -1 +1,4 @@
target
+.classpath
+.project
+.settings
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Fri Aug 24 20:38:08 2012
@@ -353,6 +353,14 @@ public class YarnConfiguration extends C
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
+ /**
+ * How long to wait before deleting aggregated logs, -1 disables.
+ * Be careful set this too small and you will spam the name node.
+ */
+ public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX
+ + "log-aggregation.retain-seconds";
+ public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
+
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java Fri Aug 24 20:38:08 2012
@@ -410,7 +410,7 @@ public class ProcfsBasedProcessTree {
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// The process vanished in the interim!
- LOG.warn("The process " + pinfo.getPid()
+ LOG.info("The process " + pinfo.getPid()
+ " may have finished in the interim.");
return ret;
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java Fri Aug 24 20:38:08 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.hadoop.yarn.webapp.log;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java Fri Aug 24 20:38:08 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.hadoop.yarn.webapp.log;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Fri Aug 24 20:38:08 2012
@@ -367,6 +367,13 @@
<name>yarn.log-aggregation-enable</name>
<value>false</value>
</property>
+
+ <property>
+ <description>How long to keep aggregation logs before deleting them. -1 disables.
+ Be careful set this too small and you will spam the name node.</description>
+ <name>yarn.log-aggregation.retain-seconds</name>
+ <value>-1</value>
+ </property>
<property>
<description>Time in seconds to retain user logs. Only applicable if
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Fri Aug 24 20:38:08 2012
@@ -169,7 +169,7 @@ public class DefaultContainerExecutor ex
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
- String[] command = {"bash", "-c",
+ String[] command = {"bash",
wrapperScriptDst.toUri().getPath().toString()};
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(
@@ -211,7 +211,6 @@ public class DefaultContainerExecutor ex
sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
sb.append(" /bin/bash ");
- sb.append("-c ");
sb.append("\"");
sb.append(launchScriptDst);
sb.append("\"\n");
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Fri Aug 24 20:38:08 2012
@@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -176,10 +177,14 @@ public class ContainerLocalizer {
e.printStackTrace(System.out);
return -1;
} finally {
- if (exec != null) {
- exec.shutdownNow();
+ try {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ LocalDirAllocator.removeContext(appCacheDirContextName);
+ } finally {
+ closeFileSystems(ugi);
}
- LocalDirAllocator.removeContext(appCacheDirContextName);
}
}
@@ -215,7 +220,15 @@ public class ContainerLocalizer {
TimeUnit.SECONDS.sleep(duration);
}
- private void localizeFiles(LocalizationProtocol nodemanager,
+ protected void closeFileSystems(UserGroupInformation ugi) {
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException e) {
+ LOG.warn("Failed to close filesystems: ", e);
+ }
+ }
+
+ protected void localizeFiles(LocalizationProtocol nodemanager,
CompletionService<Path> cs, UserGroupInformation ugi)
throws IOException {
while (true) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Aug 24 20:38:08 2012
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
@@ -70,7 +69,7 @@ public class LogAggregationService exten
/*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
- * Group to which NMOwner belongs> App dirs will be created as 750,
+ * Group to which NMOwner belongs> App dirs will be created as 770,
* owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
* access / modify the files.
* <NMGroup> should obviously be a limited access group.
@@ -85,7 +84,7 @@ public class LogAggregationService exten
* Permissions for the Application directory.
*/
private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
- .createImmutable((short) 0750);
+ .createImmutable((short) 0770);
private final Context context;
private final DeletionService deletionService;
@@ -203,7 +202,7 @@ public class LogAggregationService exten
fs.setPermission(path, new FsPermission(fsPerm));
}
- private void createAppDir(final String user, final ApplicationId appId,
+ protected void createAppDir(final String user, final ApplicationId appId,
UserGroupInformation userUgi) {
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -286,13 +285,12 @@ public class LogAggregationService exten
this.dispatcher.getEventHandler().handle(eventResponse);
}
- @VisibleForTesting
- public void initAppAggregator(final ApplicationId appId, String user,
+ protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
// Get user's FileSystem credentials
- UserGroupInformation userUgi =
+ final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
if (credentials != null) {
for (Token<? extends TokenIdentifier> token : credentials
@@ -301,9 +299,6 @@ public class LogAggregationService exten
}
}
- // Create the app dir
- createAppDir(user, appId, userUgi);
-
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
@@ -313,6 +308,14 @@ public class LogAggregationService exten
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
+ // wait until check for existing aggregator to create dirs
+ try {
+ // Create the app dir
+ createAppDir(user, appId, userUgi);
+ } catch (YarnException e) {
+ closeFileSystems(userUgi);
+ throw e;
+ }
// TODO Get the user configuration for the list of containers that need log
@@ -325,12 +328,21 @@ public class LogAggregationService exten
appLogAggregator.run();
} finally {
appLogAggregators.remove(appId);
+ closeFileSystems(userUgi);
}
}
};
this.threadPool.execute(aggregatorWrapper);
}
+ protected void closeFileSystems(final UserGroupInformation userUgi) {
+ try {
+ FileSystem.closeAllForUGI(userUgi);
+ } catch (IOException e) {
+ LOG.warn("Failed to close filesystems: ", e);
+ }
+ }
+
// for testing only
@Private
int getNumAggregators() {
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Fri Aug 24 20:38:08 2012
@@ -29,6 +29,8 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.util.Conve
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.mortbay.log.Log;
import com.google.inject.Inject;
@@ -198,12 +201,14 @@ public class ContainerLogsPage extends N
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
File logFile = null;
try {
- logFile =
- new File(this.dirsHandler.getLogPathToRead(
- ContainerLaunch.getRelativeContainerLogDir(
+ URI logPathURI = new URI(this.dirsHandler.getLogPathToRead(
+ ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
- + Path.SEPARATOR + $(CONTAINER_LOG_TYPE))
- .toUri().getPath());
+ + Path.SEPARATOR + $(CONTAINER_LOG_TYPE)).toString());
+ logFile = new File(logPathURI.getPath());
+ } catch (URISyntaxException e) {
+ html.h1("Cannot find this log on the local disk.");
+ return;
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
return;
@@ -278,14 +283,16 @@ public class ContainerLogsPage extends N
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
File[] logFiles = containerLogsDir.listFiles();
- Arrays.sort(logFiles);
- for (File logFile : logFiles) {
- foundLogFile = true;
- html.p()
- .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
- logFile.getName(), "?start=-4096"),
- logFile.getName() + " : Total file length is "
- + logFile.length() + " bytes.")._();
+ if (logFiles != null) {
+ Arrays.sort(logFiles);
+ for (File logFile : logFiles) {
+ foundLogFile = true;
+ html.p()
+ .a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
+ logFile.getName(), "?start=-4096"),
+ logFile.getName() + " : Total file length is "
+ + logFile.length() + " bytes.")._();
+ }
}
}
if (!foundLogFile) {
@@ -297,13 +304,17 @@ public class ContainerLogsPage extends N
}
static List<File> getContainerLogDirs(ContainerId containerId,
- LocalDirsHandlerService dirsHandler) {
+ LocalDirsHandlerService dirsHandler) {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
- String appIdStr =
- ConverterUtils.toString(
- containerId.getApplicationAttemptId().getApplicationId());
+ try {
+ logDir = new URI(logDir).getPath();
+ } catch (URISyntaxException e) {
+ Log.warn(e.getMessage());
+ }
+ String appIdStr = ConverterUtils.toString(containerId
+ .getApplicationAttemptId().getApplicationId());
File appLogDir = new File(logDir, appIdStr);
String containerIdStr = ConverterUtils.toString(containerId);
containerLogDirs.add(new File(appLogDir, containerIdStr));
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Fri Aug 24 20:38:08 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.no
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
@@ -27,6 +28,7 @@ import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -57,6 +59,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -76,47 +79,28 @@ public class TestContainerLocalizer {
static final Path basedir =
new Path("target", TestContainerLocalizer.class.getName());
+ static final String appUser = "yak";
+ static final String appId = "app_RM_0";
+ static final String containerId = "container_0";
+ static final InetSocketAddress nmAddr =
+ new InetSocketAddress("foobar", 8040);
+
+ private AbstractFileSystem spylfs;
+ private Random random;
+ private List<Path> localDirs;
+ private Path tokenPath;
+ private LocalizationProtocol nmProxy;
+
@Test
- @SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerMain() throws Exception {
- Configuration conf = new Configuration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- // don't actually create dirs
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
- FileContext lfs = FileContext.getFileContext(spylfs, conf);
- final String user = "yak";
- final String appId = "app_RM_0";
- final String cId = "container_0";
- final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 8040);
- final List<Path> localDirs = new ArrayList<Path>();
- for (int i = 0; i < 4; ++i) {
- localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
- }
- RecordFactory mockRF = getMockLocalizerRecordFactory();
- ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
- appId, cId, localDirs, mockRF);
- ContainerLocalizer localizer = spy(concreteLoc);
-
- // return credential stream instead of opening local file
- final Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("SEED: " + seed);
- DataInputBuffer appTokens = createFakeCredentials(r, 10);
- Path tokenPath =
- lfs.makeQualified(new Path(
- String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, cId)));
- doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
- ).when(spylfs).open(tokenPath);
+ ContainerLocalizer localizer = setupContainerLocalizerForTest();
// mock heartbeat responses from NM
- LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
- LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
- LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
- LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+ LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+ LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+ LocalResource rsrcC = getMockRsrc(random,
+ LocalResourceVisibility.APPLICATION);
+ LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcA)))
@@ -130,6 +114,7 @@ public class TestContainerLocalizer {
Collections.<LocalResource>emptyList()))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
null));
+
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
isA(UserGroupInformation.class));
@@ -142,33 +127,13 @@ public class TestContainerLocalizer {
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
isA(UserGroupInformation.class));
- doReturn(nmProxy).when(localizer).getProxy(nmAddr);
- doNothing().when(localizer).sleep(anyInt());
-
- // return result instantly for deterministic test
- ExecutorService syncExec = mock(ExecutorService.class);
- CompletionService<Path> cs = mock(CompletionService.class);
- when(cs.submit(isA(Callable.class)))
- .thenAnswer(new Answer<Future<Path>>() {
- @Override
- public Future<Path> answer(InvocationOnMock invoc)
- throws Throwable {
- Future<Path> done = mock(Future.class);
- when(done.isDone()).thenReturn(true);
- FakeDownload d = (FakeDownload) invoc.getArguments()[0];
- when(done.get()).thenReturn(d.call());
- return done;
- }
- });
- doReturn(syncExec).when(localizer).createDownloadThreadPool();
- doReturn(cs).when(localizer).createCompletionService(syncExec);
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
// verify created cache
for (Path p : localDirs) {
- Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
+ Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
// $x/usercache/$user/filecache
verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
@@ -194,11 +159,91 @@ public class TestContainerLocalizer {
@Override
public boolean matches(Object o) {
LocalizerStatus status = (LocalizerStatus) o;
- return !cId.equals(status.getLocalizerId());
+ return !containerId.equals(status.getLocalizerId());
}
}));
}
+ /**
+ * Verifies that runLocalization calls closeFileSystems both when
+ * localizeFiles completes normally and when it throws a YarnException.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testContainerLocalizerClosesFilesystems() throws Exception {
+ // verify filesystems are closed when localizer doesn't fail
+ ContainerLocalizer localizer = setupContainerLocalizerForTest();
+ doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
+ any(CompletionService.class), any(UserGroupInformation.class));
+ // sanity check: building the localizer alone must not close anything
+ verify(localizer, never()).closeFileSystems(
+ any(UserGroupInformation.class));
+ localizer.runLocalization(nmAddr);
+ verify(localizer).closeFileSystems(any(UserGroupInformation.class));
+
+ // verify filesystems are closed when localizer fails
+ localizer = setupContainerLocalizerForTest();
+ doThrow(new YarnException("Forced Failure")).when(localizer).localizeFiles(
+ any(LocalizationProtocol.class), any(CompletionService.class),
+ any(UserGroupInformation.class));
+ verify(localizer, never()).closeFileSystems(
+ any(UserGroupInformation.class));
+ // NOTE(review): this assumes runLocalization does not propagate the forced
+ // YarnException; its return value is not checked here.
+ localizer.runLocalization(nmAddr);
+ verify(localizer).closeFileSystems(any(UserGroupInformation.class));
+ }
+
+ /**
+ * Builds a spied ContainerLocalizer wired for deterministic unit testing.
+ * <p>
+ * Reassigns these test fields: {@code spylfs} (a spy of the local default
+ * file system whose {@code mkdir} is a no-op), {@code localDirs} (four
+ * qualified dirs under {@code basedir}), {@code random} (reseeded from a
+ * seed that is printed to stdout), {@code tokenPath}, and {@code nmProxy}
+ * (a fresh mock LocalizationProtocol).
+ * <p>
+ * Stubs on the returned localizer: {@code getProxy(nmAddr)} returns
+ * {@code nmProxy}, {@code sleep} does nothing, and the download thread pool
+ * and completion service are mocks whose {@code submit} runs the submitted
+ * {@code FakeDownload} immediately and returns an already-done Future.
+ * Opening {@code tokenPath} on {@code spylfs} yields fake credentials.
+ */
+ @SuppressWarnings("unchecked") // mocked generics
+ private ContainerLocalizer setupContainerLocalizerForTest()
+ throws Exception {
+ spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ // don't actually create dirs
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+ Configuration conf = new Configuration();
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ localDirs = new ArrayList<Path>();
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ }
+ RecordFactory mockRF = getMockLocalizerRecordFactory();
+ ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
+ appId, containerId, localDirs, mockRF);
+ ContainerLocalizer localizer = spy(concreteLoc);
+
+ // return credential stream instead of opening local file
+ random = new Random();
+ long seed = random.nextLong();
+ System.out.println("SEED: " + seed);
+ // reseed with the printed value so the random sequence used by this
+ // run can be reproduced from the test log
+ random.setSeed(seed);
+ DataInputBuffer appTokens = createFakeCredentials(random, 10);
+ tokenPath =
+ lfs.makeQualified(new Path(
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
+ containerId)));
+ doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+ ).when(spylfs).open(tokenPath);
+
+ nmProxy = mock(LocalizationProtocol.class);
+ doReturn(nmProxy).when(localizer).getProxy(nmAddr);
+ doNothing().when(localizer).sleep(anyInt());
+
+ // return result instantly for deterministic test
+ ExecutorService syncExec = mock(ExecutorService.class);
+ CompletionService<Path> cs = mock(CompletionService.class);
+ when(cs.submit(isA(Callable.class)))
+ .thenAnswer(new Answer<Future<Path>>() {
+ @Override
+ public Future<Path> answer(InvocationOnMock invoc)
+ throws Throwable {
+ Future<Path> done = mock(Future.class);
+ when(done.isDone()).thenReturn(true);
+ // assumes every submitted Callable is a FakeDownload; the download
+ // runs here, synchronously, at submit time
+ FakeDownload d = (FakeDownload) invoc.getArguments()[0];
+ when(done.get()).thenReturn(d.call());
+ return done;
+ }
+ });
+ doReturn(syncExec).when(localizer).createDownloadThreadPool();
+ doReturn(cs).when(localizer).createCompletionService(syncExec);
+
+ return localizer;
+ }
+
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
final LocalResource rsrc;
HBMatches(LocalResource rsrc) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Aug 24 20:38:08 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -430,7 +431,7 @@ public class TestResourceLocalizationSer
new FSDataOutputStream(new DataOutputBuffer(), null);
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
- anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+ anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Fri Aug 24 20:38:08 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.UnsupportedF
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -126,9 +127,9 @@ public class TestLogAggregationService e
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
- LogAggregationService logAggregationService =
+ LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
- super.dirsHandler);
+ super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
@@ -156,7 +157,9 @@ public class TestLogAggregationService e
application1));
logAggregationService.stop();
-
+ // ensure filesystems were closed
+ verify(logAggregationService).closeFileSystems(
+ any(UserGroupInformation.class));
String containerIdStr = ConverterUtils.toString(container11);
File containerLogDir = new File(app1LogDir, containerIdStr);
@@ -380,7 +383,60 @@ public class TestLogAggregationService e
@Test
@SuppressWarnings("unchecked")
- public void testLogAggregationFailsWithoutKillingNM() throws Exception {
+ public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+ // Forces initAppAggregator to throw, then checks that the service emits an
+ // ApplicationFinishEvent carrying the failure diagnostic, never calls
+ // closeFileSystems, and keeps handling later container/app events.
+
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+ localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ LogAggregationService logAggregationService = spy(
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler));
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ // NOTE(review): (int)Math.random() always evaluates to 0 (the cast truncates
+ // a value in [0.0, 1.0)), so only the timestamp part of the id varies.
+ ApplicationId appId = BuilderUtils.newApplicationId(
+ System.currentTimeMillis(), (int)Math.random());
+ doThrow(new YarnException("KABOOM!"))
+ .when(logAggregationService).initAppAggregator(
+ eq(appId), eq(user), any(Credentials.class),
+ any(ContainerLogsRetentionPolicy.class), anyMap());
+
+ logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+ this.user, null,
+ ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
+ this.acls));
+
+ dispatcher.await();
+ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+ new ApplicationFinishEvent(appId,
+ "Application failed to init aggregation: KABOOM!")
+ };
+ checkEvents(appEventHandler, expectedEvents, false,
+ "getType", "getApplicationID", "getDiagnostic");
+ // no filesystems instantiated yet
+ verify(logAggregationService, never()).closeFileSystems(
+ any(UserGroupInformation.class));
+
+ // verify trying to collect logs for containers/apps we don't know about
+ // doesn't blow up and tear down the NM
+ logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+ BuilderUtils.newContainerId(4, 1, 1, 1), 0));
+ dispatcher.await();
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
+ BuilderUtils.newApplicationId(1, 5)));
+ dispatcher.await();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
+ throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -399,10 +455,8 @@ public class TestLogAggregationService e
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
doThrow(new YarnException("KABOOM!"))
- .when(logAggregationService).initAppAggregator(
- eq(appId), eq(user), any(Credentials.class),
- any(ContainerLogsRetentionPolicy.class), anyMap());
-
+ .when(logAggregationService).createAppDir(any(String.class),
+ any(ApplicationId.class), any(UserGroupInformation.class));
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
@@ -413,6 +467,9 @@ public class TestLogAggregationService e
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
+ // filesystems may have been instantiated
+ verify(logAggregationService).closeFileSystems(
+ any(UserGroupInformation.class));
// verify trying to collect logs for containers/apps we don't know about
// doesn't blow up and tear down the NM
@@ -423,7 +480,7 @@ public class TestLogAggregationService e
BuilderUtils.newApplicationId(1, 5)));
dispatcher.await();
}
-
+
private void writeContainerLogs(File appLogDir, ContainerId containerId)
throws IOException {
// ContainerLogDir should be created
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Aug 24 20:38:08 2012
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -120,14 +121,41 @@ public class QueueMetrics implements Met
enableUserMetrics, conf);
}
- public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
+ /**
+ * Cache of QueueMetrics keyed by queue name, so that repeated forQueue
+ * calls for the same queue reuse one instance instead of re-registering a
+ * metrics source. Only accessed from static synchronized methods, i.e.
+ * guarded by the QueueMetrics class lock.
+ */
+ private static final Map<String, QueueMetrics> queueMetrics =
+ new HashMap<String, QueueMetrics>();
+
+ /**
+ * Clears the QueueMetrics cache. Intended for unit tests only.
+ */
+ @Private
+ public static synchronized void clearQueueMetrics() {
+ queueMetrics.clear();
+ }
+
+ public synchronized
+ static QueueMetrics forQueue(MetricsSystem ms, String queueName,
Queue parent, boolean enableUserMetrics,
Configuration conf) {
- QueueMetrics metrics =
- new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf
- ).tag(QUEUE_INFO, queueName);
- return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
- "Metrics for queue: " + queueName, metrics);
+ QueueMetrics metrics = queueMetrics.get(queueName);
+ if (metrics == null) {
+ metrics =
+ new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
+ tag(QUEUE_INFO, queueName);
+
+ // Register with the MetricsSystems
+ if (ms != null) {
+ metrics =
+ ms.register(
+ sourceName(queueName).toString(),
+ "Metrics for queue: " + queueName, metrics);
+ }
+ queueMetrics.put(queueName, metrics);
+ }
+
+ return metrics;
}
public synchronized QueueMetrics getUserMetrics(String userName) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Fri Aug 24 20:38:08 2012
@@ -38,14 +38,22 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestQueueMetrics {
static final int GB = 1024; // MB
private static final Configuration conf = new Configuration();
- final MetricsSystem ms = new MetricsSystemImpl();
+ private MetricsSystem ms;
+ @Before
+ public void setUp() {
+ // Fresh metrics system per test and an empty QueueMetrics cache:
+ // QueueMetrics.forQueue caches by queue name only, so instances created
+ // by an earlier test would otherwise be handed back to later tests.
+ ms = new MetricsSystemImpl();
+ QueueMetrics.clearQueueMetrics();
+ }
+
@Test public void testDefaultSingleQueueMetrics() {
String queueName = "single";
String user = "alice";
@@ -226,6 +234,37 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 0, 0, 1, 0, 0);
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
}
+
+ /**
+ * Verifies that QueueMetrics.forQueue caches instances by queue name:
+ * asking again for an already-created queue returns the cached object
+ * instead of registering a second source with the metrics system.
+ */
+ @Test
+ public void testMetricsCache() {
+ // named distinctly to avoid shadowing the per-test "ms" field
+ MetricsSystem cacheMs = new MetricsSystemImpl("cache");
+ cacheMs.start();
+
+ try {
+ String p1 = "root1";
+ String leafQueueName = "root1.leaf";
+
+ QueueMetrics p1Metrics =
+ QueueMetrics.forQueue(cacheMs, p1, null, true, conf);
+ Queue parentQueue1 = make(stub(Queue.class).returning(p1Metrics).
+ from.getMetrics());
+ QueueMetrics metrics =
+ QueueMetrics.forQueue(cacheMs, leafQueueName, parentQueue1, true, conf);
+
+ Assert.assertNotNull("QueueMetrics for " + leafQueueName
+ + " shouldn't be null", metrics);
+
+ // Re-register to check for cache hit; this shouldn't blow up the
+ // metrics system and must hand back the instance created above.
+ QueueMetrics alterMetrics =
+ QueueMetrics.forQueue(cacheMs, leafQueueName, parentQueue1, true, conf);
+
+ Assert.assertNotNull("QueueMetrics for alterMetrics shouldn't be null",
+ alterMetrics);
+ Assert.assertSame("Leaf QueueMetrics should come from the cache",
+ metrics, alterMetrics);
+
+ // The parent queue's metrics must be served from the cache as well.
+ Assert.assertSame("Parent QueueMetrics should come from the cache",
+ p1Metrics, QueueMetrics.forQueue(cacheMs, p1, null, true, conf));
+ } finally {
+ cacheMs.shutdown();
+ }
+ }
+
public static void checkApps(MetricsSource source, int submitted, int pending,
int running, int completed, int failed, int killed) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Aug 24 20:38:08 2012
@@ -98,7 +98,8 @@ public class TestLeafQueue {
csConf =
new CapacitySchedulerConfiguration();
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
- setupQueueConfiguration(csConf);
+ final String newRoot = "root" + System.currentTimeMillis();
+ setupQueueConfiguration(csConf, newRoot);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
@@ -112,7 +113,8 @@ public class TestLeafQueue {
when(csContext.getClusterResources()).
thenReturn(Resources.createResource(100 * 16 * GB));
root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
+ CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT,
queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
@@ -126,25 +128,33 @@ public class TestLeafQueue {
private static final String C = "c";
private static final String C1 = "c1";
private static final String D = "d";
- private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+ private void setupQueueConfiguration(
+ CapacitySchedulerConfiguration conf,
+ final String newRoot) {
// Define top-level queues
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
- final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
+ final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
+ conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
+ conf.setCapacity(Q_newRoot, 100);
+ conf.setMaximumCapacity(Q_newRoot, 100);
+ conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+
+ final String Q_A = Q_newRoot + "." + A;
conf.setCapacity(Q_A, 8.5f);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
- final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
+ final String Q_B = Q_newRoot + "." + B;
conf.setCapacity(Q_B, 80);
conf.setMaximumCapacity(Q_B, 99);
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
- final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
+ final String Q_C = Q_newRoot + "." + C;
conf.setCapacity(Q_C, 1.5f);
conf.setMaximumCapacity(Q_C, 10);
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
@@ -154,7 +164,7 @@ public class TestLeafQueue {
final String Q_C1 = Q_C + "." + C1;
conf.setCapacity(Q_C1, 100);
- final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
+ final String Q_D = Q_newRoot + "." + D;
conf.setCapacity(Q_D, 10);
conf.setMaximumCapacity(Q_D, 11);
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm Fri Aug 24 20:38:08 2012
@@ -314,6 +314,19 @@ Hadoop MapReduce Next Generation - Clust
| | | Shuffle service that needs to be set for Map Reduce applications. |
*-------------------------+-------------------------+------------------------+
+ * Configurations for History Server (Needs to be moved elsewhere):
+
+*-------------------------+-------------------------+------------------------+
+|| Parameter || Value || Notes |
+*-------------------------+-------------------------+------------------------+
+| <<<yarn.log-aggregation.retain-seconds>>> | | |
+| | <-1> | |
+| | | How long to keep aggregated logs before deleting them. -1 disables deletion. |
+| | | Be careful: if this is set too small, you will spam the NameNode. |
+*-------------------------+-------------------------+------------------------+
+
+
+
* <<<conf/mapred-site.xml>>>
* Configurations for MapReduce Applications:
Modified: hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml?rev=1377092&r1=1377091&r2=1377092&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-yarn-project/hadoop-yarn/pom.xml Fri Aug 24 20:38:08 2012
@@ -125,6 +125,10 @@
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
</dependency>