You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/04 08:53:53 UTC
svn commit: r1099337 [2/3] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
mr-client/hadoop-mapreduce-client-shuffle/src/...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java Wed May 4 06:53:52 2011
@@ -0,0 +1,197 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class LogDumper extends Configured implements Tool {
+
+ private static final String CONTAINER_ID_OPTION = "containerId";
+ private static final String APPLICATION_ID_OPTION = "applicationId";
+ private static final String NODE_ADDRESS_OPTION = "nodeAddress";
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ Options opts = new Options();
+ opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
+ opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
+ opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
+
+ if (args.length < 1) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("general options are: ", opts);
+ return -1;
+ }
+
+ CommandLineParser parser = new GnuParser();
+ String appIdStr = null;
+ String containerIdStr = null;
+ String nodeAddress = null;
+ try {
+ CommandLine commandLine = parser.parse(opts, args, true);
+ appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
+ containerIdStr = commandLine.getOptionValue(CONTAINER_ID_OPTION);
+ nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION);
+ } catch (ParseException e) {
+ System.out.println("options parsing failed: " + e.getMessage());
+
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("general options are: ", opts);
+ return -1;
+ }
+
+ if (appIdStr == null) {
+ System.out.println("ApplicationId cannot be null!");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("general options are: ", opts);
+ return -1;
+ }
+
+ RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(getConf());
+ ApplicationId appId =
+ ConverterUtils.toApplicationId(recordFactory, appIdStr);
+
+ DataOutputStream out = new DataOutputStream(System.out);
+
+ if (containerIdStr == null && nodeAddress == null) {
+ dumpAllContainersLogs(appId, out);
+ } else if ((containerIdStr == null && nodeAddress != null)
+ || (containerIdStr != null && nodeAddress == null)) {
+ System.out.println("ContainerId or NodeAddress cannot be null!");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("general options are: ", opts);
+ return -1;
+ } else {
+ Path remoteRootLogDir =
+ new Path(getConf().get(NMConfig.REMOTE_USER_LOG_DIR,
+ NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(getConf(),
+ LogAggregationService.getRemoteNodeLogFileForApp(
+ remoteRootLogDir, appId, nodeAddress));
+ return dumpAContainerLogs(containerIdStr, reader, out);
+ }
+
+ return 0;
+ }
+
+ private int dumpAContainerLogs(String containerIdStr,
+ AggregatedLogFormat.LogReader reader, DataOutputStream out)
+ throws IOException {
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null && !key.toString().equals(containerIdStr)) {
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+
+ if (valueStream == null) {
+ System.out.println("Logs for container " + containerIdStr
+ + " are not present in this log-file.");
+ return -1;
+ }
+
+ while (true) {
+ try {
+ LogReader.readAContainerLogsForALogType(valueStream, out);
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+ return 0;
+ }
+
+ private void
+ dumpAllContainersLogs(ApplicationId appId, DataOutputStream out)
+ throws IOException {
+ Path remoteRootLogDir =
+ new Path(getConf().get(NMConfig.REMOTE_USER_LOG_DIR,
+ NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
+ Path remoteAppLogDir =
+ LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId);
+ RemoteIterator<FileStatus> nodeFiles =
+ FileContext.getFileContext().listStatus(remoteAppLogDir);
+ while (nodeFiles.hasNext()) {
+ FileStatus thisNodeFile = nodeFiles.next();
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(getConf(),
+ LogAggregationService.getRemoteNodeLogFileForApp(
+ remoteRootLogDir, appId, thisNodeFile.getPath().getName()));
+ try {
+
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null) {
+ while (true) {
+ try {
+ LogReader.readAContainerLogsForALogType(valueStream, out);
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new YarnConfiguration();
+ LogDumper logDumper = new LogDumper();
+ logDumper.setConf(conf);
+ logDumper.run(args);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java Wed May 4 06:53:52 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent {
+
+ private final ApplicationId applicationId;
+
+ public LogAggregatorAppFinishedEvent(ApplicationId appId) {
+ super(LogAggregatorEventType.APPLICATION_FINISHED);
+ this.applicationId = appId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.applicationId;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java Wed May 4 06:53:52 2011
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
+
+public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
+
+ private final ApplicationId applicationId;
+ private final ContainerLogsRetentionPolicy retentionPolicy;
+ private final String user;
+ private final Credentials credentials;
+
+ public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
+ Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy) {
+ super(LogAggregatorEventType.APPLICATION_STARTED);
+ this.applicationId = appId;
+ this.user = user;
+ this.credentials = credentials;
+ this.retentionPolicy = retentionPolicy;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.applicationId;
+ }
+
+ public Credentials getCredentials() {
+ return this.credentials;
+ }
+
+ public ContainerLogsRetentionPolicy getLogRetentionPolicy() {
+ return this.retentionPolicy;
+ }
+
+ public String getUser() {
+ return this.user;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java Wed May 4 06:53:52 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent {
+
+ private final ContainerId containerId;
+ private final String exitCode;
+
+ public LogAggregatorContainerFinishedEvent(ContainerId containerId,
+ String exitCode) {
+ super(LogAggregatorEventType.CONTAINER_FINISHED);
+ this.containerId = containerId;
+ this.exitCode = exitCode;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+ public String getExitCode() {
+ return this.exitCode;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java Wed May 4 06:53:52 2011
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class LogAggregatorEvent extends AbstractEvent<LogAggregatorEventType>{
+
+ public LogAggregatorEvent(LogAggregatorEventType type) {
+ super(type);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java Wed May 4 06:53:52 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+public enum LogAggregatorEventType {
+ APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties Wed May 4 06:53:52 2011
@@ -21,3 +21,9 @@ log4j.appender.CLA.totalLogFileSize=${ha
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Wed May 4 06:53:52 2011
@@ -18,22 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-import junit.framework.Assert;
+import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -45,14 +35,17 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-
-import static org.junit.Assert.*;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
public class DummyContainerManager extends ContainerManagerImpl {
@@ -78,7 +71,7 @@ public class DummyContainerManager exten
((ApplicationLocalizationEvent) event).getApplication();
// Simulate event from ApplicationLocalization.
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
- app.getAppId(), new Path("logdir")));
+ app.getAppId()));
break;
case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
@@ -142,31 +135,23 @@ public class DummyContainerManager exten
};
}
- public static void waitForContainerState(ContainerManager containerManager,
- ContainerId containerID, ContainerState finalState)
- throws InterruptedException, YarnRemoteException {
- waitForContainerState(containerManager, containerID, finalState, 20);
- }
-
- public static void waitForContainerState(ContainerManager containerManager,
- ContainerId containerID, ContainerState finalState, int timeOutMax)
- throws InterruptedException, YarnRemoteException {
- GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- request.setContainerId(containerID);
- ContainerStatus containerStatus =
- containerManager.getContainerStatus(request).getStatus();
- int timeoutSecs = 0;
- while (!containerStatus.getState().equals(finalState)
- && timeoutSecs++ < timeOutMax) {
- Thread.sleep(1000);
- LOG.info("Waiting for container " +
- ConverterUtils.toString(containerID) +
- " to get into state " + finalState
- + ". Current state is " + containerStatus.getState());
- containerStatus = containerManager.getContainerStatus(request).getStatus();
+ @Override
+ protected LogAggregationService createLogAggregationService(
+ DeletionService deletionService) {
+ return new LogAggregationService(deletionService) {
+ @Override
+ public void handle(LogAggregatorEvent event) {
+ switch (event.getType()) {
+ case APPLICATION_STARTED:
+ break;
+ case CONTAINER_FINISHED:
+ break;
+ case APPLICATION_FINISHED:
+ break;
+ default:
+ // Ignore
+ }
}
- LOG.info("Container state is " + containerStatus.getState());
- Assert.assertEquals("ContainerState is not correct (timedout)",
- finalState, containerStatus.getState());
- }
+ };
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Wed May 4 06:53:52 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -27,8 +26,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,6 +41,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.junit.Test;
public class TestEventFlow {
@@ -53,8 +51,10 @@ public class TestEventFlow {
private static File localDir = new File("target",
TestEventFlow.class.getName() + "-localDir").getAbsoluteFile();
- private static File logDir = new File("target",
- TestEventFlow.class.getName() + "-logDir").getAbsoluteFile();
+ private static File localLogDir = new File("target",
+ TestEventFlow.class.getName() + "-localLogDir").getAbsoluteFile();
+ private static File remoteLogDir = new File("target",
+ TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile();
@Test
public void testSuccessfulContainerLaunch() throws InterruptedException,
@@ -63,15 +63,18 @@ public class TestEventFlow {
FileContext localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localDir.getAbsolutePath()), true);
- localFS.delete(new Path(logDir.getAbsolutePath()), true);
+ localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
+ localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
localDir.mkdir();
- logDir.mkdir();
+ localLogDir.mkdir();
+ remoteLogDir.mkdir();
Context context = new NMContext();
YarnConfiguration conf = new YarnConfiguration();
conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
- conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
+ conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+ conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
ContainerExecutor exec = new DefaultContainerExecutor();
DeletionService del = new DeletionService(exec);
@@ -106,13 +109,13 @@ public class TestEventFlow {
request.setContainerLaunchContext(launchContext);
containerManager.startContainer(request);
- DummyContainerManager.waitForContainerState(containerManager, cID,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cID,
ContainerState.RUNNING);
StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
stopRequest.setContainerId(cID);
containerManager.stopContainer(stopRequest);
- DummyContainerManager.waitForContainerState(containerManager, cID,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cID,
ContainerState.COMPLETE);
containerManager.stop();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed May 4 06:53:52 2011
@@ -114,7 +114,7 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(firstContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(2);
- Container container = new ContainerImpl(null, launchContext);
+ Container container = new ContainerImpl(null, launchContext, null);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -140,7 +140,7 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(secondContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(3);
- Container container = new ContainerImpl(null, launchContext);
+ Container container = new ContainerImpl(null, launchContext, null);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -211,6 +211,8 @@ public class TestNodeStatusUpdater {
conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
+ conf.set(NMConfig.REMOTE_USER_LOG_DIR, new Path(basedir, "remotelogs")
+ .toUri().getPath());
conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath());
nm.init(conf);
new Thread() {
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed May 4 06:53:52 2011
@@ -0,0 +1,208 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class BaseContainerManagerTest {
+
+ protected static RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+ static {
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ }
+
+ protected static FileContext localFS;
+ protected static File localDir;
+ protected static File localLogDir;
+ protected static File remoteLogDir;
+ protected static File tmpDir;
+
+ public BaseContainerManagerTest() throws UnsupportedFileSystemException {
+ localFS = FileContext.getLocalFSFileContext();
+ localDir =
+ new File("target", this.getClass().getName() + "-localDir")
+ .getAbsoluteFile();
+ localLogDir =
+ new File("target", this.getClass().getName() + "-localLogDir")
+ .getAbsoluteFile();
+ remoteLogDir =
+ new File("target", this.getClass().getName() + "-remoteLogDir")
+ .getAbsoluteFile();
+ tmpDir = new File("target", this.getClass().getName() + "-tmpDir");
+ }
+
+ protected static Log LOG = LogFactory
+ .getLog(BaseContainerManagerTest.class);
+
+ protected Configuration conf = new YarnConfiguration();
+ protected Context context = new NMContext();
+ protected ContainerExecutor exec = new DefaultContainerExecutor();
+ protected DeletionService delSrvc;
+ protected String user = "nobody";
+
+ protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
+ context, new AsyncDispatcher(), null) {
+ @Override
+ protected ResourceTracker getRMClient() {
+ return new LocalRMInterface();
+ };
+
+ @Override
+ protected void startStatusUpdater() throws InterruptedException,
+ YarnRemoteException {
+ return; // Don't start any updating thread.
+ }
+ };
+
+ protected ContainerManagerImpl containerManager = null;
+
+ protected ContainerExecutor createContainerExecutor() {
+ return new DefaultContainerExecutor();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ localFS.delete(new Path(localDir.getAbsolutePath()), true);
+ localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
+ localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
+ localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
+ localDir.mkdir();
+ tmpDir.mkdir();
+ localLogDir.mkdir();
+ remoteLogDir.mkdir();
+ LOG.info("Created localDir in " + localDir.getAbsolutePath());
+ LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
+
+ String bindAddress = "0.0.0.0:5555";
+ conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
+ conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+ conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+ conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
+
+ // Default delSrvc
+ delSrvc = new DeletionService(exec) {
+ @Override
+ public void delete(String user, Path subDir, Path[] baseDirs) {
+ // Don't do any deletions.
+ LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
+ + ", baseDirs - " + baseDirs);
+ };
+ };
+
+ exec = createContainerExecutor();
+ containerManager =
+ new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
+ containerManager.init(conf);
+ }
+
+ @After
+ public void tearDown() throws IOException, InterruptedException {
+ if (containerManager != null
+ && containerManager.getServiceState() == STATE.STARTED) {
+ containerManager.stop();
+ }
+ createContainerExecutor().deleteAsUser(user,
+ new Path(localDir.getAbsolutePath()), new Path[] {});
+ }
+
+ public static void waitForContainerState(ContainerManager containerManager,
+ ContainerId containerID, ContainerState finalState)
+ throws InterruptedException, YarnRemoteException {
+ waitForContainerState(containerManager, containerID, finalState, 20);
+ }
+
+ public static void waitForContainerState(ContainerManager containerManager,
+ ContainerId containerID, ContainerState finalState, int timeOutMax)
+ throws InterruptedException, YarnRemoteException {
+ GetContainerStatusRequest request =
+ recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ request.setContainerId(containerID);
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatus(request).getStatus();
+ int timeoutSecs = 0;
+ while (!containerStatus.getState().equals(finalState)
+ && timeoutSecs++ < timeOutMax) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for container to get into state " + finalState
+ + ". Current state is " + containerStatus.getState());
+ containerStatus = containerManager.getContainerStatus(request).getStatus();
+ }
+ LOG.info("Container state is " + containerStatus.getState());
+ Assert.assertEquals("ContainerState is not correct (timedout)",
+ finalState, containerStatus.getState());
+ }
+
+ static void waitForApplicationState(ContainerManagerImpl containerManager,
+ ApplicationId appID, ApplicationState finalState)
+ throws InterruptedException {
+ // Wait for app-finish
+ Application app =
+ containerManager.context.getApplications().get(appID);
+ int timeout = 0;
+ while (!(app.getApplicationState().equals(finalState))
+ && timeout++ < 15) {
+ LOG.info("Waiting for app to reach " + finalState
+ + ".. Current state is "
+ + app.getApplicationState());
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
+ app.getApplicationState().equals(finalState));
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed May 4 06:53:52 2011
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Wed May 4 06:53:52 2011
@@ -27,15 +27,10 @@ import java.util.Arrays;
import junit.framework.Assert;
-import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@@ -49,125 +44,25 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DummyContainerManager;
-import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
-import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-public class TestContainerManager {
-
- private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- static {
- DefaultMetricsSystem.setMiniClusterMode(true);
- }
-
- protected FileContext localFS;
+public class TestContainerManager extends BaseContainerManagerTest {
public TestContainerManager() throws UnsupportedFileSystemException {
- localFS = FileContext.getLocalFSFileContext();
- }
-
- private static Log LOG = LogFactory.getLog(TestContainerManager.class);
-
- protected static File localDir = new File("target",
- TestContainerManager.class.getName() + "-localDir").getAbsoluteFile();
- protected static File logDir = new File("target",
- TestContainerManager.class.getName() + "-logDir").getAbsoluteFile();
- protected static File tmpDir = new File("target",
- TestContainerManager.class.getName() + "-tmpDir");
-
- protected Configuration conf = new YarnConfiguration();
- private Context context = new NMContext();
- private ContainerExecutor exec = new DefaultContainerExecutor();
- private DeletionService delSrvc;
- private Dispatcher dispatcher = new AsyncDispatcher();
- private NodeHealthCheckerService healthChecker = null;
- private String user = "nobody";
-
- private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
- context, dispatcher, healthChecker) {
- @Override
- protected ResourceTracker getRMClient() {
- return new LocalRMInterface();
- };
-
- @Override
- protected void startStatusUpdater() throws InterruptedException,
- YarnRemoteException {
- return; // Don't start any updating thread.
- }
- };
-
- private ContainerManagerImpl containerManager = null;
-
- protected ContainerExecutor createContainerExecutor() {
- return new DefaultContainerExecutor();
- }
-
- @Before
- public void setup() throws IOException {
- localFS.delete(new Path(localDir.getAbsolutePath()), true);
- localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
- localFS.delete(new Path(logDir.getAbsolutePath()), true);
- localDir.mkdir();
- tmpDir.mkdir();
- logDir.mkdir();
- LOG.info("Created localDir in " + localDir.getAbsolutePath());
- LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
-
- String bindAddress = "0.0.0.0:5555";
- conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
- conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
- conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
-
- // Default delSrvc
- delSrvc = new DeletionService(exec) {
- @Override
- public void delete(String user, Path subDir, Path[] baseDirs) {
- // Don't do any deletions.
- };
- };
-
- exec = createContainerExecutor();
- containerManager =
- new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
- containerManager.init(conf);
+ super();
}
- @After
- public void tearDown() throws IOException, InterruptedException {
- if (containerManager != null
- && containerManager.getServiceState() == STATE.STARTED) {
- containerManager.stop();
- }
- createContainerExecutor().deleteAsUser(user,
- new Path(localDir.getAbsolutePath()), new Path[] {});
+ static {
+ LOG = LogFactory.getLog(TestContainerManager.class);
}
@Test
@@ -235,7 +130,7 @@ public class TestContainerManager {
containerManager.startContainer(startRequest);
- DummyContainerManager.waitForContainerState(containerManager, cId,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
// Now ascertain that the resources are localised correctly.
@@ -349,7 +244,7 @@ public class TestContainerManager {
stopRequest.setContainerId(cId);
containerManager.stopContainer(stopRequest);
- DummyContainerManager.waitForContainerState(containerManager, cId,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
@@ -418,10 +313,10 @@ public class TestContainerManager {
request.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(request);
- DummyContainerManager.waitForContainerState(containerManager, cId,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- waitForApplicationState(containerManager, cId.getAppId(),
+ BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getAppId(),
ApplicationState.RUNNING);
// Now ascertain that the resources are localised correctly.
@@ -453,7 +348,7 @@ public class TestContainerManager {
containerManager.handle(new CMgrCompletedAppsEvent(Arrays
.asList(new ApplicationId[] { appId })));
- waitForApplicationState(containerManager, cId.getAppId(),
+ BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getAppId(),
ApplicationState.FINISHED);
// Now ascertain that the resources are localised correctly.
@@ -475,118 +370,4 @@ public class TestContainerManager {
Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
targetFile.exists());
}
-
-// @Test
-// public void testCommandPreparation() {
-// ContainerLaunchContext container = new ContainerLaunchContext();
-//
-// // ////// Construct the Container-id
-// ApplicationID appId = new ApplicationID();
-// appId.id = 0;
-// appId.clusterTimeStamp = 0;
-// ContainerID containerID = new ContainerID();
-// containerID.appID = appId;
-// containerID.id = 0;
-// container.id = containerID;
-//
-// // The actual environment for the container
-// Path containerWorkDir =
-// NodeManager.getContainerWorkDir(new Path(localDir.getAbsolutePath()),
-// containerID);
-// final Map<String, String> ENVS = new HashMap<String, String>();
-// ENVS.put("JAVA_HOME", "/my/path/to/java-home");
-// ENVS.put("LD_LIBRARY_PATH", "/my/path/to/libraries");
-//
-// File workDir = new File(ContainerBuilderHelper.getWorkDir());
-// File logDir = new File(workDir, "logs");
-// File stdout = new File(logDir, "stdout");
-// File stderr = new File(logDir, "stderr");
-// File tmpDir = new File(workDir, "tmp");
-// File javaHome = new File(ContainerBuilderHelper.getEnvVar("JAVA_HOME"));
-// String ldLibraryPath =
-// ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
-// List<String> classPaths = new ArrayList<String>();
-// File someJar = new File(workDir, "jar-name.jar");
-// classPaths.add(someJar.toString());
-// classPaths.add(workDir.toString());
-// String PATH_SEPARATOR = System.getProperty("path.separator");
-// String classPath = StringUtils.join(PATH_SEPARATOR, classPaths);
-// File someFile = new File(workDir, "someFileNeededinEnv");
-//
-// NMContainer nmContainer = new NMContainer(container, containerWorkDir) {
-// @Override
-// protected String checkAndGetEnvValue(String envVar) {
-// return ENVS.get(envVar);
-// }
-// };
-// List<CharSequence> command = new ArrayList<CharSequence>();
-// command.add(javaHome + "/bin/java");
-// command.add("-Djava.library.path=" + ldLibraryPath);
-// command.add("-Djava.io.tmpdir=" + tmpDir);
-// command.add("-classpath");
-// command.add(classPath);
-// command.add("2>" + stdout);
-// command.add("1>" + stderr);
-//
-// Map<String, String> env = new HashMap<String, String>();
-// env.put("FILE_IN_ENV", someFile.toString());
-// env.put("JAVA_HOME", javaHome.toString());
-// env.put("LD_LIBRARY_PATH", ldLibraryPath);
-//
-// String actualWorkDir = containerWorkDir.toUri().getPath();
-//
-// String finalCmdSent = "";
-// for (CharSequence cmd : command) {
-// finalCmdSent += cmd + " ";
-// }
-// finalCmdSent.trim();
-// LOG.info("Final command sent is : " + finalCmdSent);
-//
-// // The main method being tested
-// String[] finalCommands =
-// nmContainer.prepareCommandArgs(command, env, actualWorkDir);
-// // //////////////////////////////
-//
-// String finalCmd = "";
-// for (String cmd : finalCommands) {
-// finalCmd += cmd + " ";
-// }
-// finalCmd = finalCmd.trim();
-// LOG.info("Final command for launch is : " + finalCmd);
-//
-// File actualLogDir = new File(actualWorkDir, "logs");
-// File actualStdout = new File(actualLogDir, "stdout");
-// File actualStderr = new File(actualLogDir, "stderr");
-// File actualTmpDir = new File(actualWorkDir, "tmp");
-// File actualSomeJar = new File(actualWorkDir, "jar-name.jar");
-// File actualSomeFileInEnv = new File(actualWorkDir, "someFileNeededinEnv");
-// Assert.assertEquals(actualSomeFileInEnv.toString(),
-// env.get("FILE_IN_ENV"));
-// Assert.assertEquals("/my/path/to/java-home", env.get("JAVA_HOME"));
-// Assert.assertEquals("/my/path/to/libraries", env.get("LD_LIBRARY_PATH"));
-// Assert.assertEquals("/my/path/to/java-home/bin/java"
-// + " -Djava.library.path=/my/path/to/libraries" + " -Djava.io.tmpdir="
-// + actualTmpDir + " -classpath " + actualSomeJar + PATH_SEPARATOR
-// + actualWorkDir + " 2>" + actualStdout + " 1>" + actualStderr,
-// finalCmd);
-// }
-
- static void waitForApplicationState(ContainerManagerImpl containerManager,
- ApplicationId appID, ApplicationState finalState)
- throws InterruptedException {
- // Wait for app-finish
- Application app =
- containerManager.context.getApplications().get(appID);
- int timeout = 0;
- while (!(app.getApplicationState().equals(finalState))
- && timeout++ < 15) {
- LOG.info("Waiting for app to reach " + finalState
- + ".. Current state is "
- + app.getApplicationState());
- Thread.sleep(1000);
- }
-
- Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
- app.getApplicationState().equals(finalState));
- }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed May 4 06:53:52 2011
@@ -83,7 +83,7 @@ public class TestContainer {
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
- final Container c = new ContainerImpl(dispatcher, ctxt);
+ final Container c = new ContainerImpl(dispatcher, ctxt, null);
assertEquals(ContainerState.NEW, c.getContainerState());
// Verify request for public/private resources to localizer
@@ -136,7 +136,7 @@ public class TestContainer {
final Map<String,LocalResource> localResources = createLocalResources(r);
when(ctxt.getAllLocalResources()).thenReturn(localResources);
- final Container c = new ContainerImpl(dispatcher, ctxt);
+ final Container c = new ContainerImpl(dispatcher, ctxt, null);
assertEquals(ContainerState.NEW, c.getContainerState());
c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
@@ -208,7 +208,7 @@ public class TestContainer {
final Map<String,ByteBuffer> serviceData = createServiceData(r);
when(ctxt.getAllServiceData()).thenReturn(serviceData);
- final Container c = new ContainerImpl(dispatcher, ctxt);
+ final Container c = new ContainerImpl(dispatcher, ctxt, null);
assertEquals(ContainerState.NEW, c.getContainerState());
// Verify propagation of service data to AuxServices
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed May 4 06:53:52 2011
@@ -0,0 +1,405 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+public class TestLogAggregationService extends BaseContainerManagerTest {
+
+ static {
+ LOG = LogFactory.getLog(TestLogAggregationService.class);
+ }
+
+ private static RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private File remoteRootLogDir = new File("target", this.getClass()
+ .getName() + "-remoteLogDir");
+
+ public TestLogAggregationService() throws UnsupportedFileSystemException {
+ super();
+ this.remoteRootLogDir.mkdir();
+ }
+
+ @Override
+ public void tearDown() throws IOException, InterruptedException {
+ super.tearDown();
+ createContainerExecutor().deleteAsUser(user,
+ new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
+ }
+
+ @Test
+ public void testLocalFileDeletionAfterUpload() throws IOException {
+ this.delSrvc = new DeletionService(createContainerExecutor());
+ this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+ this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ LogAggregationService logAggregationService =
+ new LogAggregationService(this.delSrvc);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+ // AppLogDir should be created
+ File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ app1LogDir.mkdir();
+ logAggregationService
+ .handle(new LogAggregatorAppStartedEvent(
+ application1, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+ ContainerId container11 =
+ BuilderUtils.newContainerId(recordFactory, application1, 1);
+ // Simulate log-file creation
+ writeContainerLogs(app1LogDir, container11);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container11, "0"));
+
+ logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ application1));
+
+ logAggregationService.stop();
+
+ String containerIdStr = ConverterUtils.toString(container11);
+ File containerLogDir = new File(app1LogDir, containerIdStr);
+ for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+ Assert.assertFalse(new File(containerLogDir, fileType).exists());
+ }
+
+ Assert.assertTrue(new File(logAggregationService
+ .getRemoteNodeLogFileForApp(application1).toUri().getPath()).exists());
+ }
+
+ @Test
+ public void testNoContainerOnNode() {
+ this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+ this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ LogAggregationService logAggregationService =
+ new LogAggregationService(this.delSrvc);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+ // AppLogDir should be created
+ File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ app1LogDir.mkdir();
+ logAggregationService
+ .handle(new LogAggregatorAppStartedEvent(
+ application1, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+ logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ application1));
+
+ logAggregationService.stop();
+
+ Assert
+ .assertFalse(new File(logAggregationService
+ .getRemoteNodeLogFileForApp(application1).toUri().getPath())
+ .exists());
+ }
+
+ @Test
+ public void testMultipleAppsLogAggregation() throws IOException {
+
+ this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+ this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ LogAggregationService logAggregationService =
+ new LogAggregationService(this.delSrvc);
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+ // AppLogDir should be created
+ File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+ app1LogDir.mkdir();
+ logAggregationService
+ .handle(new LogAggregatorAppStartedEvent(
+ application1, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+ ContainerId container11 =
+ BuilderUtils.newContainerId(recordFactory, application1, 1);
+ // Simulate log-file creation
+ writeContainerLogs(app1LogDir, container11);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container11, "0"));
+
+ ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
+
+ File app2LogDir = logAggregationService.getLocalAppLogDir(application2);
+ app2LogDir.mkdir();
+ logAggregationService.handle(new LogAggregatorAppStartedEvent(
+ application2, this.user, null,
+ ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
+
+ ContainerId container21 =
+ BuilderUtils.newContainerId(recordFactory, application2, 1);
+ writeContainerLogs(app2LogDir, container21);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container21, "0"));
+
+ ContainerId container12 =
+ BuilderUtils.newContainerId(recordFactory, application1, 2);
+ writeContainerLogs(app1LogDir, container12);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container12, "0"));
+
+ ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
+
+ File app3LogDir = logAggregationService.getLocalAppLogDir(application3);
+ app3LogDir.mkdir();
+ logAggregationService.handle(new LogAggregatorAppStartedEvent(
+ application3, this.user, null,
+ ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
+
+ ContainerId container31 =
+ BuilderUtils.newContainerId(recordFactory, application3, 1);
+ writeContainerLogs(app3LogDir, container31);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container31, "0"));
+
+ ContainerId container32 =
+ BuilderUtils.newContainerId(recordFactory, application3, 2);
+ writeContainerLogs(app3LogDir, container32);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container32, "1")); // Failed container
+
+ ContainerId container22 =
+ BuilderUtils.newContainerId(recordFactory, application2, 2);
+ writeContainerLogs(app2LogDir, container22);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container22, "0"));
+
+ ContainerId container33 =
+ BuilderUtils.newContainerId(recordFactory, application3, 3);
+ writeContainerLogs(app3LogDir, container33);
+ logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+ container33, "0"));
+
+ logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ application2));
+ logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ application3));
+ logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ application1));
+
+ logAggregationService.stop();
+
+ verifyContainerLogs(logAggregationService, application1,
+ new ContainerId[] { container11, container12 });
+ verifyContainerLogs(logAggregationService, application2,
+ new ContainerId[] { container21 });
+ verifyContainerLogs(logAggregationService, application3,
+ new ContainerId[] { container31, container32 });
+ }
+
+ private void writeContainerLogs(File appLogDir, ContainerId containerId)
+ throws IOException {
+ // ContainerLogDir should be created
+ String containerStr = ConverterUtils.toString(containerId);
+ File containerLogDir = new File(appLogDir, containerStr);
+ containerLogDir.mkdir();
+ for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+ Writer writer11 = new FileWriter(new File(containerLogDir, fileType));
+ writer11.write(containerStr + " Hello " + fileType + "!");
+ writer11.close();
+ }
+ }
+
+ private void verifyContainerLogs(
+ LogAggregationService logAggregationService, ApplicationId appId,
+ ContainerId[] expectedContainerIds) throws IOException {
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(this.conf,
+ logAggregationService.getRemoteNodeLogFileForApp(appId));
+ try {
+ Map<String, Map<String, String>> logMap =
+ new HashMap<String, Map<String, String>>();
+ DataInputStream valueStream;
+
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null) {
+ LOG.info("Found container " + key.toString());
+ Map<String, String> perContainerMap = new HashMap<String, String>();
+ logMap.put(key.toString(), perContainerMap);
+
+ while (true) {
+ try {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ LogReader.readAContainerLogsForALogType(valueStream, dob);
+
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(dob.getData(), dob.getLength());
+
+ Assert.assertEquals("\nLogType:", dib.readUTF());
+ String fileType = dib.readUTF();
+
+ Assert.assertEquals("\nLogLength:", dib.readUTF());
+ String fileLengthStr = dib.readUTF();
+ long fileLength = Long.parseLong(fileLengthStr);
+
+ Assert.assertEquals("\nLog Contents:\n", dib.readUTF());
+ byte[] buf = new byte[(int) fileLength]; // cast is okay in this
+ // test.
+ dib.read(buf, 0, (int) fileLength);
+ perContainerMap.put(fileType, new String(buf));
+
+ LOG.info("LogType:" + fileType);
+ LOG.info("LogType:" + fileLength);
+ LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+
+ // 1 for each container
+ Assert.assertEquals(expectedContainerIds.length, logMap.size());
+ for (ContainerId cId : expectedContainerIds) {
+ String containerStr = ConverterUtils.toString(cId);
+ Map<String, String> thisContainerMap = logMap.remove(containerStr);
+ Assert.assertEquals(3, thisContainerMap.size());
+ for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+ String expectedValue = containerStr + " Hello " + fileType + "!";
+ LOG.info("Expected log-content : " + new String(expectedValue));
+ String foundValue = thisContainerMap.remove(fileType);
+ Assert.assertNotNull(cId + " " + fileType
+ + " not present in aggregated log-file!", foundValue);
+ Assert.assertEquals(expectedValue, foundValue);
+ }
+ Assert.assertEquals(0, thisContainerMap.size());
+ }
+ Assert.assertEquals(0, logMap.size());
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Test
+ public void testLogAggregationForRealContainerLaunch() throws IOException,
+ InterruptedException {
+
+ this.containerManager.start();
+
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ fileWriter.write("\necho Hello World! Stdout! > "
+ + new File(localLogDir, "stdout"));
+ fileWriter.write("\necho Hello World! Stderr! > "
+ + new File(localLogDir, "stderr"));
+ fileWriter.write("\necho Hello World! Syslog! > "
+ + new File(localLogDir, "syslog"));
+ fileWriter.close();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // ////// Construct the Container-id
+ ApplicationId appId =
+ recordFactory.newRecordInstance(ApplicationId.class);
+ ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+ cId.setAppId(appId);
+ containerLaunchContext.setContainerId(cId);
+
+ containerLaunchContext.setUser(this.user);
+
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ containerLaunchContext.setUser(containerLaunchContext.getUser());
+ containerLaunchContext.addCommand("/bin/bash");
+ containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
+ containerLaunchContext.setResource(recordFactory
+ .newRecordInstance(Resource.class));
+ containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
+ StartContainerRequest startRequest =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ this.containerManager.startContainer(startRequest);
+
+ BaseContainerManagerTest.waitForContainerState(this.containerManager,
+ cId, ContainerState.COMPLETE);
+
+ this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays
+ .asList(appId)));
+ this.containerManager.stop();
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Wed May 4 06:53:52 2011
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import static org.junit.Assert.assertFalse;
@@ -12,15 +30,10 @@ import java.util.regex.Pattern;
import junit.framework.Assert;
-import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -33,116 +46,32 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DummyContainerManager;
-import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.TestProcfsBasedProcessTree;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class TestContainersMonitor {
-
- private static final Log LOG = LogFactory
- .getLog(TestContainersMonitor.class);
-
- protected static File localDir = new File("target",
- TestContainersMonitor.class.getName() + "-localDir").getAbsoluteFile();
- protected static File logDir = new File("target",
- TestContainersMonitor.class.getName() + "-logDir").getAbsoluteFile();
- protected static File tmpDir = new File("target",
- TestContainersMonitor.class.getName() + "-tmpDir");
-
- private static RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
- static {
- DefaultMetricsSystem.setMiniClusterMode(true);
- }
-
- protected FileContext localFS;
+public class TestContainersMonitor extends BaseContainerManagerTest {
public TestContainersMonitor() throws UnsupportedFileSystemException {
- localFS = FileContext.getLocalFSFileContext();
+ super();
}
- protected Configuration conf = new YarnConfiguration();
- private Context context = new NMContext();
- private ContainerExecutor exec = new DefaultContainerExecutor();
- private DeletionService delSrvc= new DeletionService(exec);
- private Dispatcher dispatcher = new AsyncDispatcher();
- private NodeHealthCheckerService healthChecker = null;
- private String user = "nobody";
-
- private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
- context, dispatcher, healthChecker) {
- @Override
- protected ResourceTracker getRMClient() {
- return new LocalRMInterface();
- };
-
- @Override
- protected void startStatusUpdater() throws InterruptedException,
- YarnRemoteException {
- return; // Don't start any updating thread.
- }
- };
-
- private ContainerManagerImpl containerManager = null;
-
+ static {
+ LOG = LogFactory.getLog(TestContainersMonitor.class);
+ }
@Before
public void setup() throws IOException {
- localFS.delete(new Path(localDir.getAbsolutePath()), true);
- localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
- localFS.delete(new Path(logDir.getAbsolutePath()), true);
- localDir.mkdir();
- tmpDir.mkdir();
- logDir.mkdir();
- LOG.info("Created localDir in " + localDir.getAbsolutePath());
- LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
-
- String bindAddress = "0.0.0.0:5555";
- conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
- conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
- conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
-
- containerManager =
- new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
conf.setClass(
ContainersMonitorImpl.RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY,
LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
- containerManager.init(conf);
- }
-
- @After
- public void tearDown() throws IOException, InterruptedException {
- if (containerManager != null
- && containerManager.getServiceState() == STATE.STARTED) {
- containerManager.stop();
- }
- exec.deleteAsUser(user, new Path(localDir.getAbsolutePath()),
- new Path[] {});
+ super.setup();
}
/**
@@ -308,7 +237,7 @@ public class TestContainersMonitor {
// No more lines
Assert.assertEquals(null, reader.readLine());
- DummyContainerManager.waitForContainerState(containerManager, cId,
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE, 60);
GetContainerStatusRequest gcsRequest =
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed May 4 06:53:52 2011
@@ -101,7 +101,7 @@ public class TestNMWebServer {
recordFactory.newRecordInstance(ContainerLaunchContext.class);
launchContext.setContainerId(containerId);
launchContext.setUser(user);
- Container container = new ContainerImpl(dispatcher, launchContext) {
+ Container container = new ContainerImpl(dispatcher, launchContext, null) {
public ContainerState getContainerState() {
return ContainerState.RUNNING;
};