You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [31/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+
+public class ContainersMonitorEvent extends
+    AbstractEvent<ContainersMonitorEventType> {
+
+  public ContainersMonitorEvent(ContainersMonitorEventType eventType) {
+    super(eventType);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+/**
+ * The event types a {@link ContainersMonitorEvent} can be constructed with.
+ */
+public enum ContainersMonitorEventType {
+  START_MONITORING_CONTAINER,
+  STOP_MONITORING_CONTAINER
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+/**
+ * A {@link ContainerManagerImpl} whose resource-localization service and
+ * containers launcher are swapped for stand-ins. Each stand-in answers a
+ * request by immediately dispatching the matching completion event instead
+ * of doing any real work.
+ */
+class DummyContainerManager extends ContainerManagerImpl {
+
+  private static final Log LOG =
+      LogFactory.getLog(DummyContainerManager.class);
+
+  public DummyContainerManager(Context context, ContainerExecutor exec,
+      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater) {
+    super(context, exec, deletionContext, nodeStatusUpdater);
+  }
+
+  /**
+   * Returns a localization service whose handler only dispatches follow-up
+   * events: an {@link ApplicationInitedEvent} for application init, and a
+   * CONTAINER_RESOURCES_CLEANEDUP {@link ContainerEvent} for container
+   * cleanup. Application destroy requests are ignored.
+   */
+  @Override
+  protected ResourceLocalizationService createResourceLocalizationService(
+      ContainerExecutor exec, DeletionService deletionContext) {
+    return new ResourceLocalizationService(super.dispatcher, exec,
+        deletionContext) {
+      @Override
+      public void handle(LocalizerEvent event) {
+        switch (event.getType()) {
+        case INIT_APPLICATION_RESOURCES: {
+          ApplicationLocalizerEvent appEvent =
+              (ApplicationLocalizerEvent) event;
+          Application application = appEvent.getApplication();
+          // Simulate event from ApplicationLocalization.
+          this.dispatcher.getEventHandler().handle(
+              new ApplicationInitedEvent(application.getAppId(),
+                  new HashMap<Path, String>(), new Path("workDir")));
+          break;
+        }
+        case CLEANUP_CONTAINER_RESOURCES: {
+          ContainerLocalizerEvent containerEvent =
+              (ContainerLocalizerEvent) event;
+          Container target = containerEvent.getContainer();
+          // TODO: delete the container dir
+          this.dispatcher.getEventHandler().handle(
+              new ContainerEvent(target.getLaunchContext().id,
+                  ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+          break;
+        }
+        case DESTROY_APPLICATION_RESOURCES:
+          // decrement reference counts of all resources associated with this
+          // app
+          break;
+        }
+      }
+    };
+  }
+
+  /**
+   * Returns a launcher that never starts a process. A launch request is
+   * answered with CONTAINER_LAUNCHED and a cleanup request with
+   * CONTAINER_CLEANEDUP_AFTER_KILL.
+   */
+  @Override
+  protected ContainersLauncher createContainersLauncher(Context context,
+      ContainerExecutor exec) {
+    return new ContainersLauncher(context, super.dispatcher, exec) {
+      @Override
+      public void handle(ContainersLauncherEvent event) {
+        ContainerID subjectId = event.getContainer().getLaunchContext().id;
+        switch (event.getType()) {
+        case LAUNCH_CONTAINER: {
+          dispatcher.getEventHandler().handle(
+              new ContainerEvent(subjectId,
+                  ContainerEventType.CONTAINER_LAUNCHED));
+          break;
+        }
+        case CLEANUP_CONTAINER: {
+          dispatcher.getEventHandler().handle(
+              new ContainerEvent(subjectId,
+                  ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL));
+          break;
+        }
+        }
+      }
+    };
+  }
+
+  /**
+   * Polls the container's status once a second, for at most 20 polls, until
+   * it reports {@code finalState}, then asserts that it did.
+   *
+   * @param containerManager the manager to query
+   * @param containerID the container whose status is polled
+   * @param finalState the state the container is expected to reach
+   */
+  static void waitForContainerState(ContainerManager containerManager,
+      ContainerID containerID, ContainerState finalState)
+      throws InterruptedException, AvroRemoteException {
+    ContainerStatus current =
+        containerManager.getContainerStatus(containerID);
+    for (int polls = 0;
+        !current.state.equals(finalState) && polls < 20; polls++) {
+      Thread.sleep(1000);
+      // The state logged here is the one read before the sleep.
+      LOG.info("Waiting for container to get into state " + finalState
+          + ". Current state is " + current.state);
+      current = containerManager.getContainerStatus(containerID);
+    }
+    LOG.info("Container state is " + current.state);
+    Assert.assertEquals("ContainerState is not correct (timedout)",
+        finalState, current.state);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+
+/**
+ * A {@link ResourceTracker} implemented locally, with no remote call:
+ * registration hands back an empty {@link NodeID} and heartbeats are not
+ * implemented.
+ */
+class LocalRMInterface implements ResourceTracker {
+
+  /**
+   * Ignores both arguments and returns a response whose {@code nodeID} is a
+   * newly constructed {@link NodeID}.
+   */
+  @Override
+  public RegistrationResponse registerNodeManager(CharSequence node,
+      Resource resource) throws AvroRemoteException {
+    RegistrationResponse response = new RegistrationResponse();
+    response.nodeID = new NodeID();
+    return response;
+  }
+
+  /**
+   * Not implemented; always returns {@code null}.
+   */
+  @Override
+  public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+      throws AvroRemoteException {
+    return null;
+  }
+
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,129 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+/**
+ * Command-line helper that builds a fixed {@link ContainerLaunchContext} and
+ * submits it to the {@link ContainerManager} at a given address.
+ *
+ * Usage: nmAddr user [fstokens]
+ */
+public class SyntheticContainerLaunch {
+
+  static final long clusterTimeStamp = System.nanoTime();
+
+  /**
+   * Builds the launch context that {@link #main(String[])} submits.
+   *
+   * @param conf configuration used when reading the token file
+   * @param appId value stored as the application id
+   * @param cId value stored as the container id
+   * @param user value stored as the container's user
+   * @param tokens token storage file to load, or {@code null} to send empty
+   *          credentials
+   * @return the populated launch context
+   */
+  static ContainerLaunchContext getContainer(Configuration conf,
+      int appId, int cId, String user, Path tokens)
+      throws IOException, URISyntaxException {
+    ContainerLaunchContext container = new ContainerLaunchContext();
+    // id
+    ApplicationID appID = new ApplicationID();
+    appID.id = appId;
+    appID.clusterTimeStamp = clusterTimeStamp;
+    container.id = new ContainerID();
+    container.id.appID = appID;
+    container.id.id = cId;
+
+    // user
+    container.user = user;
+
+    // Resource resource
+    container.resource = new Resource();
+    container.resource.memory = 1024;
+
+    // union {null, map<LocalResource>} resources_todo;
+    container.resources = new HashMap<CharSequence,LocalResource>();
+    LocalResource resource = new LocalResource();
+    // NOTE(review): hard-coded, developer-local path; it only resolves on a
+    // machine that has this exact file.
+    resource.resource = AvroUtil.getYarnUrlFromPath(
+        new Path("file:///home/chrisdo/work/hadoop/mapred/CHANGES.txt"));
+    resource.size = -1;
+    resource.timestamp = 1294684255000L;
+    resource.type = FILE;
+    resource.state = PRIVATE;
+    container.resources.put("dingos", resource);
+
+    //union {null, bytes} fsTokens_todo;
+    Credentials creds = new Credentials();
+    if (tokens != null) {
+      creds.readTokenStorageFile(tokens, conf);
+    }
+    DataOutputBuffer buf = new DataOutputBuffer();
+    creds.writeTokenStorageToStream(buf);
+    // Wrap only the bytes actually written, not the whole backing array.
+    container.containerTokens =
+      ByteBuffer.wrap(buf.getData(), 0, buf.getLength());
+
+    //union {null, map<bytes>} serviceData;
+    container.serviceData = new HashMap<CharSequence,ByteBuffer>();
+
+    // map<string> env;
+    container.env = new HashMap<CharSequence,CharSequence>();
+    container.env.put("MY_OUTPUT_FILE", "yak.txt");
+
+    // array<string> command;
+    container.command = new ArrayList<CharSequence>();
+    container.command.add("cat");
+    container.command.add("dingos");
+    container.command.add(">");
+    container.command.add("${MY_OUTPUT_FILE}");
+    return container;
+  }
+
+  /**
+   * Creates a {@link ContainerManager} proxy for the given address.
+   *
+   * @param conf configuration passed to {@link YarnRPC}
+   * @param adr address of the container manager
+   */
+  static ContainerManager getClient(Configuration conf, InetSocketAddress adr) {
+    YarnRPC rpc = YarnRPC.create(conf);
+    //conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+    //    ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    return (ContainerManager) rpc.getProxy(ContainerManager.class, adr, conf);
+  }
+
+  /**
+   * Submits one synthetic container (application 0, container 0).
+   *
+   * @param argv {@code nmAddr user [fstokens]}
+   */
+  // usage $0 nmAddr user [fstokens]
+  public static void main(String[] argv) throws Exception {
+    // Both nmAddr and user are mandatory; fail with the usage line instead
+    // of an ArrayIndexOutOfBoundsException.
+    if (argv.length < 2) {
+      System.err.println("Usage: " + SyntheticContainerLaunch.class.getName()
+          + " nmAddr user [fstokens]");
+      System.exit(1);
+    }
+    Configuration conf = new Configuration();
+    InetSocketAddress nmAddr = NetUtils.createSocketAddr(argv[0]);
+    ContainerManager client = getClient(conf, nmAddr);
+    Path tokens = (argv.length > 2) ? new Path(argv[2]) : null;
+    ContainerLaunchContext ctxt = getContainer(conf, 0, 0, argv[1], tokens);
+    client.startContainer(ctxt);
+    System.out.println("START: " + ctxt);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,360 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContainerManager {
+
+  // Runs first among this class's static initializers.
+  static {
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  private static Log LOG = LogFactory.getLog(TestContainerManager.class);
+
+  // Scratch directories under "target", named after this class.
+  private static File localDir =
+      new File("target", TestContainerManager.class.getName() + "-localDir")
+          .getAbsoluteFile();
+
+  private static File tmpDir =
+      new File("target", TestContainerManager.class.getName() + "-tmpDir");
+
+  // Collaborators handed to the ContainerManagerImpl built in setup().
+  private Configuration conf = new YarnConfiguration();
+  private Context context = new NMContext();
+  private ContainerExecutor exec = new DefaultContainerExecutor();
+  private DeletionService delSrvc = new DeletionService(exec);
+
+  // Status updater wired to LocalRMInterface, with the updater thread
+  // disabled.
+  private NodeStatusUpdater nodeStatusUpdater =
+      new NodeStatusUpdaterImpl(context) {
+        @Override
+        protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+          return new LocalRMInterface();
+        }
+
+        @Override
+        protected void startStatusUpdater() throws InterruptedException,
+            AvroRemoteException {
+          // Don't start any updating thread.
+        }
+      };
+
+  // Assigned in setup().
+  private ContainerManagerImpl containerManager;
+
+  /**
+   * Recreates empty {@code localDir} and {@code tmpDir}, then builds and
+   * initializes (but does not start) the container manager under test.
+   *
+   * @throws IOException if a scratch directory cannot be deleted or created
+   */
+  @Before
+  public void setup() throws IOException {
+    FileContext localFS = FileContext.getLocalFSFileContext();
+    localFS.delete(new Path(localDir.getAbsolutePath()), true);
+    localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
+    // mkdirs() rather than mkdir(): the parent "target" directory may not
+    // exist yet. Fail here instead of logging "Created" for a directory
+    // that was never made.
+    for (File scratch : new File[] { localDir, tmpDir }) {
+      if (!scratch.mkdirs() && !scratch.isDirectory()) {
+        throw new IOException("Could not create directory "
+            + scratch.getAbsolutePath());
+      }
+    }
+    LOG.info("Created localDir in " + localDir.getAbsolutePath());
+    LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
+
+    String bindAddress = "0.0.0.0:5555";
+    conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
+    conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+    containerManager =
+        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
+    containerManager.init(conf);
+  }
+
+  @After
+  public void tearDown() {
+    if (containerManager != null
+        && containerManager.getServiceState() == STATE.STARTED) {
+      containerManager.stop();
+    }
+  }
+
+  /**
+   * Starts the container manager and checks that asking for the status of a
+   * container it has never seen raises an {@link AvroRemoteException}.
+   */
+  @Test
+  public void testContainerManagerInitialization() throws IOException {
+
+    containerManager.start();
+
+    // Just do a query for a non-existing container.
+    try {
+      containerManager.getContainerStatus(new ContainerID());
+    } catch (AvroRemoteException expected) {
+      return;
+    }
+    Assert.fail();
+  }
+
+  /**
+   * Starts a container whose launch context names one file resource, waits
+   * for the container to report COMPLETE, and checks that the file shows up
+   * under the container's directory in {@code localDir} with its contents
+   * intact.
+   */
+  @Test
+  public void testContainerSetup() throws IOException, InterruptedException {
+
+    containerManager.start();
+
+    // ////// Create the resources for the container
+    File dir = new File(tmpDir, "dir");
+    dir.mkdirs();
+    File file = new File(dir, "file");
+    PrintWriter fileWriter = new PrintWriter(file);
+    try {
+      fileWriter.write("Hello World!");
+    } finally {
+      fileWriter.close();
+    }
+
+    // ////// Construct the Container-id
+    ApplicationID appId = new ApplicationID();
+    ContainerID cId = new ContainerID();
+    cId.appID = appId;
+
+    String user = "dummy-user";
+
+    // ////// Construct the container-spec.
+    ContainerLaunchContext containerLaunchContext =
+        new ContainerLaunchContext();
+    containerLaunchContext.resources =
+        new HashMap<CharSequence, LocalResource>();
+    URL resource_alpha =
+        AvroUtil.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
+            .makeQualified(new Path(file.getAbsolutePath())));
+    LocalResource rsrc_alpha = new LocalResource();
+    rsrc_alpha.resource = resource_alpha;
+    rsrc_alpha.size= -1;
+    rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
+    rsrc_alpha.type = LocalResourceType.FILE;
+    rsrc_alpha.timestamp = file.lastModified();
+    String destinationFile = "dest_file";
+    containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.user = user;
+    containerLaunchContext.id = cId;
+    containerLaunchContext.command = new ArrayList<CharSequence>();
+
+    containerManager.startContainer(containerLaunchContext);
+
+    DummyContainerManager.waitForContainerState(containerManager, cId,
+        ContainerState.COMPLETE);
+
+    // Now ascertain that the resources are localised correctly.
+    // TODO: Don't we need clusterStamp in localDir?
+    File userCacheDir = new File(localDir, ApplicationLocalizer.USERCACHE);
+    File userDir = new File(userCacheDir, user);
+    File appCache = new File(userDir, ApplicationLocalizer.APPCACHE);
+    File appDir = new File(appCache, AvroUtil.toString(appId));
+    File containerDir = new File(appDir, AvroUtil.toString(cId));
+    File targetFile = new File(containerDir, destinationFile);
+    // Check every level top-down so a failure names the first missing one.
+    for (File f : new File[] { localDir, userCacheDir, userDir, appCache,
+        appDir, containerDir }) {
+      Assert.assertTrue(f.getAbsolutePath() + " doesn't exist!!", f.exists());
+      Assert.assertTrue(f.getAbsolutePath() + " is not a directory!!",
+          f.isDirectory());
+    }
+    Assert.assertTrue(targetFile.getAbsolutePath() + " doesn't exist!!",
+        targetFile.exists());
+
+    // Now verify the contents of the file
+    BufferedReader reader = new BufferedReader(new FileReader(targetFile));
+    try {
+      Assert.assertEquals("Hello World!", reader.readLine());
+      Assert.assertEquals(null, reader.readLine());
+    } finally {
+      // Close even when an assertion fails so the file handle is not leaked.
+      reader.close();
+    }
+  }
+
+  /**
+   * Launches a container that runs a small bash script, waits for it to
+   * report COMPLETE, and checks that the script wrote the expected line to
+   * its output file. Stopping a container is not exercised yet.
+   */
+  @Test
+  public void testContainerLaunchAndStop() throws IOException, InterruptedException {
+    containerManager.start();
+
+    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    File outputFile = new File(tmpDir, "output.txt").getAbsoluteFile();
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    try {
+      fileWriter.write("echo Hello World! > " + outputFile);
+    } finally {
+      fileWriter.close();
+    }
+
+    ContainerLaunchContext containerLaunchContext = new ContainerLaunchContext();
+
+    // ////// Construct the Container-id
+    ApplicationID appId = new ApplicationID();
+    ContainerID cId = new ContainerID();
+    cId.appID = appId;
+    containerLaunchContext.id = cId;
+
+    String user = "dummy-user";
+    containerLaunchContext.user = user;
+
+    containerLaunchContext.resources =
+        new HashMap<CharSequence, LocalResource>();
+    URL resource_alpha =
+        AvroUtil.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha = new LocalResource();
+    rsrc_alpha.resource = resource_alpha;
+    rsrc_alpha.size= -1;
+    rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
+    rsrc_alpha.type = LocalResourceType.FILE;
+    rsrc_alpha.timestamp = scriptFile.lastModified();
+    String destinationFile = "dest_file";
+    containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
+    List<CharSequence> commandArgs = new ArrayList<CharSequence>();
+    commandArgs.add("/bin/bash");
+    commandArgs.add(scriptFile.getAbsolutePath());
+    containerLaunchContext.command = commandArgs;
+    containerManager.startContainer(containerLaunchContext);
+
+    DummyContainerManager.waitForContainerState(containerManager, cId,
+        ContainerState.COMPLETE);
+
+    Assert.assertTrue("OutputFile doesn't exist!", outputFile.exists());
+
+    // Now verify the contents of the file
+    BufferedReader reader = new BufferedReader(new FileReader(outputFile));
+    try {
+      Assert.assertEquals("Hello World!", reader.readLine());
+      Assert.assertEquals(null, reader.readLine());
+    } finally {
+      // Close even when an assertion fails so the file handle is not leaked.
+      reader.close();
+    }
+
+    // TODO: test the stop functionality.
+  }
+
+//  @Test
+//  public void testCommandPreparation() {
+//    ContainerLaunchContext container = new ContainerLaunchContext();
+//
+//    // ////// Construct the Container-id
+//    ApplicationID appId = new ApplicationID();
+//    appId.id = 0;
+//    appId.clusterTimeStamp = 0;
+//    ContainerID containerID = new ContainerID();
+//    containerID.appID = appId;
+//    containerID.id = 0;
+//    container.id = containerID;
+//
+//    // The actual environment for the container
+//    Path containerWorkDir =
+//        NodeManager.getContainerWorkDir(new Path(localDir.getAbsolutePath()),
+//            containerID);
+//    final Map<String, String> ENVS = new HashMap<String, String>();
+//    ENVS.put("JAVA_HOME", "/my/path/to/java-home");
+//    ENVS.put("LD_LIBRARY_PATH", "/my/path/to/libraries");
+//
+//    File workDir = new File(ContainerBuilderHelper.getWorkDir());
+//    File logDir = new File(workDir, "logs");
+//    File stdout = new File(logDir, "stdout");
+//    File stderr = new File(logDir, "stderr");
+//    File tmpDir = new File(workDir, "tmp");
+//    File javaHome = new File(ContainerBuilderHelper.getEnvVar("JAVA_HOME"));
+//    String ldLibraryPath =
+//        ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
+//    List<String> classPaths = new ArrayList<String>();
+//    File someJar = new File(workDir, "jar-name.jar");
+//    classPaths.add(someJar.toString());
+//    classPaths.add(workDir.toString());
+//    String PATH_SEPARATOR = System.getProperty("path.separator");
+//    String classPath = StringUtils.join(PATH_SEPARATOR, classPaths);
+//    File someFile = new File(workDir, "someFileNeededinEnv");
+//
+//    NMContainer nmContainer = new NMContainer(container, containerWorkDir) {
+//      @Override
+//      protected String checkAndGetEnvValue(String envVar) {
+//        return ENVS.get(envVar);
+//      }
+//    };
+//    List<CharSequence> command = new ArrayList<CharSequence>();
+//    command.add(javaHome + "/bin/java");
+//    command.add("-Djava.library.path=" + ldLibraryPath);
+//    command.add("-Djava.io.tmpdir=" + tmpDir);
+//    command.add("-classpath");
+//    command.add(classPath);
+//    command.add("2>" + stdout);
+//    command.add("1>" + stderr);
+//
+//    Map<String, String> env = new HashMap<String, String>();
+//    env.put("FILE_IN_ENV", someFile.toString());
+//    env.put("JAVA_HOME", javaHome.toString());
+//    env.put("LD_LIBRARY_PATH", ldLibraryPath);
+//
+//    String actualWorkDir = containerWorkDir.toUri().getPath();
+//
+//    String finalCmdSent = "";
+//    for (CharSequence cmd : command) {
+//      finalCmdSent += cmd + " ";
+//    }
+//    finalCmdSent.trim();
+//    LOG.info("Final command sent is : " + finalCmdSent);
+//
+//    // The main method being tested
+//    String[] finalCommands =
+//        nmContainer.prepareCommandArgs(command, env, actualWorkDir);
+//    // //////////////////////////////
+//
+//    String finalCmd = "";
+//    for (String cmd : finalCommands) {
+//      finalCmd += cmd + " ";
+//    }
+//    finalCmd = finalCmd.trim();
+//    LOG.info("Final command for launch is : " + finalCmd);
+//
+//    File actualLogDir = new File(actualWorkDir, "logs");
+//    File actualStdout = new File(actualLogDir, "stdout");
+//    File actualStderr = new File(actualLogDir, "stderr");
+//    File actualTmpDir = new File(actualWorkDir, "tmp");
+//    File actualSomeJar = new File(actualWorkDir, "jar-name.jar");
+//    File actualSomeFileInEnv = new File(actualWorkDir, "someFileNeededinEnv");
+//    Assert.assertEquals(actualSomeFileInEnv.toString(),
+//        env.get("FILE_IN_ENV"));
+//    Assert.assertEquals("/my/path/to/java-home", env.get("JAVA_HOME"));
+//    Assert.assertEquals("/my/path/to/libraries", env.get("LD_LIBRARY_PATH"));
+//    Assert.assertEquals("/my/path/to/java-home/bin/java"
+//        + " -Djava.library.path=/my/path/to/libraries" + " -Djava.io.tmpdir="
+//        + actualTmpDir + " -classpath " + actualSomeJar + PATH_SEPARATOR
+//        + actualWorkDir + " 2>" + actualStdout + " 1>" + actualStderr,
+//        finalCmd);
+//  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,233 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
+
+import static org.apache.hadoop.fs.CreateFlag.*;
+
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
+public class TestDefaultContainerExecutor {
+
+  /*
+  // XXX FileContext cannot be mocked to do this
+  static FSDataInputStream getRandomStream(Random r, int len)
+      throws IOException {
+    byte[] bytes = new byte[len];
+    r.nextBytes(bytes);
+    DataInputBuffer buf = new DataInputBuffer();
+    buf.reset(bytes, 0, bytes.length);
+    return new FSDataInputStream(new FakeFSDataInputStream(buf));
+  }
+
+  class PathEndsWith extends ArgumentMatcher<Path> {
+    final String suffix;
+    PathEndsWith(String suffix) {
+      this.suffix = suffix;
+    }
+    @Override
+    public boolean matches(Object o) {
+      return
+      suffix.equals(((Path)o).getName());
+    }
+  }
+
+  DataOutputBuffer mockStream(
+      AbstractFileSystem spylfs, Path p, Random r, int len) 
+      throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    doReturn(getRandomStream(r, len)).when(spylfs).open(p);
+    doReturn(new FileStatus(len, false, -1, -1L, -1L, p)).when(
+        spylfs).getFileStatus(argThat(new PathEndsWith(p.getName())));
+    doReturn(new FSDataOutputStream(dob)).when(spylfs).createInternal(
+        argThat(new PathEndsWith(p.getName())),
+        eq(EnumSet.of(OVERWRITE)),
+        Matchers.<FsPermission>anyObject(), anyInt(), anyShort(), anyLong(),
+        Matchers.<Progressable>anyObject(), anyInt(), anyBoolean());
+    return dob;
+  }
+  */
+
+  /**
+   * Recursively removes the {@code target/TestDefaultContainerExecutor}
+   * directory from the local file system after the tests in this class.
+   *
+   * @throws IOException if the local file context cannot be obtained or the
+   *         delete fails
+   */
+  @AfterClass
+  public static void deleteTmpFiles() throws IOException {
+    final FileContext localFs = FileContext.getLocalFSFileContext();
+    final Path scratchDir =
+        new Path("target", TestDefaultContainerExecutor.class.getSimpleName());
+    localFs.delete(scratchDir, true);
+  }
+
+  /**
+   * Fills a new file at {@code dst} on the local file system with
+   * {@code len} random bytes, overwriting any existing file.
+   *
+   * @param dst destination file; qualified against the local file context
+   * @param r source of the random bytes
+   * @param len number of bytes to generate and write
+   * @return the generated bytes
+   * @throws IOException if the parent directory or the file cannot be
+   *         created or written
+   */
+  byte[] createTmpFile(Path dst, Random r, int len)
+      throws IOException {
+    // deliberately the plain local context, not a spied or mocked one
+    final FileContext localFs = FileContext.getLocalFSFileContext();
+    final Path target = localFs.makeQualified(dst);
+    localFs.mkdir(target.getParent(), null, true);
+    final byte[] payload = new byte[len];
+    FSDataOutputStream stream = null;
+    try {
+      stream = localFs.create(target, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(payload);
+      stream.write(payload);
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    return payload;
+  }
+
+//  @Test
+//  public void testInit() throws IOException, InterruptedException {
+//    Configuration conf = new Configuration();
+//    AbstractFileSystem spylfs =
+//      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+//    // don't actually create dirs
+//    //doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(),
+//    //    Matchers.<FsPermission>anyObject(), anyBoolean());
+//    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+//
+//    Path basedir = new Path("target",
+//        TestDefaultContainerExecutor.class.getSimpleName());
+//    List<String> localDirs = new ArrayList<String>();
+//    List<Path> localPaths = new ArrayList<Path>();
+//    for (int i = 0; i < 4; ++i) {
+//      Path p = new Path(basedir, i + "");
+//      lfs.mkdir(p, null, true);
+//      localPaths.add(p);
+//      localDirs.add(p.toString());
+//    }
+//    final String user = "yak";
+//    final String appId = "app_RM_0";
+//    final Path logDir = new Path(basedir, "logs");
+//    final Path nmLocal = new Path(basedir, "nmPrivate/" + user + "/" + appId);
+//    final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+//    System.out.println("NMLOCAL: " + nmLocal);
+//    Random r = new Random();
+//
+//    /*
+//    // XXX FileContext cannot be reasonably mocked to do this
+//    // mock jobFiles copy
+//    long fileSeed = r.nextLong();
+//    r.setSeed(fileSeed);
+//    System.out.println("SEED: " + seed);
+//    Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+//    DataOutputBuffer fileCacheBytes = mockStream(spylfs, fileCachePath, r, 512);
+//
+//    // mock jobTokens copy
+//    long jobSeed = r.nextLong();
+//    r.setSeed(jobSeed);
+//    System.out.println("SEED: " + seed);
+//    Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE);
+//    DataOutputBuffer jobTokenBytes = mockStream(spylfs, jobTokenPath, r, 512);
+//    */
+//
+//    // create jobFiles
+//    long fileSeed = r.nextLong();
+//    r.setSeed(fileSeed);
+//    System.out.println("SEED: " + fileSeed);
+//    Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+//    byte[] fileCacheBytes = createTmpFile(fileCachePath, r, 512);
+//
+//    // create jobTokens
+//    long jobSeed = r.nextLong();
+//    r.setSeed(jobSeed);
+//    System.out.println("SEED: " + jobSeed);
+//    Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE);
+//    byte[] jobTokenBytes = createTmpFile(jobTokenPath, r, 512);
+//
+//    DefaultContainerExecutor dce = new DefaultContainerExecutor(lfs);
+//    Localization mockLocalization = mock(Localization.class);
+//    ApplicationLocalizer spyLocalizer =
+//      spy(new ApplicationLocalizer(lfs, user, appId, logDir,
+//            localPaths));
+//    // ignore cache localization
+//    doNothing().when(spyLocalizer).localizeFiles(
+//        Matchers.<Localization>anyObject(), Matchers.<Path>anyObject());
+//    Path workingDir = lfs.getWorkingDirectory();
+//    dce.initApplication(spyLocalizer, nmLocal, mockLocalization, localPaths);
+//    lfs.setWorkingDirectory(workingDir);
+//
+//    for (Path localdir : localPaths) {
+//      Path userdir = lfs.makeQualified(new Path(localdir,
+//            new Path(ApplicationLocalizer.USERCACHE, user)));
+//      // $localdir/$user
+//      verify(spylfs).mkdir(userdir,
+//          new FsPermission(ApplicationLocalizer.USER_PERM), true);
+//      // $localdir/$user/appcache
+//      Path jobdir = new Path(userdir, ApplicationLocalizer.appcache);
+//      verify(spylfs).mkdir(jobdir,
+//          new FsPermission(ApplicationLocalizer.appcache_PERM), true);
+//      // $localdir/$user/filecache
+//      Path filedir = new Path(userdir, ApplicationLocalizer.FILECACHE);
+//      verify(spylfs).mkdir(filedir,
+//          new FsPermission(ApplicationLocalizer.FILECACHE_PERM), true);
+//      // $localdir/$user/appcache/$appId
+//      Path appdir = new Path(jobdir, appId);
+//      verify(spylfs).mkdir(appdir,
+//          new FsPermission(ApplicationLocalizer.APPDIR_PERM), true);
+//      // $localdir/$user/appcache/$appId/work
+//      Path workdir = new Path(appdir, ApplicationLocalizer.WORKDIR);
+//      verify(spylfs, atMost(1)).mkdir(workdir, FsPermission.getDefault(), true);
+//    }
+//    // $logdir/$appId
+//    Path logdir = new Path(lfs.makeQualified(logDir), appId);
+//    verify(spylfs).mkdir(logdir,
+//        new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true);
+//  }
+
+  // Placeholder: the body is empty, so this test currently always passes.
+  @Test
+  public void testLaunch() throws Exception {
+  }
+
+  // Placeholder: the body is empty, so this test currently always passes.
+  @Test
+  public void testSignal() throws Exception {
+  }
+
+  // Placeholder: no assertions here, so this test currently always passes.
+  @Test
+  public void testDelete() throws Exception {
+    // TestDeletionService covers?
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestDeletionService {
+
+  private static final FileContext lfs = getLfs();
+  /**
+   * Returns the local file context, rethrowing the checked
+   * {@link UnsupportedFileSystemException} as a {@link RuntimeException} so
+   * the call can be used in a static field initializer.
+   */
+  private static final FileContext getLfs() {
+    final FileContext local;
+    try {
+      local = FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+    return local;
+  }
+  private static final Path base =
+    lfs.makeQualified(new Path("target", TestDeletionService.class.getName()));
+
+  /**
+   * Deletes the shared {@code base} directory (recursive flag set) after the
+   * tests in this class.
+   */
+  @AfterClass
+  public static void removeBase() throws IOException {
+    lfs.delete(base, true);
+  }
+
+  /**
+   * Builds {@code numpaths} random paths underneath {@code root}. Nothing is
+   * created on disk; only {@link Path} objects are produced.
+   *
+   * Every path gets at least one component named after a random long. A
+   * further component is appended for as long as the next long drawn from
+   * {@code r} is even; the first odd draw ends the path and is discarded.
+   *
+   * @param r random source; the draw order determines the result
+   * @param root parent of every generated path
+   * @param numpaths number of paths to generate
+   * @return the generated paths, in generation order
+   */
+  public List<Path> buildDirs(Random r, Path root, int numpaths)
+      throws IOException {
+    final ArrayList<Path> generated = new ArrayList<Path>();
+    for (int count = 0; count < numpaths; ++count) {
+      Path current = root;
+      long component = r.nextLong();
+      boolean descend = true;
+      while (descend) {
+        current = new Path(current, "" + component);
+        component = r.nextLong();
+        descend = (component % 2) == 0;
+      }
+      generated.add(current);
+    }
+    return generated;
+  }
+
+  /**
+   * Creates each entry of {@code dirs}, resolved against {@code base}, via
+   * the shared local file context.
+   *
+   * @param base parent against which every entry is resolved
+   * @param dirs directories to create
+   * @throws IOException if any directory cannot be created
+   */
+  public void createDirs(Path base, List<Path> dirs) throws IOException {
+    for (final Path relative : dirs) {
+      final Path resolved = new Path(base, relative);
+      lfs.mkdir(resolved, null, true);
+    }
+  }
+
+  /**
+   * Executor that checks the user handed to each deletion before delegating
+   * to the real implementation. A sub-directory whose numeric name is even
+   * must arrive with a null user; any other must arrive as "dingo". After
+   * delegating, it asserts that {@code subDir} no longer exists.
+   */
+  static class FakeDefaultContainerExecutor extends DefaultContainerExecutor {
+    @Override
+    public void deleteAsUser(String user, Path subDir, Path... basedirs)
+        throws IOException, InterruptedException {
+      final long dirNumber = Long.parseLong(subDir.getName());
+      final boolean expectNamedUser = (dirNumber % 2) != 0;
+      if (expectNamedUser) {
+        assertEquals("dingo", user);
+      } else {
+        assertNull(user);
+      }
+      super.deleteAsUser(user, subDir, basedirs);
+      assertFalse(lfs.util().exists(subDir));
+    }
+  }
+
+  /**
+   * Creates 20 random directories under {@code base}, submits each one to a
+   * {@link DeletionService} with no base directories, and checks after the
+   * service is stopped that none of them exists any more. The seed is
+   * printed so a failing run can be reproduced.
+   */
+  @Test
+  public void testAbsDelete() throws Exception {
+    final Random rand = new Random();
+    final long seed = rand.nextLong();
+    rand.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    final List<Path> targets = buildDirs(rand, base, 20);
+    createDirs(new Path("."), targets);
+    final DeletionService deleter =
+      new DeletionService(new FakeDefaultContainerExecutor());
+    deleter.init(new Configuration());
+    deleter.start();
+    try {
+      for (Path target : targets) {
+        // even-named directories are deleted with a null user
+        final boolean evenName = (Long.parseLong(target.getName()) % 2) == 0;
+        final String user = evenName ? null : "dingo";
+        deleter.delete(user, target, null);
+      }
+    } finally {
+      deleter.stop();
+    }
+    for (Path target : targets) {
+      assertFalse(lfs.util().exists(target));
+    }
+  }
+
+  /**
+   * Creates four random base directories, populates each with the same ten
+   * random relative paths, then asks a {@link DeletionService} to delete
+   * every relative path from all four bases. After the service is stopped,
+   * no relative path may exist under any base. The seed is printed so a
+   * failing run can be reproduced.
+   */
+  @Test
+  public void testRelativeDelete() throws Exception {
+    final Random rand = new Random();
+    final long seed = rand.nextLong();
+    rand.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    final List<Path> roots = buildDirs(rand, base, 4);
+    createDirs(new Path("."), roots);
+    final List<Path> relPaths = buildDirs(rand, new Path("."), 10);
+    for (Path root : roots) {
+      createDirs(root, relPaths);
+    }
+    final DeletionService deleter =
+      new DeletionService(new FakeDefaultContainerExecutor());
+    deleter.init(new Configuration());
+    deleter.start();
+    try {
+      for (Path rel : relPaths) {
+        assertTrue(lfs.util().exists(new Path(roots.get(0), rel)));
+        // even-named directories are deleted with a null user
+        final boolean evenName = (Long.parseLong(rel.getName()) % 2) == 0;
+        final String user = evenName ? null : "dingo";
+        // a fresh array per call, exactly as many slots as there are roots
+        deleter.delete(user, rel, roots.toArray(new Path[4]));
+      }
+    } finally {
+      deleter.stop();
+    }
+    for (Path root : roots) {
+      for (Path rel : relPaths) {
+        assertFalse(lfs.util().exists(new Path(root, rel)));
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.junit.Test;
+
+public class TestEventFlow {
+
+  private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
+
+  /**
+   * Starts a container through a {@link DummyContainerManager}, waits for it
+   * to reach RUNNING, stops it and waits for COMPLETE.
+   *
+   * The node status updater is stubbed: it talks to a
+   * {@link LocalRMInterface} and never starts its updating thread.
+   */
+  @Test
+  public void testSuccessfulContainerLaunch() throws InterruptedException,
+      AvroRemoteException {
+    Context context = new NMContext();
+
+    // NOTE(review): conf is never read; the manager is initialised with a
+    // plain Configuration below. Kept in case constructing it has side
+    // effects -- confirm whether init(conf) was intended.
+    YarnConfiguration conf = new YarnConfiguration();
+    ContainerExecutor exec = new DefaultContainerExecutor();
+    DeletionService del = new DeletionService(exec);
+    NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(context) {
+      @Override
+      protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+        return new LocalRMInterface();
+      }
+
+      @Override
+      protected void startStatusUpdater() throws InterruptedException,
+          AvroRemoteException {
+        return; // Don't start any updating thread.
+      }
+    };
+
+    DummyContainerManager containerManager =
+        new DummyContainerManager(context, exec, del, nodeStatusUpdater);
+    containerManager.init(new Configuration());
+    containerManager.start();
+
+    // Stop the manager even when a launch or wait below fails, so a failing
+    // run does not leave the started service behind.
+    try {
+      ContainerLaunchContext launchContext = new ContainerLaunchContext();
+      ContainerID cID = new ContainerID();
+      cID.appID = new ApplicationID();
+      launchContext.id = cID;
+      launchContext.user = "testing";
+      launchContext.resource = new Resource();
+      launchContext.env = new HashMap<CharSequence, CharSequence>();
+      launchContext.command = new ArrayList<CharSequence>();
+      containerManager.startContainer(launchContext);
+
+      DummyContainerManager.waitForContainerState(containerManager, cID,
+          ContainerState.RUNNING);
+
+      containerManager.stopContainer(cID);
+      DummyContainerManager.waitForContainerState(containerManager, cID,
+          ContainerState.COMPLETE);
+    } finally {
+      containerManager.stop();
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.AccessControlException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLinuxContainerExecutor {
+//
+//  private static final Log LOG = LogFactory
+//      .getLog(TestLinuxContainerExecutor.class);
+//
+//  // TODO: FIXME
+//  private static File workSpace = new File("target",
+//      TestLinuxContainerExecutor.class.getName() + "-workSpace");
+//
+//  @Before
+//  public void setup() throws IOException {
+//    FileContext.getLocalFSFileContext().mkdir(
+//        new Path(workSpace.getAbsolutePath()), null, true);
+//    workSpace.setReadable(true, false);
+//    workSpace.setExecutable(true, false);
+//    workSpace.setWritable(true, false);
+//  }
+//
+//  @After
+//  public void tearDown() throws AccessControlException, FileNotFoundException,
+//      UnsupportedFileSystemException, IOException {
+//    FileContext.getLocalFSFileContext().delete(
+//        new Path(workSpace.getAbsolutePath()), true);
+//  }
+//
+  // NOTE: the whole body is commented out, so this test currently passes
+  // without exercising anything.
+  @Test
+  public void testCommandFilePreparation() throws IOException {
+//    LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
+//        "/bin/echo", "hello" }, null, null, "nobody"); // TODO: fix user name
+//    executor.prepareCommandFile(workSpace.getAbsolutePath());
+//
+//    // Now verify the contents of the commandFile
+//    File commandFile = new File(workSpace, LinuxContainerExecutor.COMMAND_FILE);
+//    BufferedReader reader = new BufferedReader(new FileReader(commandFile));
+//    Assert.assertEquals("/bin/echo hello", reader.readLine());
+//    Assert.assertEquals(null, reader.readLine());
+//    Assert.assertTrue(commandFile.canExecute());
+  }
+//
+//  @Test
+//  public void testContainerLaunch() throws IOException {
+//    String containerExecutorPath = System
+//        .getProperty("container-executor-path");
+//    if (containerExecutorPath == null || containerExecutorPath.equals("")) {
+//      LOG.info("Not Running test for lack of container-executor-path");
+//      return;
+//    }
+//
+//    String applicationSubmitter = "nobody";
+//
+//    File touchFile = new File(workSpace, "touch-file");
+//    LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
+//        "touch", touchFile.getAbsolutePath() }, workSpace, null,
+//        applicationSubmitter);
+//    executor.setCommandExecutorPath(containerExecutorPath);
+//    executor.execute();
+//
+//    FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+//        new Path(touchFile.getAbsolutePath()));
+//    Assert.assertEquals(applicationSubmitter, fileStatus.getOwner());
+//  }
+//
+//  @Test
+//  public void testContainerKill() throws IOException, InterruptedException,
+//      IllegalArgumentException, SecurityException, IllegalAccessException,
+//      NoSuchFieldException {
+//    String containerExecutorPath = System
+//        .getProperty("container-executor-path");
+//    if (containerExecutorPath == null || containerExecutorPath.equals("")) {
+//      LOG.info("Not Running test for lack of container-executor-path");
+//      return;
+//    }
+//
+//    String applicationSubmitter = "nobody";
+//    final LinuxContainerExecutor executor = new LinuxContainerExecutor(
+//        new String[] { "sleep", "100" }, workSpace, null, applicationSubmitter);
+//    executor.setCommandExecutorPath(containerExecutorPath);
+//    new Thread() {
+//      public void run() {
+//        try {
+//          executor.execute();
+//        } catch (IOException e) {
+//          // TODO Auto-generated catch block
+//          e.printStackTrace();
+//        }
+//      };
+//    }.start();
+//
+//    String pid;
+//    while ((pid = executor.getPid()) == null) {
+//      LOG.info("Sleeping for 5 seconds before checking if "
+//          + "the process is alive.");
+//      Thread.sleep(5000);
+//    }
+//    LOG.info("Going to check the liveliness of the process with pid " + pid);
+//
+//    LinuxContainerExecutor checkLiveliness = new LinuxContainerExecutor(
+//        new String[] { "kill", "-0", "-" + pid }, workSpace, null,
+//        applicationSubmitter);
+//    checkLiveliness.setCommandExecutorPath(containerExecutorPath);
+//    checkLiveliness.execute();
+//
+//    LOG.info("Process is alive. "
+//        + "Sleeping for 5 seconds before killing the process.");
+//    Thread.sleep(5000);
+//    LOG.info("Going to killing the process.");
+//
+//    executor.kill();
+//
+//    LOG.info("Sleeping for 5 seconds before checking if "
+//        + "the process is alive.");
+//    Thread.sleep(5000);
+//    LOG.info("Going to check the liveliness of the process.");
+//
+//    // TODO: fix
+//    checkLiveliness = new LinuxContainerExecutor(new String[] { "kill", "-0",
+//        "-" + pid }, workSpace, null, applicationSubmitter);
+//    checkLiveliness.setCommandExecutorPath(containerExecutorPath);
+//    boolean success = false;
+//    try {
+//      checkLiveliness.execute();
+//      success = true;
+//    } catch (IOException e) {
+//      success = false;
+//    }
+//
+//    Assert.assertFalse(success);
+//  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,236 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeStatusUpdater {
+
+  static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
+  static final Path basedir =
+      new Path("target", TestNodeStatusUpdater.class.getName());
+
+  int heartBeatID = 0;
+  volatile Error nmStartError = null;
+
+  private class MyResourceTracker implements ResourceTracker {
+
+    private Context context;
+
+    public MyResourceTracker(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public RegistrationResponse registerNodeManager(CharSequence node,
+        Resource resource) throws AvroRemoteException {
+      LOG.info("Registering " + node);
+      try {
+        Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
+            + ":12345", node);
+      } catch (UnknownHostException e) {
+        Assert.fail(e.getMessage());
+      }
+      Assert.assertEquals(5 * 1024, resource.memory);
+      RegistrationResponse regResponse = new RegistrationResponse();
+      regResponse.nodeID = new NodeID();
+      return regResponse;
+    }
+
+    ApplicationID applicationID = new ApplicationID();
+    ContainerID firstContainerID = new ContainerID();
+    ContainerID secondContainerID = new ContainerID();
+
+    @Override
+    public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+        throws AvroRemoteException {
+      LOG.info("Got heartbeat number " + heartBeatID);
+      nodeStatus.responseId = heartBeatID++;
+      if (heartBeatID == 1) {
+        Assert.assertEquals(0, nodeStatus.containers.size());
+
+        // Give a container to the NM.
+        applicationID.id = heartBeatID;
+        firstContainerID.appID = applicationID;
+        firstContainerID.id = heartBeatID;
+        ContainerLaunchContext launchContext = new ContainerLaunchContext();
+        launchContext.id = firstContainerID;
+        launchContext.resource = new Resource();
+        launchContext.resource.memory = 2; // 2GB
+        Container container = new ContainerImpl(null, launchContext);
+        this.context.getContainers().put(firstContainerID, container);
+      } else if (heartBeatID == 2) {
+        // Checks on the RM end
+        Assert.assertEquals("Number of applications should only be one!", 1,
+            nodeStatus.containers.size());
+        Assert.assertEquals("Number of container for the app should be one!",
+            1, nodeStatus.containers.get(String.valueOf(applicationID.id))
+                .size());
+        Assert.assertEquals(2,
+            nodeStatus.containers.get(String.valueOf(applicationID.id))
+                .get(0).resource.memory);
+
+        // Checks on the NM end
+        ConcurrentMap<ContainerID, Container> activeContainers =
+            this.context.getContainers();
+        Assert.assertEquals(1, activeContainers.size());
+
+        // Give another container to the NM.
+        applicationID.id = heartBeatID;
+        secondContainerID.appID = applicationID;
+        secondContainerID.id = heartBeatID;
+        ContainerLaunchContext launchContext = new ContainerLaunchContext();
+        launchContext.id = secondContainerID;
+        launchContext.resource = new Resource();
+        launchContext.resource.memory = 3; // 3GB
+        Container container = new ContainerImpl(null, launchContext);
+        this.context.getContainers().put(secondContainerID, container);
+      } else if (heartBeatID == 3) {
+        // Checks on the RM end
+        Assert.assertEquals("Number of applications should only be one!", 1,
+            nodeStatus.containers.size());
+        Assert.assertEquals("Number of container for the app should be two!",
+            2, nodeStatus.containers.get(String.valueOf(applicationID.id))
+                .size());
+        Assert.assertEquals(2,
+            nodeStatus.containers.get(String.valueOf(applicationID.id))
+                .get(0).resource.memory);
+        Assert.assertEquals(3,
+            nodeStatus.containers.get(String.valueOf(applicationID.id))
+                .get(1).resource.memory);
+
+        // Checks on the NM end
+        ConcurrentMap<ContainerID, Container> activeContainers =
+            this.context.getContainers();
+        Assert.assertEquals(2, activeContainers.size());
+      }
+      HeartbeatResponse response = new HeartbeatResponse();
+      response.responseId = heartBeatID;
+      response.containersToCleanup = new ArrayList<org.apache.hadoop.yarn.Container>();
+      response.appplicationsToCleanup = new ArrayList<ApplicationID>();
+      return response;
+    }
+  }
+
+  private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+    private Context context;
+
+    public MyNodeStatusUpdater(Context context) {
+      super(context);
+      this.context = context;
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return new MyResourceTracker(this.context);
+    }
+  }
+
+  @Before
+  public void clearError() {
+    nmStartError = null;
+  }
+
+  @After
+  public void deleteBaseDir() throws IOException {
+    FileContext lfs = FileContext.getLocalFSFileContext();
+    lfs.delete(basedir, true);
+  }
+
+  @Test
+  public void testNMRegistration() throws InterruptedException {
+    final NodeManager nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context) {
+        return new MyNodeStatusUpdater(context);
+      }
+    };
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(NMConfig.NM_RESOURCE, 5); // 5GB
+    conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
+    conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
+    conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
+    conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath());
+    nm.init(conf);
+    new Thread() {
+      public void run() {
+        try {
+          nm.start();
+        } catch (Error e) {
+          TestNodeStatusUpdater.this.nmStartError = e;
+        }
+      }
+    }.start();
+
+    System.out.println(" ----- thread already started.."
+        + nm.getServiceState());
+
+    while (nm.getServiceState() == STATE.INITED) {
+      LOG.info("Waiting for NM to start..");
+      Thread.sleep(1000);
+    }
+    if (nmStartError != null) {
+      throw nmStartError;
+    }
+    if (nm.getServiceState() != STATE.STARTED) {
+      // NM could have failed.
+      Assert.fail("NodeManager failed to start");
+    }
+
+    while (heartBeatID <= 3) {
+      Thread.sleep(500);
+    }
+
+    nm.stop();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,159 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+
+import org.apache.hadoop.yarn.ApplicationID;
+
+import static org.apache.hadoop.yarn.service.Service.STATE.*;
+
+/**
+ * Tests for {@link AuxServices}: event dispatch to configured auxiliary
+ * services, lifecycle state propagation, and the effect of one auxiliary
+ * service stopping on its own.
+ */
+public class TestAuxServices {
+
+  /**
+   * Auxiliary service stub whose callbacks assert that they were invoked
+   * with the application id (and, for init, the payload) it was built for.
+   */
+  static class LightService extends AbstractService
+      implements AuxServices.AuxiliaryService {
+    /** Character tag; prefix of this service's config keys and payload. */
+    private final char idef;
+    private final int expected_appId;
+    // Read from "<idef>.expected.init" / "<idef>.expected.stop" in init().
+    // NOTE(review): nothing in this class decrements these counters, so the
+    // checks in stop() only pass when both were configured as 0 -- confirm
+    // whether initApp/stopApp were meant to count down.
+    private int remaining_init;
+    private int remaining_stop;
+    LightService(String name, char idef, int expected_appId) {
+      super(name);
+      this.idef = idef;
+      this.expected_appId = expected_appId;
+    }
+    @Override
+    public void init(Configuration conf) {
+      remaining_init = conf.getInt(idef + ".expected.init", 0);
+      remaining_stop = conf.getInt(idef + ".expected.stop", 0);
+      super.init(conf);
+    }
+    @Override
+    public void stop() {
+      assertEquals(0, remaining_init);
+      assertEquals(0, remaining_stop);
+      super.stop();
+    }
+    /** Asserts the payload is (idef char, expected id int) and the id matches. */
+    @Override
+    public void initApp(String user, ApplicationID appId, ByteBuffer data) {
+      assertEquals(idef, data.getChar());
+      assertEquals(expected_appId, data.getInt());
+      assertEquals(expected_appId, appId.id);
+    }
+    /** Asserts the stopped application is the one this service expects. */
+    @Override
+    public void stopApp(ApplicationID appId) {
+      assertEquals(expected_appId, appId.id);
+    }
+  }
+
+  static class ServiceA extends LightService {
+    public ServiceA() { super("A", 'A', 65); }
+  }
+
+  static class ServiceB extends LightService {
+    public ServiceB() { super("B", 'B', 66); }
+  }
+
+  /**
+   * Sends an APPLICATION_INIT event addressed to "Asrv" and an
+   * APPLICATION_STOP event addressed to "Bsrv" through
+   * {@link AuxServices#handle}; the stub callbacks assert on what they
+   * receive.
+   */
+  // NOTE(review): presumably AuxServices routes each event to the service
+  // named in the event -- confirm against AuxServices.handle.
+  @Test
+  public void testAuxEventDispatch() {
+    Configuration conf = new Configuration();
+    conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+        ServiceA.class, Service.class);
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+        ServiceB.class, Service.class);
+    conf.setInt("A.expected.init", 1);
+    conf.setInt("B.expected.stop", 1);
+    final AuxServices aux = new AuxServices();
+    aux.init(conf);
+    aux.start();
+
+    ApplicationID appId = new ApplicationID();
+    appId.id = 65;
+    // Payload: 2-byte char tag followed by a 4-byte int application id.
+    ByteBuffer buf = ByteBuffer.allocate(6);
+    buf.putChar('A');
+    buf.putInt(65);
+    buf.flip();
+    AuxServicesEvent event = new AuxServicesEvent(
+        AuxServicesEventType.APPLICATION_INIT, "user0", appId, "Asrv", buf);
+    aux.handle(event);
+    appId.id = 66;
+    event = new AuxServicesEvent(
+        AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
+    // Dispatch the stop event as well; previously it was built but never
+    // handed to AuxServices, so the stop path was not exercised.
+    aux.handle(event);
+  }
+
+  /**
+   * Checks that exactly one ServiceA and one ServiceB are created and that
+   * they follow the AuxServices lifecycle through INITED, STARTED, STOPPED.
+   */
+  @Test
+  public void testAuxServices() {
+    Configuration conf = new Configuration();
+    conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+        ServiceA.class, Service.class);
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+        ServiceB.class, Service.class);
+    final AuxServices aux = new AuxServices();
+    aux.init(conf);
+
+    // Multiply in a distinct prime per service type: the product is 6 only
+    // for one ServiceA (x2) together with one ServiceB (x3).
+    int latch = 1;
+    for (Service s : aux.getServices()) {
+      assertEquals(INITED, s.getServiceState());
+      if (s instanceof ServiceA) { latch *= 2; }
+      else if (s instanceof ServiceB) { latch *= 3; }
+      else fail("Unexpected service type " + s.getClass());
+    }
+    assertEquals("Invalid mix of services", 6, latch);
+    aux.start();
+    for (Service s : aux.getServices()) {
+      assertEquals(STARTED, s.getServiceState());
+    }
+
+    aux.stop();
+    for (Service s : aux.getServices()) {
+      assertEquals(STOPPED, s.getServiceState());
+    }
+  }
+
+  /**
+   * Stops one auxiliary service directly and expects AuxServices itself to
+   * end up STOPPED with no services left registered.
+   */
+  @Test
+  public void testAuxUnexpectedStop() {
+    Configuration conf = new Configuration();
+    conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+        ServiceA.class, Service.class);
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+        ServiceB.class, Service.class);
+    final AuxServices aux = new AuxServices();
+    aux.init(conf);
+    aux.start();
+
+    Service s = aux.getServices().iterator().next();
+    s.stop();
+    assertEquals("Auxiliary service stopped, but AuxService unaffected.",
+        STOPPED, aux.getServiceState());
+    assertTrue(aux.getServices().isEmpty());
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * Stand-in stream for unit tests. Plain reads are inherited from
+ * {@link FilterInputStream} and go to the wrapped stream; the
+ * {@link Seekable} and {@link PositionedReadable} operations are inert
+ * stubs that never touch it.
+ */
+public class FakeFSDataInputStream
+    extends FilterInputStream implements Seekable, PositionedReadable {
+
+  /**
+   * @param in stream that the inherited sequential reads delegate to
+   */
+  public FakeFSDataInputStream(InputStream in) {
+    super(in);
+  }
+
+  /** Does nothing; the requested position is ignored. */
+  @Override
+  public void seek(long pos) throws IOException {
+  }
+
+  /** @return always -1 */
+  @Override
+  public long getPos() throws IOException {
+    return -1;
+  }
+
+  /** @return always false */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  /** Reads nothing. @return always -1 */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    return -1;
+  }
+
+  /** Does nothing; the buffer is left untouched. */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+  }
+
+  /** Does nothing; the buffer is left untouched. */
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+  }
+}