Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/04 08:53:53 UTC

svn commit: r1099337 [2/3] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ mr-client/hadoop-mapreduce-client-shuffle/src/...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java Wed May  4 06:53:52 2011
@@ -0,0 +1,197 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class LogDumper extends Configured implements Tool {
+
+  private static final String CONTAINER_ID_OPTION = "containerId";
+  private static final String APPLICATION_ID_OPTION = "applicationId";
+  private static final String NODE_ADDRESS_OPTION = "nodeAddress";
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    Options opts = new Options();
+    opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
+    opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
+    opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
+
+    if (args.length < 1) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("general options are: ", opts);
+      return -1;
+    }
+
+    CommandLineParser parser = new GnuParser();
+    String appIdStr = null;
+    String containerIdStr = null;
+    String nodeAddress = null;
+    try {
+      CommandLine commandLine = parser.parse(opts, args, true);
+      appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
+      containerIdStr = commandLine.getOptionValue(CONTAINER_ID_OPTION);
+      nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION);
+    } catch (ParseException e) {
+      System.out.println("options parsing failed: " + e.getMessage());
+
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("general options are: ", opts);
+      return -1;
+    }
+
+    if (appIdStr == null) {
+      System.out.println("ApplicationId cannot be null!");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("general options are: ", opts);
+      return -1;
+    }
+
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(getConf());
+    ApplicationId appId =
+        ConverterUtils.toApplicationId(recordFactory, appIdStr);
+
+    DataOutputStream out = new DataOutputStream(System.out);
+
+    if (containerIdStr == null && nodeAddress == null) {
+      dumpAllContainersLogs(appId, out);
+    } else if ((containerIdStr == null && nodeAddress != null)
+        || (containerIdStr != null && nodeAddress == null)) {
+      System.out.println("ContainerId or NodeAddress cannot be null!");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("general options are: ", opts);
+      return -1;
+    } else {
+      Path remoteRootLogDir =
+        new Path(getConf().get(NMConfig.REMOTE_USER_LOG_DIR,
+            NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
+      AggregatedLogFormat.LogReader reader =
+          new AggregatedLogFormat.LogReader(getConf(),
+              LogAggregationService.getRemoteNodeLogFileForApp(
+                  remoteRootLogDir, appId, nodeAddress));
+      return dumpAContainerLogs(containerIdStr, reader, out);
+    }
+
+    return 0;
+  }
+
+  private int dumpAContainerLogs(String containerIdStr,
+      AggregatedLogFormat.LogReader reader, DataOutputStream out)
+      throws IOException {
+    DataInputStream valueStream;
+    LogKey key = new LogKey();
+    valueStream = reader.next(key);
+
+    while (valueStream != null && !key.toString().equals(containerIdStr)) {
+      // Next container
+      key = new LogKey();
+      valueStream = reader.next(key);
+    }
+
+    if (valueStream == null) {
+      System.out.println("Logs for container " + containerIdStr
+          + " are not present in this log-file.");
+      return -1;
+    }
+
+    while (true) {
+      try {
+        LogReader.readAContainerLogsForALogType(valueStream, out);
+      } catch (EOFException eof) {
+        break;
+      }
+    }
+    return 0;
+  }
+
+  private void
+      dumpAllContainersLogs(ApplicationId appId, DataOutputStream out)
+          throws IOException {
+    Path remoteRootLogDir =
+        new Path(getConf().get(NMConfig.REMOTE_USER_LOG_DIR,
+            NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
+    Path remoteAppLogDir =
+        LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId);
+    RemoteIterator<FileStatus> nodeFiles =
+        FileContext.getFileContext().listStatus(remoteAppLogDir);
+    while (nodeFiles.hasNext()) {
+      FileStatus thisNodeFile = nodeFiles.next();
+      AggregatedLogFormat.LogReader reader =
+          new AggregatedLogFormat.LogReader(getConf(),
+              LogAggregationService.getRemoteNodeLogFileForApp(
+                  remoteRootLogDir, appId, thisNodeFile.getPath().getName()));
+      try {
+
+        DataInputStream valueStream;
+        LogKey key = new LogKey();
+        valueStream = reader.next(key);
+
+        while (valueStream != null) {
+          while (true) {
+            try {
+              LogReader.readAContainerLogsForALogType(valueStream, out);
+            } catch (EOFException eof) {
+              break;
+            }
+          }
+
+          // Next container
+          key = new LogKey();
+          valueStream = reader.next(key);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new YarnConfiguration();
+    LogDumper logDumper = new LogDumper();
+    logDumper.setConf(conf);
+    logDumper.run(args);
+  }
+}
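
As context (not part of the patch above): because LogDumper implements Tool, it can also be driven through Hadoop's ToolRunner so that generic options such as -conf and -D are parsed before run() is invoked. The driver class below is only an illustrative sketch; the class name is hypothetical and the argument values are placeholders, while the option names (-applicationId, -nodeAddress, -containerId) match the constants defined in LogDumper.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper;

    public class LogDumperDriver {
      public static void main(String[] args) throws Exception {
        // Expected arguments, for example:
        //   -applicationId <application-id> -nodeAddress <nodehost:port> -containerId <container-id>
        // ToolRunner sets the configuration on the Tool before invoking run().
        int exitCode = ToolRunner.run(new YarnConfiguration(), new LogDumper(), args);
        System.exit(exitCode);
      }
    }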

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java Wed May  4 06:53:52 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent {
+
+  private final ApplicationId applicationId;
+
+  public LogAggregatorAppFinishedEvent(ApplicationId appId) {
+    super(LogAggregatorEventType.APPLICATION_FINISHED);
+    this.applicationId = appId;
+  }
+
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java Wed May  4 06:53:52 2011
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
+
+public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
+
+  private final ApplicationId applicationId;
+  private final ContainerLogsRetentionPolicy retentionPolicy;
+  private final String user;
+  private final Credentials credentials;
+
+  public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
+      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy) {
+    super(LogAggregatorEventType.APPLICATION_STARTED);
+    this.applicationId = appId;
+    this.user = user;
+    this.credentials = credentials;
+    this.retentionPolicy = retentionPolicy;
+  }
+
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+
+  public ContainerLogsRetentionPolicy getLogRetentionPolicy() {
+    return this.retentionPolicy;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java Wed May  4 06:53:52 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent {
+
+  private final ContainerId containerId;
+  private final String exitCode;
+
+  public LogAggregatorContainerFinishedEvent(ContainerId containerId,
+      String exitCode) {
+    super(LogAggregatorEventType.CONTAINER_FINISHED);
+    this.containerId = containerId;
+    this.exitCode = exitCode;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public String getExitCode() {
+    return this.exitCode;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java Wed May  4 06:53:52 2011
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class LogAggregatorEvent extends AbstractEvent<LogAggregatorEventType>{
+
+  public LogAggregatorEvent(LogAggregatorEventType type) {
+    super(type);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java Wed May  4 06:53:52 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
+
+public enum LogAggregatorEventType {
+  APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
+}
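
As context (not part of the patch above): the three event classes and the enum added here are meant to be consumed by an org.apache.hadoop.yarn.event.EventHandler that switches on LogAggregatorEventType, as the DummyContainerManager change later in this commit shows. The handler below is only a hypothetical sketch assuming the classes above are on the classpath; the comments describe placeholder actions, not the actual behaviour of LogAggregationService.

    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;

    public class SketchLogAggregatorEventHandler
        implements EventHandler<LogAggregatorEvent> {

      @Override
      public void handle(LogAggregatorEvent event) {
        switch (event.getType()) {
        case APPLICATION_STARTED:
          LogAggregatorAppStartedEvent started = (LogAggregatorAppStartedEvent) event;
          // Placeholder: begin aggregation for started.getApplicationId(), using
          // started.getUser(), started.getCredentials() and started.getLogRetentionPolicy().
          break;
        case CONTAINER_FINISHED:
          LogAggregatorContainerFinishedEvent finished =
              (LogAggregatorContainerFinishedEvent) event;
          // Placeholder: queue the logs of finished.getContainerId() for upload,
          // possibly recording finished.getExitCode().
          break;
        case APPLICATION_FINISHED:
          // Placeholder: flush and close the aggregated log file for the application.
          break;
        default:
          // Ignore unknown event types.
        }
      }
    }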

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties Wed May  4 06:53:52 2011
@@ -21,3 +21,9 @@ log4j.appender.CLA.totalLogFileSize=${ha
 
 log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Wed May  4 06:53:52 2011
@@ -18,22 +18,12 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
-import junit.framework.Assert;
+import static org.junit.Assert.fail;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -45,14 +35,17 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-
-import static org.junit.Assert.*;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
 
 public class DummyContainerManager extends ContainerManagerImpl {
 
@@ -78,7 +71,7 @@ public class DummyContainerManager exten
               ((ApplicationLocalizationEvent) event).getApplication();
           // Simulate event from ApplicationLocalization.
           dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
-                app.getAppId(), new Path("logdir")));
+                app.getAppId()));
           break;
         case INIT_CONTAINER_RESOURCES:
           ContainerLocalizationRequestEvent rsrcReqs =
@@ -142,31 +135,23 @@ public class DummyContainerManager exten
     };
   }
 
-  public static void waitForContainerState(ContainerManager containerManager,
-      ContainerId containerID, ContainerState finalState)
-      throws InterruptedException, YarnRemoteException {
-    waitForContainerState(containerManager, containerID, finalState, 20);
-  }
-
-  public static void waitForContainerState(ContainerManager containerManager,
-        ContainerId containerID, ContainerState finalState, int timeOutMax)
-        throws InterruptedException, YarnRemoteException {
-      GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-      request.setContainerId(containerID);
-      ContainerStatus containerStatus =
-          containerManager.getContainerStatus(request).getStatus();
-      int timeoutSecs = 0;
-    while (!containerStatus.getState().equals(finalState)
-        && timeoutSecs++ < timeOutMax) {
-        Thread.sleep(1000);
-        LOG.info("Waiting for container " +
-            ConverterUtils.toString(containerID) +
-            " to get into state " + finalState
-            + ". Current state is " + containerStatus.getState());
-        containerStatus = containerManager.getContainerStatus(request).getStatus();
+  @Override
+  protected LogAggregationService createLogAggregationService(
+      DeletionService deletionService) {
+    return new LogAggregationService(deletionService) {
+      @Override
+      public void handle(LogAggregatorEvent event) {
+        switch (event.getType()) {
+        case APPLICATION_STARTED:
+          break;
+        case CONTAINER_FINISHED:
+          break;
+        case APPLICATION_FINISHED:
+          break;
+        default:
+          // Ignore
+        }
       }
-      LOG.info("Container state is " + containerStatus.getState());
-      Assert.assertEquals("ContainerState is not correct (timedout)",
-          finalState, containerStatus.getState());
-    }
+    };
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Wed May  4 06:53:52 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -27,8 +26,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,6 +41,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.junit.Test;
 
 public class TestEventFlow {
@@ -53,8 +51,10 @@ public class TestEventFlow {
 
   private static File localDir = new File("target",
       TestEventFlow.class.getName() + "-localDir").getAbsoluteFile();
-  private static File logDir = new File("target",
-      TestEventFlow.class.getName() + "-logDir").getAbsoluteFile();
+  private static File localLogDir = new File("target",
+      TestEventFlow.class.getName() + "-localLogDir").getAbsoluteFile();
+  private static File remoteLogDir = new File("target",
+      TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile();
 
   @Test
   public void testSuccessfulContainerLaunch() throws InterruptedException,
@@ -63,15 +63,18 @@ public class TestEventFlow {
     FileContext localFS = FileContext.getLocalFSFileContext();
 
     localFS.delete(new Path(localDir.getAbsolutePath()), true);
-    localFS.delete(new Path(logDir.getAbsolutePath()), true);
+    localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
+    localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
     localDir.mkdir();
-    logDir.mkdir();
+    localLogDir.mkdir();
+    remoteLogDir.mkdir();
 
     Context context = new NMContext();
 
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
-    conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
+    conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+    conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
 
     ContainerExecutor exec = new DefaultContainerExecutor();
     DeletionService del = new DeletionService(exec);
@@ -106,13 +109,13 @@ public class TestEventFlow {
     request.setContainerLaunchContext(launchContext);
     containerManager.startContainer(request);
 
-    DummyContainerManager.waitForContainerState(containerManager, cID,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cID,
         ContainerState.RUNNING);
 
     StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(cID);
     containerManager.stopContainer(stopRequest);
-    DummyContainerManager.waitForContainerState(containerManager, cID,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cID,
         ContainerState.COMPLETE);
 
     containerManager.stop();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed May  4 06:53:52 2011
@@ -114,7 +114,7 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(firstContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(null, launchContext);
+        Container container = new ContainerImpl(null, launchContext, null);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -140,7 +140,7 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(secondContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(null, launchContext);
+        Container container = new ContainerImpl(null, launchContext, null);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -211,6 +211,8 @@ public class TestNodeStatusUpdater {
     conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
     conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
     conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
+    conf.set(NMConfig.REMOTE_USER_LOG_DIR, new Path(basedir, "remotelogs")
+        .toUri().getPath());
     conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath());
     nm.init(conf);
     new Thread() {

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed May  4 06:53:52 2011
@@ -0,0 +1,208 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class BaseContainerManagerTest {
+
+  protected static RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  static {
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  protected static FileContext localFS;
+  protected static File localDir;
+  protected static File localLogDir;
+  protected static File remoteLogDir;
+  protected static File tmpDir;
+
+  public BaseContainerManagerTest() throws UnsupportedFileSystemException {
+    localFS = FileContext.getLocalFSFileContext();
+    localDir =
+        new File("target", this.getClass().getName() + "-localDir")
+            .getAbsoluteFile();
+    localLogDir =
+        new File("target", this.getClass().getName() + "-localLogDir")
+            .getAbsoluteFile();
+    remoteLogDir =
+      new File("target", this.getClass().getName() + "-remoteLogDir")
+          .getAbsoluteFile();
+    tmpDir = new File("target", this.getClass().getName() + "-tmpDir");
+  }
+
+  protected static Log LOG = LogFactory
+      .getLog(BaseContainerManagerTest.class);
+
+  protected Configuration conf = new YarnConfiguration();
+  protected Context context = new NMContext();
+  protected ContainerExecutor exec = new DefaultContainerExecutor();
+  protected DeletionService delSrvc;
+  protected String user = "nobody";
+
+  protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
+      context, new AsyncDispatcher(), null) {
+    @Override
+    protected ResourceTracker getRMClient() {
+      return new LocalRMInterface();
+    };
+
+    @Override
+    protected void startStatusUpdater() throws InterruptedException,
+        YarnRemoteException {
+      return; // Don't start any updating thread.
+    }
+  };
+
+  protected ContainerManagerImpl containerManager = null;
+
+  protected ContainerExecutor createContainerExecutor() {
+    return new DefaultContainerExecutor();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    localFS.delete(new Path(localDir.getAbsolutePath()), true);
+    localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
+    localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
+    localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
+    localDir.mkdir();
+    tmpDir.mkdir();
+    localLogDir.mkdir();
+    remoteLogDir.mkdir();
+    LOG.info("Created localDir in " + localDir.getAbsolutePath());
+    LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
+
+    String bindAddress = "0.0.0.0:5555";
+    conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
+    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+    conf.set(NMConfig.REMOTE_USER_LOG_DIR, remoteLogDir.getAbsolutePath());
+
+    // Default delSrvc
+    delSrvc = new DeletionService(exec) {
+      @Override
+      public void delete(String user, Path subDir, Path[] baseDirs) {
+        // Don't do any deletions.
+        LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
+            + ", baseDirs - " + baseDirs); 
+      };
+    };
+
+    exec = createContainerExecutor();
+    containerManager =
+        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
+    containerManager.init(conf);
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    if (containerManager != null
+        && containerManager.getServiceState() == STATE.STARTED) {
+      containerManager.stop();
+    }
+    createContainerExecutor().deleteAsUser(user,
+        new Path(localDir.getAbsolutePath()), new Path[] {});
+  }
+
+  public static void waitForContainerState(ContainerManager containerManager,
+      ContainerId containerID, ContainerState finalState)
+      throws InterruptedException, YarnRemoteException {
+    waitForContainerState(containerManager, containerID, finalState, 20);
+  }
+
+  public static void waitForContainerState(ContainerManager containerManager,
+      ContainerId containerID, ContainerState finalState, int timeOutMax)
+      throws InterruptedException, YarnRemoteException {
+    GetContainerStatusRequest request =
+        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+    request.setContainerId(containerID);
+    ContainerStatus containerStatus =
+        containerManager.getContainerStatus(request).getStatus();
+    int timeoutSecs = 0;
+    while (!containerStatus.getState().equals(finalState)
+        && timeoutSecs++ < timeOutMax) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for container to get into state " + finalState
+          + ". Current state is " + containerStatus.getState());
+      containerStatus = containerManager.getContainerStatus(request).getStatus();
+    }
+    LOG.info("Container state is " + containerStatus.getState());
+    Assert.assertEquals("ContainerState is not correct (timedout)",
+        finalState, containerStatus.getState());
+  }
+
+  static void waitForApplicationState(ContainerManagerImpl containerManager,
+      ApplicationId appID, ApplicationState finalState)
+      throws InterruptedException {
+    // Wait for app-finish
+    Application app =
+        containerManager.context.getApplications().get(appID);
+    int timeout = 0;
+    while (!(app.getApplicationState().equals(finalState))
+        && timeout++ < 15) {
+      LOG.info("Waiting for app to reach " + finalState
+          + ".. Current state is "
+          + app.getApplicationState());
+      Thread.sleep(1000);
+    }
+  
+    Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
+        app.getApplicationState().equals(finalState));
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed May  4 06:53:52 2011
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Wed May  4 06:53:52 2011
@@ -27,15 +27,10 @@ import java.util.Arrays;
 
 import junit.framework.Assert;
 
-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@@ -49,125 +44,25 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DummyContainerManager;
-import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
-import org.apache.hadoop.yarn.service.Service.STATE;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-public class TestContainerManager {
-
-  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  static {
-    DefaultMetricsSystem.setMiniClusterMode(true);
-  }
-
-  protected FileContext localFS;
+public class TestContainerManager extends BaseContainerManagerTest {
 
   public TestContainerManager() throws UnsupportedFileSystemException {
-    localFS = FileContext.getLocalFSFileContext();
-  }
-
-  private static Log LOG = LogFactory.getLog(TestContainerManager.class);
-
-  protected static File localDir = new File("target",
-      TestContainerManager.class.getName() + "-localDir").getAbsoluteFile();
-  protected static File logDir = new File("target",
-      TestContainerManager.class.getName() + "-logDir").getAbsoluteFile();
-  protected static File tmpDir = new File("target",
-      TestContainerManager.class.getName() + "-tmpDir");
-
-  protected Configuration conf = new YarnConfiguration();
-  private Context context = new NMContext();
-  private ContainerExecutor exec = new DefaultContainerExecutor();
-  private DeletionService delSrvc;
-  private Dispatcher dispatcher = new AsyncDispatcher();
-  private NodeHealthCheckerService healthChecker = null;
-  private String user = "nobody";
-
-  private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
-      context, dispatcher, healthChecker) {
-    @Override
-    protected ResourceTracker getRMClient() {
-      return new LocalRMInterface();
-    };
-
-    @Override
-    protected void startStatusUpdater() throws InterruptedException,
-        YarnRemoteException {
-      return; // Don't start any updating thread.
-    }
-  };
-
-  private ContainerManagerImpl containerManager = null;
-
-  protected ContainerExecutor createContainerExecutor() {
-    return new DefaultContainerExecutor();
-  }
-
-  @Before
-  public void setup() throws IOException {
-    localFS.delete(new Path(localDir.getAbsolutePath()), true);
-    localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
-    localFS.delete(new Path(logDir.getAbsolutePath()), true);
-    localDir.mkdir();
-    tmpDir.mkdir();
-    logDir.mkdir();
-    LOG.info("Created localDir in " + localDir.getAbsolutePath());
-    LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
-
-    String bindAddress = "0.0.0.0:5555";
-    conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
-    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
-    conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
-
-    // Default delSrvc
-    delSrvc = new DeletionService(exec) {
-      @Override
-      public void delete(String user, Path subDir, Path[] baseDirs) {
-        // Don't do any deletions.
-      };
-    };
-
-    exec = createContainerExecutor();
-    containerManager =
-        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
-    containerManager.init(conf);
+    super();
   }
 
-  @After
-  public void tearDown() throws IOException, InterruptedException {
-    if (containerManager != null
-        && containerManager.getServiceState() == STATE.STARTED) {
-      containerManager.stop();
-    }
-    createContainerExecutor().deleteAsUser(user,
-        new Path(localDir.getAbsolutePath()), new Path[] {});
+  static {
+    LOG = LogFactory.getLog(TestContainerManager.class);
   }
 
   @Test
@@ -235,7 +130,7 @@ public class TestContainerManager {
     
     containerManager.startContainer(startRequest);
 
-    DummyContainerManager.waitForContainerState(containerManager, cId,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
 
     // Now ascertain that the resources are localised correctly.
@@ -349,7 +244,7 @@ public class TestContainerManager {
     stopRequest.setContainerId(cId);
     containerManager.stopContainer(stopRequest);
 
-    DummyContainerManager.waitForContainerState(containerManager, cId,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
     
     GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
@@ -418,10 +313,10 @@ public class TestContainerManager {
     request.setContainerLaunchContext(containerLaunchContext);
     containerManager.startContainer(request);
 
-    DummyContainerManager.waitForContainerState(containerManager, cId,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE);
 
-    waitForApplicationState(containerManager, cId.getAppId(),
+    BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getAppId(),
         ApplicationState.RUNNING);
 
     // Now ascertain that the resources are localised correctly.
@@ -453,7 +348,7 @@ public class TestContainerManager {
     containerManager.handle(new CMgrCompletedAppsEvent(Arrays
         .asList(new ApplicationId[] { appId })));
 
-    waitForApplicationState(containerManager, cId.getAppId(),
+    BaseContainerManagerTest.waitForApplicationState(containerManager, cId.getAppId(),
         ApplicationState.FINISHED);
 
     // Now ascertain that the resources are localised correctly.
@@ -475,118 +370,4 @@ public class TestContainerManager {
     Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
         targetFile.exists());
   }
-
-//  @Test
-//  public void testCommandPreparation() {
-//    ContainerLaunchContext container = new ContainerLaunchContext();
-//
-//    // ////// Construct the Container-id
-//    ApplicationID appId = new ApplicationID();
-//    appId.id = 0;
-//    appId.clusterTimeStamp = 0;
-//    ContainerID containerID = new ContainerID();
-//    containerID.appID = appId;
-//    containerID.id = 0;
-//    container.id = containerID;
-//
-//    // The actual environment for the container
-//    Path containerWorkDir =
-//        NodeManager.getContainerWorkDir(new Path(localDir.getAbsolutePath()),
-//            containerID);
-//    final Map<String, String> ENVS = new HashMap<String, String>();
-//    ENVS.put("JAVA_HOME", "/my/path/to/java-home");
-//    ENVS.put("LD_LIBRARY_PATH", "/my/path/to/libraries");
-//
-//    File workDir = new File(ContainerBuilderHelper.getWorkDir());
-//    File logDir = new File(workDir, "logs");
-//    File stdout = new File(logDir, "stdout");
-//    File stderr = new File(logDir, "stderr");
-//    File tmpDir = new File(workDir, "tmp");
-//    File javaHome = new File(ContainerBuilderHelper.getEnvVar("JAVA_HOME"));
-//    String ldLibraryPath =
-//        ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
-//    List<String> classPaths = new ArrayList<String>();
-//    File someJar = new File(workDir, "jar-name.jar");
-//    classPaths.add(someJar.toString());
-//    classPaths.add(workDir.toString());
-//    String PATH_SEPARATOR = System.getProperty("path.separator");
-//    String classPath = StringUtils.join(PATH_SEPARATOR, classPaths);
-//    File someFile = new File(workDir, "someFileNeededinEnv");
-//
-//    NMContainer nmContainer = new NMContainer(container, containerWorkDir) {
-//      @Override
-//      protected String checkAndGetEnvValue(String envVar) {
-//        return ENVS.get(envVar);
-//      }
-//    };
-//    List<CharSequence> command = new ArrayList<CharSequence>();
-//    command.add(javaHome + "/bin/java");
-//    command.add("-Djava.library.path=" + ldLibraryPath);
-//    command.add("-Djava.io.tmpdir=" + tmpDir);
-//    command.add("-classpath");
-//    command.add(classPath);
-//    command.add("2>" + stdout);
-//    command.add("1>" + stderr);
-//
-//    Map<String, String> env = new HashMap<String, String>();
-//    env.put("FILE_IN_ENV", someFile.toString());
-//    env.put("JAVA_HOME", javaHome.toString());
-//    env.put("LD_LIBRARY_PATH", ldLibraryPath);
-//
-//    String actualWorkDir = containerWorkDir.toUri().getPath();
-//
-//    String finalCmdSent = "";
-//    for (CharSequence cmd : command) {
-//      finalCmdSent += cmd + " ";
-//    }
-//    finalCmdSent.trim();
-//    LOG.info("Final command sent is : " + finalCmdSent);
-//
-//    // The main method being tested
-//    String[] finalCommands =
-//        nmContainer.prepareCommandArgs(command, env, actualWorkDir);
-//    // //////////////////////////////
-//
-//    String finalCmd = "";
-//    for (String cmd : finalCommands) {
-//      finalCmd += cmd + " ";
-//    }
-//    finalCmd = finalCmd.trim();
-//    LOG.info("Final command for launch is : " + finalCmd);
-//
-//    File actualLogDir = new File(actualWorkDir, "logs");
-//    File actualStdout = new File(actualLogDir, "stdout");
-//    File actualStderr = new File(actualLogDir, "stderr");
-//    File actualTmpDir = new File(actualWorkDir, "tmp");
-//    File actualSomeJar = new File(actualWorkDir, "jar-name.jar");
-//    File actualSomeFileInEnv = new File(actualWorkDir, "someFileNeededinEnv");
-//    Assert.assertEquals(actualSomeFileInEnv.toString(),
-//        env.get("FILE_IN_ENV"));
-//    Assert.assertEquals("/my/path/to/java-home", env.get("JAVA_HOME"));
-//    Assert.assertEquals("/my/path/to/libraries", env.get("LD_LIBRARY_PATH"));
-//    Assert.assertEquals("/my/path/to/java-home/bin/java"
-//        + " -Djava.library.path=/my/path/to/libraries" + " -Djava.io.tmpdir="
-//        + actualTmpDir + " -classpath " + actualSomeJar + PATH_SEPARATOR
-//        + actualWorkDir + " 2>" + actualStdout + " 1>" + actualStderr,
-//        finalCmd);
-//  }
-
-  static void waitForApplicationState(ContainerManagerImpl containerManager,
-      ApplicationId appID, ApplicationState finalState)
-      throws InterruptedException {
-    // Wait for app-finish
-    Application app =
-        containerManager.context.getApplications().get(appID);
-    int timeout = 0;
-    while (!(app.getApplicationState().equals(finalState))
-        && timeout++ < 15) {
-      LOG.info("Waiting for app to reach " + finalState
-          + ".. Current state is "
-          + app.getApplicationState());
-      Thread.sleep(1000);
-    }
-
-    Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
-        app.getApplicationState().equals(finalState));
-  }
 }
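
The two wait helpers invoked above now come from the shared BaseContainerManagerTest rather than living in each test class; the local copy of waitForApplicationState is deleted accordingly. A minimal sketch of the hoisted helper, assuming the shared version keeps the same one-second poll and ~15-attempt timeout as the copy removed just above (only the class it lives in changes):

    // In BaseContainerManagerTest (sketch): poll the application's state once a
    // second and give up after roughly 15 attempts, as the removed copy did.
    static void waitForApplicationState(ContainerManagerImpl containerManager,
        ApplicationId appID, ApplicationState finalState)
        throws InterruptedException {
      Application app = containerManager.context.getApplications().get(appID);
      int timeout = 0;
      while (!app.getApplicationState().equals(finalState) && timeout++ < 15) {
        LOG.info("Waiting for app to reach " + finalState
            + ".. Current state is " + app.getApplicationState());
        Thread.sleep(1000);
      }
      Assert.assertTrue("App is not in " + finalState + " yet!! Timedout!!",
          app.getApplicationState().equals(finalState));
    }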

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed May  4 06:53:52 2011
@@ -83,7 +83,7 @@ public class TestContainer {
       final Map<String,LocalResource> localResources = createLocalResources(r);
       when(ctxt.getAllLocalResources()).thenReturn(localResources);
 
-      final Container c = new ContainerImpl(dispatcher, ctxt);
+      final Container c = new ContainerImpl(dispatcher, ctxt, null);
       assertEquals(ContainerState.NEW, c.getContainerState());
 
       // Verify request for public/private resources to localizer
@@ -136,7 +136,7 @@ public class TestContainer {
       final Map<String,LocalResource> localResources = createLocalResources(r);
       when(ctxt.getAllLocalResources()).thenReturn(localResources);
 
-      final Container c = new ContainerImpl(dispatcher, ctxt);
+      final Container c = new ContainerImpl(dispatcher, ctxt, null);
       assertEquals(ContainerState.NEW, c.getContainerState());
 
       c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
@@ -208,7 +208,7 @@ public class TestContainer {
       final Map<String,ByteBuffer> serviceData = createServiceData(r);
       when(ctxt.getAllServiceData()).thenReturn(serviceData);
 
-      final Container c = new ContainerImpl(dispatcher, ctxt);
+      final Container c = new ContainerImpl(dispatcher, ctxt, null);
       assertEquals(ContainerState.NEW, c.getContainerState());
 
       // Verify propagation of service data to AuxServices

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed May  4 06:53:52 2011
@@ -0,0 +1,405 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+public class TestLogAggregationService extends BaseContainerManagerTest {
+
+  static {
+    LOG = LogFactory.getLog(TestLogAggregationService.class);
+  }
+
+  private static RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private File remoteRootLogDir = new File("target", this.getClass()
+      .getName() + "-remoteLogDir");
+
+  public TestLogAggregationService() throws UnsupportedFileSystemException {
+    super();
+    this.remoteRootLogDir.mkdir();
+  }
+
+  @Override
+  public void tearDown() throws IOException, InterruptedException {
+    super.tearDown();
+    createContainerExecutor().deleteAsUser(user,
+        new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
+  }
+
+  @Test
+  public void testLocalFileDeletionAfterUpload() throws IOException {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+    this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    LogAggregationService logAggregationService =
+        new LogAggregationService(this.delSrvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+    // AppLogDir should be created
+    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    app1LogDir.mkdir();
+    logAggregationService
+        .handle(new LogAggregatorAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+    ContainerId container11 =
+        BuilderUtils.newContainerId(recordFactory, application1, 1);
+    // Simulate log-file creation
+    writeContainerLogs(app1LogDir, container11);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container11, "0"));
+
+    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+        application1));
+
+    logAggregationService.stop();
+
+    String containerIdStr = ConverterUtils.toString(container11);
+    File containerLogDir = new File(app1LogDir, containerIdStr);
+    for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+      Assert.assertFalse(new File(containerLogDir, fileType).exists());
+    }
+
+    Assert.assertTrue(new File(logAggregationService
+        .getRemoteNodeLogFileForApp(application1).toUri().getPath()).exists());
+  }
+
+  @Test
+  public void testNoContainerOnNode() {
+    this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+    this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    LogAggregationService logAggregationService =
+        new LogAggregationService(this.delSrvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+    // AppLogDir should be created
+    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    app1LogDir.mkdir();
+    logAggregationService
+        .handle(new LogAggregatorAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+        application1));
+
+    logAggregationService.stop();
+
+    Assert
+        .assertFalse(new File(logAggregationService
+            .getRemoteNodeLogFileForApp(application1).toUri().getPath())
+            .exists());
+  }
+
+  @Test
+  public void testMultipleAppsLogAggregation() throws IOException {
+
+    this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
+    this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    LogAggregationService logAggregationService =
+        new LogAggregationService(this.delSrvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+    // AppLogDir should be created
+    File app1LogDir = logAggregationService.getLocalAppLogDir(application1);
+    app1LogDir.mkdir();
+    logAggregationService
+        .handle(new LogAggregatorAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+
+    ContainerId container11 =
+        BuilderUtils.newContainerId(recordFactory, application1, 1);
+    // Simulate log-file creation
+    writeContainerLogs(app1LogDir, container11);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container11, "0"));
+
+    ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
+
+    File app2LogDir = logAggregationService.getLocalAppLogDir(application2);
+    app2LogDir.mkdir();
+    logAggregationService.handle(new LogAggregatorAppStartedEvent(
+        application2, this.user, null,
+        ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
+
+    ContainerId container21 =
+        BuilderUtils.newContainerId(recordFactory, application2, 1);
+    writeContainerLogs(app2LogDir, container21);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container21, "0"));
+
+    ContainerId container12 =
+        BuilderUtils.newContainerId(recordFactory, application1, 2);
+    writeContainerLogs(app1LogDir, container12);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container12, "0"));
+
+    ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
+
+    File app3LogDir = logAggregationService.getLocalAppLogDir(application3);
+    app3LogDir.mkdir();
+    logAggregationService.handle(new LogAggregatorAppStartedEvent(
+        application3, this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
+
+    ContainerId container31 =
+        BuilderUtils.newContainerId(recordFactory, application3, 1);
+    writeContainerLogs(app3LogDir, container31);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container31, "0"));
+
+    ContainerId container32 =
+        BuilderUtils.newContainerId(recordFactory, application3, 2);
+    writeContainerLogs(app3LogDir, container32);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container32, "1")); // Failed container
+
+    ContainerId container22 =
+        BuilderUtils.newContainerId(recordFactory, application2, 2);
+    writeContainerLogs(app2LogDir, container22);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container22, "0"));
+
+    ContainerId container33 =
+        BuilderUtils.newContainerId(recordFactory, application3, 3);
+    writeContainerLogs(app3LogDir, container33);
+    logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
+        container33, "0"));
+
+    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+        application2));
+    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+        application3));
+    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+        application1));
+
+    logAggregationService.stop();
+
+    verifyContainerLogs(logAggregationService, application1,
+        new ContainerId[] { container11, container12 });
+    verifyContainerLogs(logAggregationService, application2,
+        new ContainerId[] { container21 });
+    verifyContainerLogs(logAggregationService, application3,
+        new ContainerId[] { container31, container32 });
+  }
+
+  private void writeContainerLogs(File appLogDir, ContainerId containerId)
+      throws IOException {
+    // ContainerLogDir should be created
+    String containerStr = ConverterUtils.toString(containerId);
+    File containerLogDir = new File(appLogDir, containerStr);
+    containerLogDir.mkdir();
+    for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+      Writer writer11 = new FileWriter(new File(containerLogDir, fileType));
+      writer11.write(containerStr + " Hello " + fileType + "!");
+      writer11.close();
+    }
+  }
+
+  private void verifyContainerLogs(
+      LogAggregationService logAggregationService, ApplicationId appId,
+      ContainerId[] expectedContainerIds) throws IOException {
+    AggregatedLogFormat.LogReader reader =
+        new AggregatedLogFormat.LogReader(this.conf,
+            logAggregationService.getRemoteNodeLogFileForApp(appId));
+    try {
+      Map<String, Map<String, String>> logMap =
+          new HashMap<String, Map<String, String>>();
+      DataInputStream valueStream;
+
+      LogKey key = new LogKey();
+      valueStream = reader.next(key);
+
+      while (valueStream != null) {
+        LOG.info("Found container " + key.toString());
+        Map<String, String> perContainerMap = new HashMap<String, String>();
+        logMap.put(key.toString(), perContainerMap);
+
+        while (true) {
+          try {
+            DataOutputBuffer dob = new DataOutputBuffer();
+            LogReader.readAContainerLogsForALogType(valueStream, dob);
+
+            DataInputBuffer dib = new DataInputBuffer();
+            dib.reset(dob.getData(), dob.getLength());
+
+            Assert.assertEquals("\nLogType:", dib.readUTF());
+            String fileType = dib.readUTF();
+
+            Assert.assertEquals("\nLogLength:", dib.readUTF());
+            String fileLengthStr = dib.readUTF();
+            long fileLength = Long.parseLong(fileLengthStr);
+
+            Assert.assertEquals("\nLog Contents:\n", dib.readUTF());
+            byte[] buf = new byte[(int) fileLength]; // cast is okay in this
+                                                     // test.
+            dib.read(buf, 0, (int) fileLength);
+            perContainerMap.put(fileType, new String(buf));
+
+            LOG.info("LogType:" + fileType);
+            LOG.info("LogType:" + fileLength);
+            LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
+          } catch (EOFException eof) {
+            break;
+          }
+        }
+
+        // Next container
+        key = new LogKey();
+        valueStream = reader.next(key);
+      }
+
+      // 1 for each container
+      Assert.assertEquals(expectedContainerIds.length, logMap.size());
+      for (ContainerId cId : expectedContainerIds) {
+        String containerStr = ConverterUtils.toString(cId);
+        Map<String, String> thisContainerMap = logMap.remove(containerStr);
+        Assert.assertEquals(3, thisContainerMap.size());
+        for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
+          String expectedValue = containerStr + " Hello " + fileType + "!";
+          LOG.info("Expected log-content : " + new String(expectedValue));
+          String foundValue = thisContainerMap.remove(fileType);
+          Assert.assertNotNull(cId + " " + fileType
+              + " not present in aggregated log-file!", foundValue);
+          Assert.assertEquals(expectedValue, foundValue);
+        }
+        Assert.assertEquals(0, thisContainerMap.size());
+      }
+      Assert.assertEquals(0, logMap.size());
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testLogAggregationForRealContainerLaunch() throws IOException,
+      InterruptedException {
+
+    this.containerManager.start();
+
+    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    fileWriter.write("\necho Hello World! Stdout! > "
+        + new File(localLogDir, "stdout"));
+    fileWriter.write("\necho Hello World! Stderr! > "
+        + new File(localLogDir, "stderr"));
+    fileWriter.write("\necho Hello World! Syslog! > "
+        + new File(localLogDir, "syslog"));
+    fileWriter.close();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    // ////// Construct the Container-id
+    ApplicationId appId =
+        recordFactory.newRecordInstance(ApplicationId.class);
+    ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+    cId.setAppId(appId);
+    containerLaunchContext.setContainerId(cId);
+
+    containerLaunchContext.setUser(this.user);
+
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    containerLaunchContext.addCommand("/bin/bash");
+    containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
+    containerLaunchContext.setResource(recordFactory
+        .newRecordInstance(Resource.class));
+    containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
+    StartContainerRequest startRequest =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(containerLaunchContext);
+    this.containerManager.startContainer(startRequest);
+
+    BaseContainerManagerTest.waitForContainerState(this.containerManager,
+        cId, ContainerState.COMPLETE);
+
+    this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays
+        .asList(appId)));
+    this.containerManager.stop();
+  }
+}
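
The aggregated file that LogAggregationService writes per application can be read back with the same loop verifyContainerLogs uses above: reader.next(key) yields one value stream per container, and readAContainerLogsForALogType copies one log type at a time until EOFException signals the end of that container's entry. A minimal standalone sketch of that reader loop follows; the class name DumpAggregatedLogs and the use of System.out are illustrative only, and the only APIs relied on are the ones exercised by the test:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;

    public class DumpAggregatedLogs {
      public static void dump(Configuration conf, Path remoteNodeLogFile)
          throws IOException {
        LogReader reader = new LogReader(conf, remoteNodeLogFile);
        try {
          LogKey key = new LogKey();
          DataInputStream valueStream = reader.next(key);
          while (valueStream != null) {          // one entry per container
            System.out.println("Container: " + key.toString());
            while (true) {
              try {
                // Copies one log-type's header and contents into the buffer.
                DataOutputBuffer dob = new DataOutputBuffer();
                LogReader.readAContainerLogsForALogType(valueStream, dob);
                System.out.write(dob.getData(), 0, dob.getLength());
              } catch (EOFException eof) {
                break;                           // no more log types here
              }
            }
            key = new LogKey();
            valueStream = reader.next(key);      // advance to next container
          }
        } finally {
          reader.close();
        }
      }
    }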

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Wed May  4 06:53:52 2011
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import static org.junit.Assert.assertFalse;
@@ -12,15 +30,10 @@ import java.util.regex.Pattern;
 
 import junit.framework.Assert;
 
-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -33,116 +46,32 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DummyContainerManager;
-import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.TestProcfsBasedProcessTree;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestContainersMonitor {
-
-  private static final Log LOG = LogFactory
-      .getLog(TestContainersMonitor.class);
-
-  protected static File localDir = new File("target",
-      TestContainersMonitor.class.getName() + "-localDir").getAbsoluteFile();
-  protected static File logDir = new File("target",
-      TestContainersMonitor.class.getName() + "-logDir").getAbsoluteFile();
-  protected static File tmpDir = new File("target",
-      TestContainersMonitor.class.getName() + "-tmpDir");
-
-  private static RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-  static {
-    DefaultMetricsSystem.setMiniClusterMode(true);
-  }
-
-  protected FileContext localFS;
+public class TestContainersMonitor extends BaseContainerManagerTest {
 
   public TestContainersMonitor() throws UnsupportedFileSystemException {
-    localFS = FileContext.getLocalFSFileContext();
+    super();
   }
 
-  protected Configuration conf = new YarnConfiguration();
-  private Context context = new NMContext();
-  private ContainerExecutor exec = new DefaultContainerExecutor();
-  private DeletionService delSrvc= new DeletionService(exec);
-  private Dispatcher dispatcher = new AsyncDispatcher();
-  private NodeHealthCheckerService healthChecker = null;
-  private String user = "nobody";
-
-  private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
-      context, dispatcher, healthChecker) {
-    @Override
-    protected ResourceTracker getRMClient() {
-      return new LocalRMInterface();
-    };
-
-    @Override
-    protected void startStatusUpdater() throws InterruptedException,
-        YarnRemoteException {
-      return; // Don't start any updating thread.
-    }
-  };
-
-  private ContainerManagerImpl containerManager = null;
-
+  static {
+    LOG = LogFactory.getLog(TestContainersMonitor.class);
+  }
   @Before
   public void setup() throws IOException {
-    localFS.delete(new Path(localDir.getAbsolutePath()), true);
-    localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
-    localFS.delete(new Path(logDir.getAbsolutePath()), true);
-    localDir.mkdir();
-    tmpDir.mkdir();
-    logDir.mkdir();
-    LOG.info("Created localDir in " + localDir.getAbsolutePath());
-    LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
-
-    String bindAddress = "0.0.0.0:5555";
-    conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
-    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
-    conf.set(NMConfig.NM_LOG_DIR, logDir.getAbsolutePath());
-
-    containerManager =
-        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
     conf.setClass(
         ContainersMonitorImpl.RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY,
         LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
-    containerManager.init(conf);
-  }
-
-  @After
-  public void tearDown() throws IOException, InterruptedException {
-    if (containerManager != null
-        && containerManager.getServiceState() == STATE.STARTED) {
-      containerManager.stop();
-    }
-    exec.deleteAsUser(user, new Path(localDir.getAbsolutePath()),
-        new Path[] {});
+    super.setup();
   }
 
   /**
@@ -308,7 +237,7 @@ public class TestContainersMonitor {
     // No more lines
     Assert.assertEquals(null, reader.readLine());
 
-    DummyContainerManager.waitForContainerState(containerManager, cId,
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
         ContainerState.COMPLETE, 60);
 
     GetContainerStatusRequest gcsRequest =
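
The TestContainersMonitor rewrite above is the template applied across the nodemanager tests in this change: the per-class dirs, conf, executor and ContainerManagerImpl boilerplate is dropped in favor of extending BaseContainerManagerTest, with setup() overridden only to add test-specific configuration before delegating to super.setup(). A minimal sketch of a new test following the same pattern; the class name and the config key set in setup() are illustrative only:

    import java.io.IOException;

    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.UnsupportedFileSystemException;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
    import org.junit.Before;
    import org.junit.Test;

    public class TestSomeNMFeature extends BaseContainerManagerTest {

      static {
        LOG = LogFactory.getLog(TestSomeNMFeature.class);
      }

      public TestSomeNMFeature() throws UnsupportedFileSystemException {
        super(); // base class provides localFS, dirs, conf and containerManager
      }

      @Before
      public void setup() throws IOException {
        // Test-specific configuration goes in before the shared setup runs.
        conf.set("illustrative.config.key", "value");
        super.setup();
      }

      @Test
      public void testFeature() throws Exception {
        containerManager.start();
        // ... start containers, then reuse the shared waits, e.g.
        // BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        //     ContainerState.COMPLETE);
      }
    }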

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed May  4 06:53:52 2011
@@ -101,7 +101,7 @@ public class TestNMWebServer {
           recordFactory.newRecordInstance(ContainerLaunchContext.class);
       launchContext.setContainerId(containerId);
       launchContext.setUser(user);
-      Container container = new ContainerImpl(dispatcher, launchContext) {
+      Container container = new ContainerImpl(dispatcher, launchContext, null) {
         public ContainerState getContainerState() {
           return ContainerState.RUNNING;
         };