You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [31/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+
+public class ContainersMonitorEvent extends
+ AbstractEvent<ContainersMonitorEventType> {
+
+ public ContainersMonitorEvent(ContainersMonitorEventType eventType) {
+ super(eventType);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+public enum ContainersMonitorEventType {
+ START_MONITORING_CONTAINER,
+ STOP_MONITORING_CONTAINER
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+class DummyContainerManager extends ContainerManagerImpl {
+
+ private static final Log LOG = LogFactory
+ .getLog(DummyContainerManager.class);
+
+ public DummyContainerManager(Context context, ContainerExecutor exec,
+ DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater) {
+ super(context, exec, deletionContext, nodeStatusUpdater);
+ }
+
+ @Override
+ protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
+ DeletionService deletionContext) {
+ return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
+ @Override
+ public void handle(LocalizerEvent event) {
+ switch (event.getType()) {
+ case INIT_APPLICATION_RESOURCES:
+ Application app =
+ ((ApplicationLocalizerEvent) event).getApplication();
+ // Simulate event from ApplicationLocalization.
+ this.dispatcher.getEventHandler().handle(
+ new ApplicationInitedEvent(app.getAppId(),
+ new HashMap<Path, String>(), new Path("workDir")));
+ break;
+ case CLEANUP_CONTAINER_RESOURCES:
+ Container container =
+ ((ContainerLocalizerEvent) event).getContainer();
+ // TODO: delete the container dir
+ this.dispatcher.getEventHandler().handle(
+ new ContainerEvent(container.getLaunchContext().id,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+ break;
+ case DESTROY_APPLICATION_RESOURCES:
+ // decrement reference counts of all resources associated with this
+ // app
+ break;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected ContainersLauncher createContainersLauncher(Context context,
+ ContainerExecutor exec) {
+ return new ContainersLauncher(context, super.dispatcher, exec) {
+ @Override
+ public void handle(ContainersLauncherEvent event) {
+ Container container = event.getContainer();
+ ContainerID containerId = container.getLaunchContext().id;
+ switch (event.getType()) {
+ case LAUNCH_CONTAINER:
+ dispatcher.getEventHandler().handle(
+ new ContainerEvent(containerId,
+ ContainerEventType.CONTAINER_LAUNCHED));
+ break;
+ case CLEANUP_CONTAINER:
+ dispatcher.getEventHandler().handle(
+ new ContainerEvent(containerId,
+ ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL));
+ break;
+ }
+ }
+ };
+ }
+
+ static void waitForContainerState(ContainerManager containerManager,
+ ContainerID containerID, ContainerState finalState)
+ throws InterruptedException, AvroRemoteException {
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatus(containerID);
+ int timeoutSecs = 0;
+ while (!containerStatus.state.equals(finalState) && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for container to get into state " + finalState
+ + ". Current state is " + containerStatus.state);
+ containerStatus = containerManager.getContainerStatus(containerID);
+ }
+ LOG.info("Container state is " + containerStatus.state);
+ Assert.assertEquals("ContainerState is not correct (timedout)",
+ finalState, containerStatus.state);
+ }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+
+class LocalRMInterface implements ResourceTracker {
+
+ @Override
+ public RegistrationResponse registerNodeManager(CharSequence node,
+ Resource resource) throws AvroRemoteException {
+ RegistrationResponse registrationResponse = new RegistrationResponse();
+ registrationResponse.nodeID = new NodeID();
+ return registrationResponse;
+ }
+
+ @Override
+ public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+ throws AvroRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,129 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+public class SyntheticContainerLaunch {
+
+ static final long clusterTimeStamp = System.nanoTime();
+
+ static ContainerLaunchContext getContainer(Configuration conf,
+ int appId, int cId, String user, Path tokens)
+ throws IOException, URISyntaxException {
+ ContainerLaunchContext container = new ContainerLaunchContext();
+ // id
+ ApplicationID appID = new ApplicationID();
+ appID.id = appId;
+ appID.clusterTimeStamp = clusterTimeStamp;
+ container.id = new ContainerID();
+ container.id.appID = appID;
+ container.id.id = cId;
+
+ // user
+ container.user = user;
+
+ // Resource resource
+ container.resource = new Resource();
+ container.resource.memory = 1024;
+
+ // union {null, map<LocalResource>} resources_todo;
+ container.resources = new HashMap<CharSequence,LocalResource>();
+ LocalResource resource = new LocalResource();
+ resource.resource = AvroUtil.getYarnUrlFromPath(
+ new Path("file:///home/chrisdo/work/hadoop/mapred/CHANGES.txt"));
+ resource.size = -1;
+ resource.timestamp = 1294684255000L;
+ resource.type = FILE;
+ resource.state = PRIVATE;
+ container.resources.put("dingos", resource);
+
+ //union {null, bytes} fsTokens_todo;
+ Credentials creds = new Credentials();
+ if (tokens != null) {
+ creds.readTokenStorageFile(tokens, conf);
+ }
+ DataOutputBuffer buf = new DataOutputBuffer();
+ creds.writeTokenStorageToStream(buf);
+ container.containerTokens =
+ ByteBuffer.wrap(buf.getData(), 0, buf.getLength());
+
+ //union {null, map<bytes>} serviceData;
+ container.serviceData = new HashMap<CharSequence,ByteBuffer>();
+
+ // map<string> env;
+ container.env = new HashMap<CharSequence,CharSequence>();
+ container.env.put("MY_OUTPUT_FILE", "yak.txt");
+
+ // array<string> command;
+ container.command = new ArrayList<CharSequence>();
+ container.command.add("cat");
+ container.command.add("dingos");
+ container.command.add(">");
+ container.command.add("${MY_OUTPUT_FILE}");
+ return container;
+ }
+
+ static ContainerManager getClient(Configuration conf, InetSocketAddress adr) {
+ YarnRPC rpc = YarnRPC.create(conf);
+ //conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ // ContainerManagerSecurityInfo.class, SecurityInfo.class);
+ return (ContainerManager) rpc.getProxy(ContainerManager.class, adr, conf);
+ }
+
+ // usage $0 nmAddr user [fstokens]
+ public static void main(String[] argv) throws Exception {
+ Configuration conf = new Configuration();
+ InetSocketAddress nmAddr = NetUtils.createSocketAddr(argv[0]);
+ ContainerManager client = getClient(conf, nmAddr);
+ Path tokens = (argv.length > 2) ? new Path(argv[2]) : null;
+ ContainerLaunchContext ctxt = getContainer(conf, 0, 0, argv[1], tokens);
+ client.startContainer(ctxt);
+ System.out.println("START: " + ctxt);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,360 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContainerManager {
+
+ static {
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ }
+
+ private static Log LOG = LogFactory.getLog(TestContainerManager.class);
+
+ private static File localDir = new File("target",
+ TestContainerManager.class.getName() + "-localDir").getAbsoluteFile();
+
+ private static File tmpDir = new File("target",
+ TestContainerManager.class.getName() + "-tmpDir");
+
+ private Configuration conf = new YarnConfiguration();
+ private Context context = new NMContext();
+ private ContainerExecutor exec = new DefaultContainerExecutor();
+ private DeletionService delSrvc = new DeletionService(exec);
+ private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(context) {
+ @Override
+ protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+ return new LocalRMInterface();
+ };
+
+ @Override
+ protected void startStatusUpdater() throws InterruptedException,
+ AvroRemoteException {
+ return; // Don't start any updating thread.
+ }
+ };
+
+ private ContainerManagerImpl containerManager = null;
+
+ @Before
+ public void setup() throws IOException {
+ FileContext localFS = FileContext.getLocalFSFileContext();
+ localFS.delete(new Path(localDir.getAbsolutePath()), true);
+ localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
+ localDir.mkdir();
+ tmpDir.mkdir();
+ LOG.info("Created localDir in " + localDir.getAbsolutePath());
+ LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
+
+ String bindAddress = "0.0.0.0:5555";
+ conf.set(NMConfig.NM_BIND_ADDRESS, bindAddress);
+ conf.set(NMConfig.NM_LOCAL_DIR, localDir.getAbsolutePath());
+ containerManager =
+ new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater);
+ containerManager.init(conf);
+ }
+
+ @After
+ public void tearDown() {
+ if (containerManager != null
+ && containerManager.getServiceState() == STATE.STARTED) {
+ containerManager.stop();
+ }
+ }
+
+ @Test
+ public void testContainerManagerInitialization() throws IOException {
+
+ containerManager.start();
+
+ // Just do a query for a non-existing container.
+ boolean throwsException = false;
+ try {
+ containerManager.getContainerStatus(new ContainerID());
+ } catch (AvroRemoteException e) {
+ throwsException = true;
+ }
+ Assert.assertTrue(throwsException);
+ }
+
+ @Test
+ public void testContainerSetup() throws IOException, InterruptedException {
+
+ containerManager.start();
+
+ // ////// Create the resources for the container
+ File dir = new File(tmpDir, "dir");
+ dir.mkdirs();
+ File file = new File(dir, "file");
+ PrintWriter fileWriter = new PrintWriter(file);
+ fileWriter.write("Hello World!");
+ fileWriter.close();
+
+ ContainerLaunchContext container = new ContainerLaunchContext();
+
+ // ////// Construct the Container-id
+ ApplicationID appId = new ApplicationID();
+ ContainerID cId = new ContainerID();
+ cId.appID = appId;
+ container.id = cId;
+
+ String user = "dummy-user";
+ container.user = user;
+
+ // ////// Construct the container-spec.
+ ContainerLaunchContext containerLaunchContext =
+ new ContainerLaunchContext();
+ containerLaunchContext.resources =
+ new HashMap<CharSequence, LocalResource>();
+ URL resource_alpha =
+ AvroUtil.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
+ .makeQualified(new Path(file.getAbsolutePath())));
+ LocalResource rsrc_alpha = new LocalResource();
+ rsrc_alpha.resource = resource_alpha;
+ rsrc_alpha.size= -1;
+ rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
+ rsrc_alpha.type = LocalResourceType.FILE;
+ rsrc_alpha.timestamp = file.lastModified();
+ String destinationFile = "dest_file";
+ containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.user = container.user;
+ containerLaunchContext.id = container.id;
+ containerLaunchContext.command = new ArrayList<CharSequence>();
+
+ containerManager.startContainer(containerLaunchContext);
+
+ DummyContainerManager.waitForContainerState(containerManager, cId,
+ ContainerState.COMPLETE);
+
+ // Now ascertain that the resources are localised correctly.
+ // TODO: Don't we need clusterStamp in localDir?
+ File userCacheDir = new File(localDir, ApplicationLocalizer.USERCACHE);
+ File userDir = new File(userCacheDir, user);
+ File appCache = new File(userDir, ApplicationLocalizer.APPCACHE);
+ File appDir = new File(appCache, AvroUtil.toString(appId));
+ File containerDir = new File(appDir, AvroUtil.toString(cId));
+ File targetFile = new File(containerDir, destinationFile);
+ for (File f : new File[] { localDir, userCacheDir, appDir,
+ containerDir }) {
+ Assert.assertTrue(f.getAbsolutePath() + " doesn't exist!!", f.exists());
+ Assert.assertTrue(f.getAbsolutePath() + " is not a directory!!",
+ f.isDirectory());
+ }
+ Assert.assertTrue(targetFile.getAbsolutePath() + " doesn't exist!!",
+ targetFile.exists());
+
+ // Now verify the contents of the file
+ BufferedReader reader = new BufferedReader(new FileReader(targetFile));
+ Assert.assertEquals("Hello World!", reader.readLine());
+ Assert.assertEquals(null, reader.readLine());
+ }
+
+ @Test
+ public void testContainerLaunchAndStop() throws IOException, InterruptedException {
+ containerManager.start();
+
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ File outputFile = new File(tmpDir, "output.txt").getAbsoluteFile();
+ fileWriter.write("echo Hello World! > " + outputFile);
+ fileWriter.close();
+
+ ContainerLaunchContext containerLaunchContext = new ContainerLaunchContext();
+
+ // ////// Construct the Container-id
+ ApplicationID appId = new ApplicationID();
+ ContainerID cId = new ContainerID();
+ cId.appID = appId;
+ containerLaunchContext.id = cId;
+
+ String user = "dummy-user";
+ containerLaunchContext.user = user;
+
+ containerLaunchContext.resources =
+ new HashMap<CharSequence, LocalResource>();
+ URL resource_alpha =
+ AvroUtil.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha = new LocalResource();
+ rsrc_alpha.resource = resource_alpha;
+ rsrc_alpha.size= -1;
+ rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
+ rsrc_alpha.type = LocalResourceType.FILE;
+ rsrc_alpha.timestamp = scriptFile.lastModified();
+ String destinationFile = "dest_file";
+ containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.user = containerLaunchContext.user;
+ List<CharSequence> commandArgs = new ArrayList<CharSequence>();
+ commandArgs.add("/bin/bash");
+ commandArgs.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.command = commandArgs;
+ containerManager.startContainer(containerLaunchContext);
+
+ DummyContainerManager.waitForContainerState(containerManager, cId,
+ ContainerState.COMPLETE);
+
+ Assert.assertTrue("OutputFile doesn't exist!", outputFile.exists());
+
+ // Now verify the contents of the file
+ BufferedReader reader = new BufferedReader(new FileReader(outputFile));
+ Assert.assertEquals("Hello World!", reader.readLine());
+ Assert.assertEquals(null, reader.readLine());
+
+ // TODO: test the stop functionality.
+ }
+
+// @Test
+// public void testCommandPreparation() {
+// ContainerLaunchContext container = new ContainerLaunchContext();
+//
+// // ////// Construct the Container-id
+// ApplicationID appId = new ApplicationID();
+// appId.id = 0;
+// appId.clusterTimeStamp = 0;
+// ContainerID containerID = new ContainerID();
+// containerID.appID = appId;
+// containerID.id = 0;
+// container.id = containerID;
+//
+// // The actual environment for the container
+// Path containerWorkDir =
+// NodeManager.getContainerWorkDir(new Path(localDir.getAbsolutePath()),
+// containerID);
+// final Map<String, String> ENVS = new HashMap<String, String>();
+// ENVS.put("JAVA_HOME", "/my/path/to/java-home");
+// ENVS.put("LD_LIBRARY_PATH", "/my/path/to/libraries");
+//
+// File workDir = new File(ContainerBuilderHelper.getWorkDir());
+// File logDir = new File(workDir, "logs");
+// File stdout = new File(logDir, "stdout");
+// File stderr = new File(logDir, "stderr");
+// File tmpDir = new File(workDir, "tmp");
+// File javaHome = new File(ContainerBuilderHelper.getEnvVar("JAVA_HOME"));
+// String ldLibraryPath =
+// ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
+// List<String> classPaths = new ArrayList<String>();
+// File someJar = new File(workDir, "jar-name.jar");
+// classPaths.add(someJar.toString());
+// classPaths.add(workDir.toString());
+// String PATH_SEPARATOR = System.getProperty("path.separator");
+// String classPath = StringUtils.join(PATH_SEPARATOR, classPaths);
+// File someFile = new File(workDir, "someFileNeededinEnv");
+//
+// NMContainer nmContainer = new NMContainer(container, containerWorkDir) {
+// @Override
+// protected String checkAndGetEnvValue(String envVar) {
+// return ENVS.get(envVar);
+// }
+// };
+// List<CharSequence> command = new ArrayList<CharSequence>();
+// command.add(javaHome + "/bin/java");
+// command.add("-Djava.library.path=" + ldLibraryPath);
+// command.add("-Djava.io.tmpdir=" + tmpDir);
+// command.add("-classpath");
+// command.add(classPath);
+// command.add("2>" + stdout);
+// command.add("1>" + stderr);
+//
+// Map<String, String> env = new HashMap<String, String>();
+// env.put("FILE_IN_ENV", someFile.toString());
+// env.put("JAVA_HOME", javaHome.toString());
+// env.put("LD_LIBRARY_PATH", ldLibraryPath);
+//
+// String actualWorkDir = containerWorkDir.toUri().getPath();
+//
+// String finalCmdSent = "";
+// for (CharSequence cmd : command) {
+// finalCmdSent += cmd + " ";
+// }
+// finalCmdSent.trim();
+// LOG.info("Final command sent is : " + finalCmdSent);
+//
+// // The main method being tested
+// String[] finalCommands =
+// nmContainer.prepareCommandArgs(command, env, actualWorkDir);
+// // //////////////////////////////
+//
+// String finalCmd = "";
+// for (String cmd : finalCommands) {
+// finalCmd += cmd + " ";
+// }
+// finalCmd = finalCmd.trim();
+// LOG.info("Final command for launch is : " + finalCmd);
+//
+// File actualLogDir = new File(actualWorkDir, "logs");
+// File actualStdout = new File(actualLogDir, "stdout");
+// File actualStderr = new File(actualLogDir, "stderr");
+// File actualTmpDir = new File(actualWorkDir, "tmp");
+// File actualSomeJar = new File(actualWorkDir, "jar-name.jar");
+// File actualSomeFileInEnv = new File(actualWorkDir, "someFileNeededinEnv");
+// Assert.assertEquals(actualSomeFileInEnv.toString(),
+// env.get("FILE_IN_ENV"));
+// Assert.assertEquals("/my/path/to/java-home", env.get("JAVA_HOME"));
+// Assert.assertEquals("/my/path/to/libraries", env.get("LD_LIBRARY_PATH"));
+// Assert.assertEquals("/my/path/to/java-home/bin/java"
+// + " -Djava.library.path=/my/path/to/libraries" + " -Djava.io.tmpdir="
+// + actualTmpDir + " -classpath " + actualSomeJar + PATH_SEPARATOR
+// + actualWorkDir + " 2>" + actualStdout + " 1>" + actualStderr,
+// finalCmd);
+// }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,233 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
+
+import static org.apache.hadoop.fs.CreateFlag.*;
+
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
+public class TestDefaultContainerExecutor {
+
+ /*
+ // XXX FileContext cannot be mocked to do this
+ static FSDataInputStream getRandomStream(Random r, int len)
+ throws IOException {
+ byte[] bytes = new byte[len];
+ r.nextBytes(bytes);
+ DataInputBuffer buf = new DataInputBuffer();
+ buf.reset(bytes, 0, bytes.length);
+ return new FSDataInputStream(new FakeFSDataInputStream(buf));
+ }
+
+ class PathEndsWith extends ArgumentMatcher<Path> {
+ final String suffix;
+ PathEndsWith(String suffix) {
+ this.suffix = suffix;
+ }
+ @Override
+ public boolean matches(Object o) {
+ return
+ suffix.equals(((Path)o).getName());
+ }
+ }
+
+ DataOutputBuffer mockStream(
+ AbstractFileSystem spylfs, Path p, Random r, int len)
+ throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ doReturn(getRandomStream(r, len)).when(spylfs).open(p);
+ doReturn(new FileStatus(len, false, -1, -1L, -1L, p)).when(
+ spylfs).getFileStatus(argThat(new PathEndsWith(p.getName())));
+ doReturn(new FSDataOutputStream(dob)).when(spylfs).createInternal(
+ argThat(new PathEndsWith(p.getName())),
+ eq(EnumSet.of(OVERWRITE)),
+ Matchers.<FsPermission>anyObject(), anyInt(), anyShort(), anyLong(),
+ Matchers.<Progressable>anyObject(), anyInt(), anyBoolean());
+ return dob;
+ }
+ */
+
+ @AfterClass
+ public static void deleteTmpFiles() throws IOException {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ lfs.delete(new Path("target",
+ TestDefaultContainerExecutor.class.getSimpleName()), true);
+ }
+
+ byte[] createTmpFile(Path dst, Random r, int len)
+ throws IOException {
+ // use unmodified local context
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ dst = lfs.makeQualified(dst);
+ lfs.mkdir(dst.getParent(), null, true);
+ byte[] bytes = new byte[len];
+ FSDataOutputStream out = null;
+ try {
+ out = lfs.create(dst, EnumSet.of(CREATE, OVERWRITE));
+ r.nextBytes(bytes);
+ out.write(bytes);
+ } finally {
+ if (out != null) out.close();
+ }
+ return bytes;
+ }
+
+// @Test
+// public void testInit() throws IOException, InterruptedException {
+// Configuration conf = new Configuration();
+// AbstractFileSystem spylfs =
+// spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+// // don't actually create dirs
+// //doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(),
+// // Matchers.<FsPermission>anyObject(), anyBoolean());
+// FileContext lfs = FileContext.getFileContext(spylfs, conf);
+//
+// Path basedir = new Path("target",
+// TestDefaultContainerExecutor.class.getSimpleName());
+// List<String> localDirs = new ArrayList<String>();
+// List<Path> localPaths = new ArrayList<Path>();
+// for (int i = 0; i < 4; ++i) {
+// Path p = new Path(basedir, i + "");
+// lfs.mkdir(p, null, true);
+// localPaths.add(p);
+// localDirs.add(p.toString());
+// }
+// final String user = "yak";
+// final String appId = "app_RM_0";
+// final Path logDir = new Path(basedir, "logs");
+// final Path nmLocal = new Path(basedir, "nmPrivate/" + user + "/" + appId);
+// final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+// System.out.println("NMLOCAL: " + nmLocal);
+// Random r = new Random();
+//
+// /*
+// // XXX FileContext cannot be reasonably mocked to do this
+// // mock jobFiles copy
+// long fileSeed = r.nextLong();
+// r.setSeed(fileSeed);
+// System.out.println("SEED: " + seed);
+// Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+// DataOutputBuffer fileCacheBytes = mockStream(spylfs, fileCachePath, r, 512);
+//
+// // mock jobTokens copy
+// long jobSeed = r.nextLong();
+// r.setSeed(jobSeed);
+// System.out.println("SEED: " + seed);
+// Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE);
+// DataOutputBuffer jobTokenBytes = mockStream(spylfs, jobTokenPath, r, 512);
+// */
+//
+// // create jobFiles
+// long fileSeed = r.nextLong();
+// r.setSeed(fileSeed);
+// System.out.println("SEED: " + fileSeed);
+// Path fileCachePath = new Path(nmLocal, ApplicationLocalizer.FILECACHE_FILE);
+// byte[] fileCacheBytes = createTmpFile(fileCachePath, r, 512);
+//
+// // create jobTokens
+// long jobSeed = r.nextLong();
+// r.setSeed(jobSeed);
+// System.out.println("SEED: " + jobSeed);
+// Path jobTokenPath = new Path(nmLocal, ApplicationLocalizer.JOBTOKEN_FILE);
+// byte[] jobTokenBytes = createTmpFile(jobTokenPath, r, 512);
+//
+// DefaultContainerExecutor dce = new DefaultContainerExecutor(lfs);
+// Localization mockLocalization = mock(Localization.class);
+// ApplicationLocalizer spyLocalizer =
+// spy(new ApplicationLocalizer(lfs, user, appId, logDir,
+// localPaths));
+// // ignore cache localization
+// doNothing().when(spyLocalizer).localizeFiles(
+// Matchers.<Localization>anyObject(), Matchers.<Path>anyObject());
+// Path workingDir = lfs.getWorkingDirectory();
+// dce.initApplication(spyLocalizer, nmLocal, mockLocalization, localPaths);
+// lfs.setWorkingDirectory(workingDir);
+//
+// for (Path localdir : localPaths) {
+// Path userdir = lfs.makeQualified(new Path(localdir,
+// new Path(ApplicationLocalizer.USERCACHE, user)));
+// // $localdir/$user
+// verify(spylfs).mkdir(userdir,
+// new FsPermission(ApplicationLocalizer.USER_PERM), true);
+// // $localdir/$user/appcache
+// Path jobdir = new Path(userdir, ApplicationLocalizer.appcache);
+// verify(spylfs).mkdir(jobdir,
+// new FsPermission(ApplicationLocalizer.appcache_PERM), true);
+// // $localdir/$user/filecache
+// Path filedir = new Path(userdir, ApplicationLocalizer.FILECACHE);
+// verify(spylfs).mkdir(filedir,
+// new FsPermission(ApplicationLocalizer.FILECACHE_PERM), true);
+// // $localdir/$user/appcache/$appId
+// Path appdir = new Path(jobdir, appId);
+// verify(spylfs).mkdir(appdir,
+// new FsPermission(ApplicationLocalizer.APPDIR_PERM), true);
+// // $localdir/$user/appcache/$appId/work
+// Path workdir = new Path(appdir, ApplicationLocalizer.WORKDIR);
+// verify(spylfs, atMost(1)).mkdir(workdir, FsPermission.getDefault(), true);
+// }
+// // $logdir/$appId
+// Path logdir = new Path(lfs.makeQualified(logDir), appId);
+// verify(spylfs).mkdir(logdir,
+// new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true);
+// }
+
+ @Test
+ public void testLaunch() throws Exception {
+ }
+
+ @Test
+ public void testSignal() throws Exception {
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ // TestDeletionService covers?
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestDeletionService {
+
+ private static final FileContext lfs = getLfs();
+ private static final FileContext getLfs() {
+ try {
+ return FileContext.getLocalFSFileContext();
+ } catch (UnsupportedFileSystemException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ private static final Path base =
+ lfs.makeQualified(new Path("target", TestDeletionService.class.getName()));
+
+ @AfterClass
+ public static void removeBase() throws IOException {
+ lfs.delete(base, true);
+ }
+
+ public List<Path> buildDirs(Random r, Path root, int numpaths)
+ throws IOException {
+ ArrayList<Path> ret = new ArrayList<Path>();
+ for (int i = 0; i < numpaths; ++i) {
+ Path p = root;
+ long name = r.nextLong();
+ do {
+ p = new Path(p, "" + name);
+ name = r.nextLong();
+ } while (0 == (name % 2));
+ ret.add(p);
+ }
+ return ret;
+ }
+
+ public void createDirs(Path base, List<Path> dirs) throws IOException {
+ for (Path dir : dirs) {
+ lfs.mkdir(new Path(base, dir), null, true);
+ }
+ }
+
+ static class FakeDefaultContainerExecutor extends DefaultContainerExecutor {
+ @Override
+ public void deleteAsUser(String user, Path subDir, Path... basedirs)
+ throws IOException, InterruptedException {
+ if ((Long.parseLong(subDir.getName()) % 2) == 0) {
+ assertNull(user);
+ } else {
+ assertEquals("dingo", user);
+ }
+ super.deleteAsUser(user, subDir, basedirs);
+ assertFalse(lfs.util().exists(subDir));
+ }
+ }
+
+ @Test
+ public void testAbsDelete() throws Exception {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ List<Path> dirs = buildDirs(r, base, 20);
+ createDirs(new Path("."), dirs);
+ DeletionService del =
+ new DeletionService(new FakeDefaultContainerExecutor());
+ del.init(new Configuration());
+ del.start();
+ try {
+ for (Path p : dirs) {
+ del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
+ p, null);
+ }
+ } finally {
+ del.stop();
+ }
+ for (Path p : dirs) {
+ assertFalse(lfs.util().exists(p));
+ }
+ }
+
+ @Test
+ public void testRelativeDelete() throws Exception {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ List<Path> baseDirs = buildDirs(r, base, 4);
+ createDirs(new Path("."), baseDirs);
+ List<Path> content = buildDirs(r, new Path("."), 10);
+ for (Path b : baseDirs) {
+ createDirs(b, content);
+ }
+ DeletionService del =
+ new DeletionService(new FakeDefaultContainerExecutor());
+ del.init(new Configuration());
+ del.start();
+ try {
+ for (Path p : content) {
+ assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
+ del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
+ p, baseDirs.toArray(new Path[4]));
+ }
+ } finally {
+ del.stop();
+ }
+ for (Path p : baseDirs) {
+ for (Path q : content) {
+ assertFalse(lfs.util().exists(new Path(p, q)));
+ }
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerState;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.junit.Test;
+
+public class TestEventFlow {
+
+ private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
+
+ @Test
+ public void testSuccessfulContainerLaunch() throws InterruptedException,
+ AvroRemoteException {
+ Context context = new NMContext();
+
+ YarnConfiguration conf = new YarnConfiguration();
+ ContainerExecutor exec = new DefaultContainerExecutor();
+ DeletionService del = new DeletionService(exec);
+ NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(context) {
+ @Override
+ protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+ return new LocalRMInterface();
+ };
+
+ @Override
+ protected void startStatusUpdater() throws InterruptedException,
+ AvroRemoteException {
+ return; // Don't start any updating thread.
+ }
+ };
+
+ DummyContainerManager containerManager =
+ new DummyContainerManager(context, exec, del, nodeStatusUpdater);
+ containerManager.init(new Configuration());
+ containerManager.start();
+
+ ContainerLaunchContext launchContext = new ContainerLaunchContext();
+ ContainerID cID = new ContainerID();
+ cID.appID = new ApplicationID();
+ launchContext.id = cID;
+ launchContext.user = "testing";
+ launchContext.resource = new Resource();
+ launchContext.env = new HashMap<CharSequence, CharSequence>();
+ launchContext.command = new ArrayList<CharSequence>();
+ containerManager.startContainer(launchContext);
+
+ DummyContainerManager.waitForContainerState(containerManager, cID,
+ ContainerState.RUNNING);
+
+ containerManager.stopContainer(cID);
+ DummyContainerManager.waitForContainerState(containerManager, cID,
+ ContainerState.COMPLETE);
+
+ containerManager.stop();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.AccessControlException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLinuxContainerExecutor {
+//
+// private static final Log LOG = LogFactory
+// .getLog(TestLinuxContainerExecutor.class);
+//
+// // TODO: FIXME
+// private static File workSpace = new File("target",
+// TestLinuxContainerExecutor.class.getName() + "-workSpace");
+//
+// @Before
+// public void setup() throws IOException {
+// FileContext.getLocalFSFileContext().mkdir(
+// new Path(workSpace.getAbsolutePath()), null, true);
+// workSpace.setReadable(true, false);
+// workSpace.setExecutable(true, false);
+// workSpace.setWritable(true, false);
+// }
+//
+// @After
+// public void tearDown() throws AccessControlException, FileNotFoundException,
+// UnsupportedFileSystemException, IOException {
+// FileContext.getLocalFSFileContext().delete(
+// new Path(workSpace.getAbsolutePath()), true);
+// }
+//
+ @Test
+ public void testCommandFilePreparation() throws IOException {
+// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
+// "/bin/echo", "hello" }, null, null, "nobody"); // TODO: fix user name
+// executor.prepareCommandFile(workSpace.getAbsolutePath());
+//
+// // Now verify the contents of the commandFile
+// File commandFile = new File(workSpace, LinuxContainerExecutor.COMMAND_FILE);
+// BufferedReader reader = new BufferedReader(new FileReader(commandFile));
+// Assert.assertEquals("/bin/echo hello", reader.readLine());
+// Assert.assertEquals(null, reader.readLine());
+// Assert.assertTrue(commandFile.canExecute());
+ }
+//
+// @Test
+// public void testContainerLaunch() throws IOException {
+// String containerExecutorPath = System
+// .getProperty("container-executor-path");
+// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
+// LOG.info("Not Running test for lack of container-executor-path");
+// return;
+// }
+//
+// String applicationSubmitter = "nobody";
+//
+// File touchFile = new File(workSpace, "touch-file");
+// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
+// "touch", touchFile.getAbsolutePath() }, workSpace, null,
+// applicationSubmitter);
+// executor.setCommandExecutorPath(containerExecutorPath);
+// executor.execute();
+//
+// FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+// new Path(touchFile.getAbsolutePath()));
+// Assert.assertEquals(applicationSubmitter, fileStatus.getOwner());
+// }
+//
+// @Test
+// public void testContainerKill() throws IOException, InterruptedException,
+// IllegalArgumentException, SecurityException, IllegalAccessException,
+// NoSuchFieldException {
+// String containerExecutorPath = System
+// .getProperty("container-executor-path");
+// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
+// LOG.info("Not Running test for lack of container-executor-path");
+// return;
+// }
+//
+// String applicationSubmitter = "nobody";
+// final LinuxContainerExecutor executor = new LinuxContainerExecutor(
+// new String[] { "sleep", "100" }, workSpace, null, applicationSubmitter);
+// executor.setCommandExecutorPath(containerExecutorPath);
+// new Thread() {
+// public void run() {
+// try {
+// executor.execute();
+// } catch (IOException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+// };
+// }.start();
+//
+// String pid;
+// while ((pid = executor.getPid()) == null) {
+// LOG.info("Sleeping for 5 seconds before checking if "
+// + "the process is alive.");
+// Thread.sleep(5000);
+// }
+// LOG.info("Going to check the liveliness of the process with pid " + pid);
+//
+// LinuxContainerExecutor checkLiveliness = new LinuxContainerExecutor(
+// new String[] { "kill", "-0", "-" + pid }, workSpace, null,
+// applicationSubmitter);
+// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
+// checkLiveliness.execute();
+//
+// LOG.info("Process is alive. "
+// + "Sleeping for 5 seconds before killing the process.");
+// Thread.sleep(5000);
+// LOG.info("Going to killing the process.");
+//
+// executor.kill();
+//
+// LOG.info("Sleeping for 5 seconds before checking if "
+// + "the process is alive.");
+// Thread.sleep(5000);
+// LOG.info("Going to check the liveliness of the process.");
+//
+// // TODO: fix
+// checkLiveliness = new LinuxContainerExecutor(new String[] { "kill", "-0",
+// "-" + pid }, workSpace, null, applicationSubmitter);
+// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
+// boolean success = false;
+// try {
+// checkLiveliness.execute();
+// success = true;
+// } catch (IOException e) {
+// success = false;
+// }
+//
+// Assert.assertFalse(success);
+// }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,236 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.HeartbeatResponse;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.NodeStatus;
+import org.apache.hadoop.yarn.RegistrationResponse;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceTracker;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeStatusUpdater {
+
+ static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
+ static final Path basedir =
+ new Path("target", TestNodeStatusUpdater.class.getName());
+
+ int heartBeatID = 0;
+ volatile Error nmStartError = null;
+
+ private class MyResourceTracker implements ResourceTracker {
+
+ private Context context;
+
+ public MyResourceTracker(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public RegistrationResponse registerNodeManager(CharSequence node,
+ Resource resource) throws AvroRemoteException {
+ LOG.info("Registering " + node);
+ try {
+ Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
+ + ":12345", node);
+ } catch (UnknownHostException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertEquals(5 * 1024, resource.memory);
+ RegistrationResponse regResponse = new RegistrationResponse();
+ regResponse.nodeID = new NodeID();
+ return regResponse;
+ }
+
+ ApplicationID applicationID = new ApplicationID();
+ ContainerID firstContainerID = new ContainerID();
+ ContainerID secondContainerID = new ContainerID();
+
+ @Override
+ public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
+ throws AvroRemoteException {
+ LOG.info("Got heartbeat number " + heartBeatID);
+ nodeStatus.responseId = heartBeatID++;
+ if (heartBeatID == 1) {
+ Assert.assertEquals(0, nodeStatus.containers.size());
+
+ // Give a container to the NM.
+ applicationID.id = heartBeatID;
+ firstContainerID.appID = applicationID;
+ firstContainerID.id = heartBeatID;
+ ContainerLaunchContext launchContext = new ContainerLaunchContext();
+ launchContext.id = firstContainerID;
+ launchContext.resource = new Resource();
+ launchContext.resource.memory = 2; // 2GB
+ Container container = new ContainerImpl(null, launchContext);
+ this.context.getContainers().put(firstContainerID, container);
+ } else if (heartBeatID == 2) {
+ // Checks on the RM end
+ Assert.assertEquals("Number of applications should only be one!", 1,
+ nodeStatus.containers.size());
+ Assert.assertEquals("Number of container for the app should be one!",
+ 1, nodeStatus.containers.get(String.valueOf(applicationID.id))
+ .size());
+ Assert.assertEquals(2,
+ nodeStatus.containers.get(String.valueOf(applicationID.id))
+ .get(0).resource.memory);
+
+ // Checks on the NM end
+ ConcurrentMap<ContainerID, Container> activeContainers =
+ this.context.getContainers();
+ Assert.assertEquals(1, activeContainers.size());
+
+ // Give another container to the NM.
+ applicationID.id = heartBeatID;
+ secondContainerID.appID = applicationID;
+ secondContainerID.id = heartBeatID;
+ ContainerLaunchContext launchContext = new ContainerLaunchContext();
+ launchContext.id = secondContainerID;
+ launchContext.resource = new Resource();
+ launchContext.resource.memory = 3; // 3GB
+ Container container = new ContainerImpl(null, launchContext);
+ this.context.getContainers().put(secondContainerID, container);
+ } else if (heartBeatID == 3) {
+ // Checks on the RM end
+ Assert.assertEquals("Number of applications should only be one!", 1,
+ nodeStatus.containers.size());
+ Assert.assertEquals("Number of container for the app should be two!",
+ 2, nodeStatus.containers.get(String.valueOf(applicationID.id))
+ .size());
+ Assert.assertEquals(2,
+ nodeStatus.containers.get(String.valueOf(applicationID.id))
+ .get(0).resource.memory);
+ Assert.assertEquals(3,
+ nodeStatus.containers.get(String.valueOf(applicationID.id))
+ .get(1).resource.memory);
+
+ // Checks on the NM end
+ ConcurrentMap<ContainerID, Container> activeContainers =
+ this.context.getContainers();
+ Assert.assertEquals(2, activeContainers.size());
+ }
+ HeartbeatResponse response = new HeartbeatResponse();
+ response.responseId = heartBeatID;
+ response.containersToCleanup = new ArrayList<org.apache.hadoop.yarn.Container>();
+ response.appplicationsToCleanup = new ArrayList<ApplicationID>();
+ return response;
+ }
+ }
+
+ private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+ private Context context;
+
+ public MyNodeStatusUpdater(Context context) {
+ super(context);
+ this.context = context;
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return new MyResourceTracker(this.context);
+ }
+ }
+
+ @Before
+ public void clearError() {
+ nmStartError = null;
+ }
+
+ @After
+ public void deleteBaseDir() throws IOException {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ lfs.delete(basedir, true);
+ }
+
+ @Test
+ public void testNMRegistration() throws InterruptedException {
+ final NodeManager nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context) {
+ return new MyNodeStatusUpdater(context);
+ }
+ };
+
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(NMConfig.NM_RESOURCE, 5); // 5GB
+ conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
+ conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
+ conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
+ conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath());
+ nm.init(conf);
+ new Thread() {
+ public void run() {
+ try {
+ nm.start();
+ } catch (Error e) {
+ TestNodeStatusUpdater.this.nmStartError = e;
+ }
+ }
+ }.start();
+
+ System.out.println(" ----- thread already started.."
+ + nm.getServiceState());
+
+ while (nm.getServiceState() == STATE.INITED) {
+ LOG.info("Waiting for NM to start..");
+ Thread.sleep(1000);
+ }
+ if (nmStartError != null) {
+ throw nmStartError;
+ }
+ if (nm.getServiceState() != STATE.STARTED) {
+ // NM could have failed.
+ Assert.fail("NodeManager failed to start");
+ }
+
+ while (heartBeatID <= 3) {
+ Thread.sleep(500);
+ }
+
+ nm.stop();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,159 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+
+import org.apache.hadoop.yarn.ApplicationID;
+
+import static org.apache.hadoop.yarn.service.Service.STATE.*;
+
+public class TestAuxServices {
+
+ static class LightService extends AbstractService
+ implements AuxServices.AuxiliaryService {
+ private final char idef;
+ private final int expected_appId;
+ private int remaining_init;
+ private int remaining_stop;
+ LightService(String name, char idef, int expected_appId) {
+ super(name);
+ this.idef = idef;
+ this.expected_appId = expected_appId;
+ }
+ @Override
+ public void init(Configuration conf) {
+ remaining_init = conf.getInt(idef + ".expected.init", 0);
+ remaining_stop = conf.getInt(idef + ".expected.stop", 0);
+ super.init(conf);
+ }
+ @Override
+ public void stop() {
+ assertEquals(0, remaining_init);
+ assertEquals(0, remaining_stop);
+ super.stop();
+ }
+ @Override
+ public void initApp(String user, ApplicationID appId, ByteBuffer data) {
+ assertEquals(idef, data.getChar());
+ assertEquals(expected_appId, data.getInt());
+ assertEquals(expected_appId, appId.id);
+ }
+ @Override
+ public void stopApp(ApplicationID appId) {
+ assertEquals(expected_appId, appId.id);
+ }
+ }
+
+ static class ServiceA extends LightService {
+ public ServiceA() { super("A", 'A', 65); }
+ }
+
+ static class ServiceB extends LightService {
+ public ServiceB() { super("B", 'B', 66); }
+ }
+
+ @Test
+ public void testAuxEventDispatch() {
+ Configuration conf = new Configuration();
+ conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+ ServiceA.class, Service.class);
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+ ServiceB.class, Service.class);
+ conf.setInt("A.expected.init", 1);
+ conf.setInt("B.expected.stop", 1);
+ final AuxServices aux = new AuxServices();
+ aux.init(conf);
+ aux.start();
+
+ ApplicationID appId = new ApplicationID();
+ appId.id = 65;
+ ByteBuffer buf = ByteBuffer.allocate(6);
+ buf.putChar('A');
+ buf.putInt(65);
+ buf.flip();
+ AuxServicesEvent event = new AuxServicesEvent(
+ AuxServicesEventType.APPLICATION_INIT, "user0", appId, "Asrv", buf);
+ aux.handle(event);
+ appId.id = 66;
+ event = new AuxServicesEvent(
+ AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
+ }
+
+ @Test
+ public void testAuxServices() {
+ Configuration conf = new Configuration();
+ conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+ ServiceA.class, Service.class);
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+ ServiceB.class, Service.class);
+ final AuxServices aux = new AuxServices();
+ aux.init(conf);
+
+ int latch = 1;
+ for (Service s : aux.getServices()) {
+ assertEquals(INITED, s.getServiceState());
+ if (s instanceof ServiceA) { latch *= 2; }
+ else if (s instanceof ServiceB) { latch *= 3; }
+ else fail("Unexpected service type " + s.getClass());
+ }
+ assertEquals("Invalid mix of services", 6, latch);
+ aux.start();
+ for (Service s : aux.getServices()) {
+ assertEquals(STARTED, s.getServiceState());
+ }
+
+ aux.stop();
+ for (Service s : aux.getServices()) {
+ assertEquals(STOPPED, s.getServiceState());
+ }
+ }
+
+ @Test
+ public void testAuxUnexpectedStop() {
+ Configuration conf = new Configuration();
+ conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+ ServiceA.class, Service.class);
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+ ServiceB.class, Service.class);
+ final AuxServices aux = new AuxServices();
+ aux.init(conf);
+ aux.start();
+
+ Service s = aux.getServices().iterator().next();
+ s.stop();
+ assertEquals("Auxiliary service stopped, but AuxService unaffected.",
+ STOPPED, aux.getServiceState());
+ assertTrue(aux.getServices().isEmpty());
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FakeFSDataInputStream.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+/** mock streams in unit tests */
+public class FakeFSDataInputStream
+ extends FilterInputStream implements Seekable, PositionedReadable {
+ public FakeFSDataInputStream(InputStream in) { super(in); }
+ public void seek(long pos) throws IOException { }
+ public long getPos() throws IOException { return -1; }
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException { return -1; }
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException { }
+ public void readFully(long position, byte[] buffer) throws IOException { }
+}