You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC
svn commit: r1196458 [14/19] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/
bin/ conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-cl...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Wed Nov 2 05:34:31 2011
@@ -24,10 +24,10 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -47,24 +47,26 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
public class DummyContainerManager extends ContainerManagerImpl {
private static final Log LOG = LogFactory
.getLog(DummyContainerManager.class);
-
- private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public DummyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
- NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) {
- super(context, exec, deletionContext, nodeStatusUpdater, metrics, containerTokenSecretManager);
+ NodeManagerMetrics metrics,
+ ContainerTokenSecretManager containerTokenSecretManager,
+ ApplicationACLsManager applicationACLsManager) {
+ super(context, exec, deletionContext, nodeStatusUpdater, metrics,
+ containerTokenSecretManager, applicationACLsManager);
}
@Override
+ @SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
@@ -120,6 +122,7 @@ public class DummyContainerManager exten
}
@Override
+ @SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec) {
@@ -144,22 +147,23 @@ public class DummyContainerManager exten
}
@Override
- protected LogAggregationService createLogAggregationService(Context context,
- DeletionService deletionService) {
- return new LogAggregationService(context, deletionService) {
+ protected LogHandler createLogHandler(Configuration conf,
+ Context context, DeletionService deletionService) {
+ return new LogHandler() {
+
@Override
- public void handle(LogAggregatorEvent event) {
+ public void handle(LogHandlerEvent event) {
switch (event.getType()) {
- case APPLICATION_STARTED:
- break;
- case CONTAINER_FINISHED:
- break;
- case APPLICATION_FINISHED:
- break;
- default:
- // Ignore
- }
+ case APPLICATION_STARTED:
+ break;
+ case CONTAINER_FINISHED:
+ break;
+ case APPLICATION_FINISHED:
+ break;
+ default:
+ // Ignore
+ }
}
};
}
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Wed Nov 2 05:34:31 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.Asyn
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -97,9 +98,9 @@ public class TestEventFlow {
}
};
- DummyContainerManager containerManager =
- new DummyContainerManager(context, exec, del, nodeStatusUpdater,
- metrics, containerTokenSecretManager);
+ DummyContainerManager containerManager = new DummyContainerManager(
+ context, exec, del, nodeStatusUpdater, metrics,
+ containerTokenSecretManager, new ApplicationACLsManager(conf));
containerManager.init(conf);
containerManager.start();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Wed Nov 2 05:34:31 2011
@@ -18,151 +18,231 @@
package org.apache.hadoop.yarn.server.nodemanager;
-import java.io.BufferedReader;
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
+import java.io.FileOutputStream;
import java.io.IOException;
-
-import junit.framework.Assert;
+import java.io.PrintWriter;
+import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+/**
+ * This is intended to test the LinuxContainerExecutor code, but because of
+ * some security restrictions this can only be done with some special setup
+ * first.
+ * <br><ol>
+ * <li>Compile the code with container-executor.conf.dir set to the location you
+ * want for testing.
+ * <br><pre><code>
+ * > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
+ * -DskipTests
+ * </code></pre>
+ *
+ * <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
+ * container-executor.cfg needs to be owned by root and have in it the proper
+ * config values.
+ * <br><pre><code>
+ * > cat /etc/hadoop/container-executor.cfg
+ * yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
+ * yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
+ * yarn.nodemanager.linux-container-executor.group=mapred
+ * #depending on the user id of the application.submitter option
+ * min.user.id=1
+ * > sudo chown root:root /etc/hadoop/container-executor.cfg
+ * > sudo chmod 444 /etc/hadoop/container-executor.cfg
+ * </code></pre>
+ *
+ * <li>Move the binary and set proper permissions on it. It needs to be owned
+ * by root, the group needs to be the group configured in container-executor.cfg,
+ * and it needs the setuid bit set. (The build will also overwrite it, so you
+ * need to move it to a place where it will not be overwritten.)
+ * <br><pre><code>
+ * > cp ./hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
+ * > sudo chown root:mapred /tmp/container-executor
+ * > sudo chmod 4550 /tmp/container-executor
+ * </code></pre>
+ *
+ * <li>Run the tests with the execution enabled. (The user you run the tests as
+ * needs to be part of the group from the config.)
+ * <br><pre><code>
+ * mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
+ * </code></pre>
+ * </ol>
+ */
public class TestLinuxContainerExecutor {
-//
-// private static final Log LOG = LogFactory
-// .getLog(TestLinuxContainerExecutor.class);
-//
-// // TODO: FIXME
-// private static File workSpace = new File("target",
-// TestLinuxContainerExecutor.class.getName() + "-workSpace");
-//
-// @Before
-// public void setup() throws IOException {
-// FileContext.getLocalFSFileContext().mkdir(
-// new Path(workSpace.getAbsolutePath()), null, true);
-// workSpace.setReadable(true, false);
-// workSpace.setExecutable(true, false);
-// workSpace.setWritable(true, false);
-// }
-//
-// @After
-// public void tearDown() throws AccessControlException, FileNotFoundException,
-// UnsupportedFileSystemException, IOException {
-// FileContext.getLocalFSFileContext().delete(
-// new Path(workSpace.getAbsolutePath()), true);
-// }
-//
+ private static final Log LOG = LogFactory
+ .getLog(TestLinuxContainerExecutor.class);
+
+ private static File workSpace = new File("target",
+ TestLinuxContainerExecutor.class.getName() + "-workSpace");
+
+ private LinuxContainerExecutor exec = null;
+ private String appSubmitter = null;
+
+ @Before
+ public void setup() throws Exception {
+ FileContext.getLocalFSFileContext().mkdir(
+ new Path(workSpace.getAbsolutePath()), null, true);
+ workSpace.setReadable(true, false);
+ workSpace.setExecutable(true, false);
+ workSpace.setWritable(true, false);
+ String exec_path = System.getProperty("container-executor.path");
+ if(exec_path != null && !exec_path.isEmpty()) {
+ Configuration conf = new Configuration(false);
+ LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+ +"="+exec_path);
+ conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
+ exec = new LinuxContainerExecutor();
+ exec.setConf(conf);
+ }
+ appSubmitter = System.getProperty("application.submitter");
+ if(appSubmitter == null || appSubmitter.isEmpty()) {
+ appSubmitter = "nobody";
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(workSpace.getAbsolutePath()), true);
+ }
+
+ private boolean shouldRun() {
+ if(exec == null) {
+ LOG.warn("Not running test because container-executor.path is not set");
+ return false;
+ }
+ return true;
+ }
+
+ private String writeScriptFile(String ... cmd) throws IOException {
+ File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
+ f.deleteOnExit();
+ PrintWriter p = new PrintWriter(new FileOutputStream(f));
+ p.println("#!/bin/sh");
+ p.print("exec");
+ for(String part: cmd) {
+ p.print(" '");
+ p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
+ p.print("'");
+ }
+ p.println();
+ p.close();
+ return f.getAbsolutePath();
+ }
+
+ private int id = 0;
+ private synchronized int getNextId() {
+ id += 1;
+ return id;
+ }
+
+ private ContainerId getNextContainerId() {
+ ContainerId cId = mock(ContainerId.class);
+ String id = "CONTAINER_"+getNextId();
+ when(cId.toString()).thenReturn(id);
+ return cId;
+ }
+
+
+ private int runAndBlock(String ... cmd) throws IOException {
+ return runAndBlock(getNextContainerId(), cmd);
+ }
+
+ private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
+ String appId = "APP_"+getNextId();
+ Container container = mock(Container.class);
+ ContainerLaunchContext context = mock(ContainerLaunchContext.class);
+ HashMap<String, String> env = new HashMap<String,String>();
+
+ when(container.getContainerID()).thenReturn(cId);
+ when(container.getLaunchContext()).thenReturn(context);
+
+ when(context.getEnvironment()).thenReturn(env);
+
+ String script = writeScriptFile(cmd);
+
+ Path scriptPath = new Path(script);
+ Path tokensPath = new Path("/dev/null");
+ Path workDir = new Path(workSpace.getAbsolutePath());
+ Path pidFile = new Path(workDir, "pid.txt");
+
+ exec.activateContainer(cId, pidFile);
+ return exec.launchContainer(container, scriptPath, tokensPath,
+ appSubmitter, appId, workDir);
+ }
+
+
+ @Test
+ public void testContainerLaunch() throws IOException {
+ if (!shouldRun()) {
+ return;
+ }
+
+ File touchFile = new File(workSpace, "touch-file");
+ int ret = runAndBlock("touch", touchFile.getAbsolutePath());
+
+ assertEquals(0, ret);
+ FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+ new Path(touchFile.getAbsolutePath()));
+ assertEquals(appSubmitter, fileStatus.getOwner());
+ }
+
@Test
- public void testCommandFilePreparation() throws IOException {
-// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
-// "/bin/echo", "hello" }, null, null, "nobody"); // TODO: fix user name
-// executor.prepareCommandFile(workSpace.getAbsolutePath());
-//
-// // Now verify the contents of the commandFile
-// File commandFile = new File(workSpace, LinuxContainerExecutor.COMMAND_FILE);
-// BufferedReader reader = new BufferedReader(new FileReader(commandFile));
-// Assert.assertEquals("/bin/echo hello", reader.readLine());
-// Assert.assertEquals(null, reader.readLine());
-// Assert.assertTrue(commandFile.canExecute());
- }
-//
-// @Test
-// public void testContainerLaunch() throws IOException {
-// String containerExecutorPath = System
-// .getProperty("container-executor-path");
-// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
-// LOG.info("Not Running test for lack of container-executor-path");
-// return;
-// }
-//
-// String applicationSubmitter = "nobody";
-//
-// File touchFile = new File(workSpace, "touch-file");
-// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
-// "touch", touchFile.getAbsolutePath() }, workSpace, null,
-// applicationSubmitter);
-// executor.setCommandExecutorPath(containerExecutorPath);
-// executor.execute();
-//
-// FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
-// new Path(touchFile.getAbsolutePath()));
-// Assert.assertEquals(applicationSubmitter, fileStatus.getOwner());
-// }
-//
-// @Test
-// public void testContainerKill() throws IOException, InterruptedException,
-// IllegalArgumentException, SecurityException, IllegalAccessException,
-// NoSuchFieldException {
-// String containerExecutorPath = System
-// .getProperty("container-executor-path");
-// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
-// LOG.info("Not Running test for lack of container-executor-path");
-// return;
-// }
-//
-// String applicationSubmitter = "nobody";
-// final LinuxContainerExecutor executor = new LinuxContainerExecutor(
-// new String[] { "sleep", "100" }, workSpace, null, applicationSubmitter);
-// executor.setCommandExecutorPath(containerExecutorPath);
-// new Thread() {
-// public void run() {
-// try {
-// executor.execute();
-// } catch (IOException e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// }
-// };
-// }.start();
-//
-// String pid;
-// while ((pid = executor.getPid()) == null) {
-// LOG.info("Sleeping for 5 seconds before checking if "
-// + "the process is alive.");
-// Thread.sleep(5000);
-// }
-// LOG.info("Going to check the liveliness of the process with pid " + pid);
-//
-// LinuxContainerExecutor checkLiveliness = new LinuxContainerExecutor(
-// new String[] { "kill", "-0", "-" + pid }, workSpace, null,
-// applicationSubmitter);
-// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
-// checkLiveliness.execute();
-//
-// LOG.info("Process is alive. "
-// + "Sleeping for 5 seconds before killing the process.");
-// Thread.sleep(5000);
-// LOG.info("Going to killing the process.");
-//
-// executor.kill();
-//
-// LOG.info("Sleeping for 5 seconds before checking if "
-// + "the process is alive.");
-// Thread.sleep(5000);
-// LOG.info("Going to check the liveliness of the process.");
-//
-// // TODO: fix
-// checkLiveliness = new LinuxContainerExecutor(new String[] { "kill", "-0",
-// "-" + pid }, workSpace, null, applicationSubmitter);
-// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
-// boolean success = false;
-// try {
-// checkLiveliness.execute();
-// success = true;
-// } catch (IOException e) {
-// success = false;
-// }
-//
-// Assert.assertFalse(success);
-// }
-}
\ No newline at end of file
+ public void testContainerKill() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+
+ final ContainerId sleepId = getNextContainerId();
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ runAndBlock(sleepId, "sleep", "100");
+ } catch (IOException e) {
+ LOG.warn("Caught exception while running sleep",e);
+ }
+ };
+ };
+ t.setDaemon(true); //If it does not exit we shouldn't block the test.
+ t.start();
+
+ assertTrue(t.isAlive());
+
+ String pid = null;
+ int count = 10;
+ while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
+ LOG.info("Sleeping for 200 ms before checking for pid ");
+ Thread.sleep(200);
+ count--;
+ }
+ assertNotNull(pid);
+
+ LOG.info("Going to killing the process.");
+ exec.signalContainer(appSubmitter, pid, Signal.TERM);
+ LOG.info("sleeping for 100ms to let the sleep be killed");
+ Thread.sleep(100);
+
+ assertFalse(t.isAlive());
+ }
+}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Nov 2 05:34:31 2011
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,12 +53,14 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
@@ -82,10 +85,16 @@ public class TestNodeStatusUpdater {
int heartBeatID = 0;
volatile Error nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
+ private final Configuration conf = new YarnConfiguration();
+ private NodeManager nm;
@After
public void tearDown() {
this.registeredNodes.clear();
+ heartBeatID = 0;
+ if (nm != null) {
+ nm.stop();
+ }
DefaultMetricsSystem.shutdown();
}
@@ -167,7 +176,7 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(firstContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(2);
- Container container = new ContainerImpl(null, launchContext, null, null);
+ Container container = new ContainerImpl(conf , null, launchContext, null, null);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -191,7 +200,7 @@ public class TestNodeStatusUpdater {
launchContext.setContainerId(secondContainerID);
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
launchContext.getResource().setMemory(3);
- Container container = new ContainerImpl(null, launchContext, null, null);
+ Container container = new ContainerImpl(conf, null, launchContext, null, null);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -217,6 +226,7 @@ public class TestNodeStatusUpdater {
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@@ -229,10 +239,44 @@ public class TestNodeStatusUpdater {
@Override
protected ResourceTracker getRMClient() {
- return new MyResourceTracker(this.context);
+ return resourceTracker;
}
}
+
+ //
+ private class MyResourceTracker2 implements ResourceTracker {
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ RegistrationResponse regResponse = recordFactory
+ .newRecordInstance(RegistrationResponse.class);
+ regResponse.setNodeAction(registerNodeAction );
+ response.setRegistrationResponse(regResponse);
+ return response;
+ }
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+ HeartbeatResponse response = recordFactory
+ .newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(heartBeatID);
+ response.setNodeAction(heartBeatNodeAction);
+
+ NodeHeartbeatResponse nhResponse = recordFactory
+ .newRecordInstance(NodeHeartbeatResponse.class);
+ nhResponse.setHeartbeatResponse(response);
+ return nhResponse;
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -246,7 +290,7 @@ public class TestNodeStatusUpdater {
@Test
public void testNMRegistration() throws InterruptedException {
- final NodeManager nm = new NodeManager() {
+ nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -292,14 +336,85 @@ public class TestNodeStatusUpdater {
Assert.fail("NodeManager failed to start");
}
- while (heartBeatID <= 3) {
+ waitCount = 0;
+ while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
+ Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
nm.stop();
}
+
+ @Test
+ public void testNodeDecommision() throws Exception {
+ nm = getNodeManager(NodeAction.SHUTDOWN);
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ Assert.assertEquals(STATE.INITED, nm.getServiceState());
+ nm.start();
+
+ int waitCount = 0;
+ while (heartBeatID < 1 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID < 1);
+
+ // NM takes a while to reach the STOPPED state.
+ waitCount = 0;
+ while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to stop..");
+ Thread.sleep(1000);
+ }
+
+ Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+ }
+
+ @Test
+ public void testNodeReboot() throws Exception {
+ nm = getNodeManager(NodeAction.REBOOT);
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ Assert.assertEquals(STATE.INITED, nm.getServiceState());
+ nm.start();
+
+ int waitCount = 0;
+ while (heartBeatID < 1 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID < 1);
+
+ // NM takes a while to reach the STOPPED state.
+ waitCount = 0;
+ while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to stop..");
+ Thread.sleep(1000);
+ }
+
+ Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+ }
+
+ @Test
+ public void testNMShutdownForRegistrationFailure() {
+
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+ myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
+ nodeStatusUpdater.resourceTracker = myResourceTracker2;
+ return nodeStatusUpdater;
+ }
+ };
+ verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+ + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+ }
/**
* Verifies that if for some reason NM fails to start ContainerManager RPC
@@ -311,7 +426,7 @@ public class TestNodeStatusUpdater {
@Test
public void testNoRegistrationWhenNMServicesFail() {
- final NodeManager nm = new NodeManager() {
+ nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -324,9 +439,11 @@ public class TestNodeStatusUpdater {
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
- ContainerTokenSecretManager containerTokenSecretManager) {
- return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
- metrics, containerTokenSecretManager) {
+ ContainerTokenSecretManager containerTokenSecretManager,
+ ApplicationACLsManager aclsManager) {
+ return new ContainerManagerImpl(context, exec, del,
+ nodeStatusUpdater, metrics, containerTokenSecretManager,
+ aclsManager) {
@Override
public void start() {
// Simulating failure of starting RPC server
@@ -336,16 +453,22 @@ public class TestNodeStatusUpdater {
}
};
+ verifyNodeStartFailure("Starting of RPC Server failed");
+ }
+
+ private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
try {
nm.start();
Assert.fail("NM should have failed to start. Didn't get exception!!");
} catch (Exception e) {
- Assert.assertEquals("Starting of RPC Server failed", e.getCause()
+ Assert.assertEquals(errMessage, e.getCause()
.getMessage());
}
-
+
+ // The state changes to STOPPED only if startup succeeded; otherwise no
+ // state change occurs, so the NM is expected to still be INITED.
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
.getServiceState());
@@ -355,7 +478,7 @@ public class TestNodeStatusUpdater {
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
+ conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
@@ -366,4 +489,21 @@ public class TestNodeStatusUpdater {
.toUri().getPath());
return conf;
}
+
+ private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
+ return new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+ myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
+ myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+ return myNodeStatusUpdater;
+ }
+ };
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java Wed Nov 2 05:34:31 2011
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
import java.net.InetSocketAddress;
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -61,7 +61,7 @@ public class TestPBLocalizerRPC {
public void stop() {
if (server != null) {
- server.close();
+ server.stop();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed Nov 2 05:34:31 2011
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.After;
@@ -146,9 +147,9 @@ public abstract class BaseContainerManag
delSrvc.init(conf);
exec = createContainerExecutor();
- containerManager =
- new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
- metrics, this.containerTokenSecretManager);
+ containerManager = new ContainerManagerImpl(context, exec, delSrvc,
+ nodeStatusUpdater, metrics, this.containerTokenSecretManager,
+ new ApplicationACLsManager(conf));
containerManager.init(conf);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed Nov 2 05:34:31 2011
@@ -22,8 +22,12 @@ import org.junit.Test;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -39,6 +43,7 @@ import org.apache.hadoop.yarn.service.Se
import static org.apache.hadoop.yarn.service.Service.STATE.*;
public class TestAuxServices {
+ private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
static class LightService extends AbstractService
implements AuxServices.AuxiliaryService {
@@ -47,6 +52,7 @@ public class TestAuxServices {
private int remaining_init;
private int remaining_stop;
private ByteBuffer meta = null;
+ private ArrayList<Integer> stoppedApps;
LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
@@ -56,7 +62,13 @@ public class TestAuxServices {
this.idef = idef;
this.expected_appId = expected_appId;
this.meta = meta;
+ this.stoppedApps = new ArrayList<Integer>();
}
+
+ public ArrayList<Integer> getAppIdsStopped() {
+ return (ArrayList)this.stoppedApps.clone();
+ }
+
@Override
public void init(Configuration conf) {
remaining_init = conf.getInt(idef + ".expected.init", 0);
@@ -77,7 +89,7 @@ public class TestAuxServices {
}
@Override
public void stopApp(ApplicationId appId) {
- assertEquals(expected_appId, appId.getId());
+ stoppedApps.add(appId.getId());
}
@Override
public ByteBuffer getMeta() {
@@ -86,11 +98,15 @@ public class TestAuxServices {
}
static class ServiceA extends LightService {
- public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
+ public ServiceA() {
+ super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
+ }
}
static class ServiceB extends LightService {
- public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
+ public ServiceB() {
+ super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
+ }
}
@Test
@@ -119,6 +135,14 @@ public class TestAuxServices {
appId.setId(66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
+ // verify all services got the stop event
+ aux.handle(event);
+ Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
+ for (AuxServices.AuxiliaryService serv: servs) {
+ ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
+ assertEquals("app not properly stopped", 1, appIds.size());
+ assertTrue("wrong app stopped", appIds.contains((Integer)66));
+ }
}
@Test
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Wed Nov 2 05:34:31 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -279,7 +280,7 @@ public class TestContainerManager extend
gcsRequest.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus();
- Assert.assertEquals(ExitCode.KILLED.getExitCode(),
+ Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus());
// Assert that the process is not alive anymore
@@ -385,7 +386,8 @@ public class TestContainerManager extend
ContainerTokenSecretManager containerTokenSecretManager = new
ContainerTokenSecretManager();
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
- nodeStatusUpdater, metrics, containerTokenSecretManager);
+ nodeStatusUpdater, metrics, containerTokenSecretManager,
+ new ApplicationACLsManager(conf));
containerManager.init(conf);
containerManager.start();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed Nov 2 05:34:31 2011
@@ -33,10 +33,12 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -68,6 +71,7 @@ import org.mockito.ArgumentMatcher;
public class TestContainer {
final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+ final Configuration conf = new YarnConfiguration();
/**
@@ -165,7 +169,7 @@ public class TestContainer {
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
- wc.containerFailed(ExitCode.KILLED.getExitCode());
+ wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
verifyCleanupCall(wc);
@@ -222,6 +226,89 @@ public class TestContainer {
}
}
+ @Test
+ public void testKillOnLocalizationFailed() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(15, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.failLocalizeResources(wc.getLocalResourceCount());
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ wc.killContainer();
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ public void testResourceLocalizedOnLocalizationFailed() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ int failCount = wc.getLocalResourceCount()/2;
+ if (failCount == 0) {
+ failCount = 1;
+ }
+ wc.failLocalizeResources(failCount);
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ wc.localizeResourcesFromInvalidState(failCount);
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ public void testResourceFailedOnLocalizationFailed() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+
+ Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
+ String key1 = lRsrcKeys.next();
+ String key2 = lRsrcKeys.next();
+ wc.failLocalizeSpecificResource(key1);
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ wc.failLocalizeSpecificResource(key2);
+ assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ public void testResourceFailedOnKilling() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+
+ Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
+ String key1 = lRsrcKeys.next();
+ wc.killContainer();
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ wc.failLocalizeSpecificResource(key1);
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
/**
* Verify serviceData correctly sent.
*/
@@ -265,6 +352,26 @@ public class TestContainer {
}
}
+ @Test
+ public void testLaunchAfterKillRequest() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(14, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.killContainer();
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ wc.launchContainer();
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ wc.containerKilledOnRequest();
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -384,7 +491,7 @@ public class TestContainer {
}
private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
- return new ContainerImpl(disp, ctx, null, metrics);
+ return new ContainerImpl(conf, disp, ctx, null, metrics);
}
@SuppressWarnings("unchecked")
@@ -468,11 +575,20 @@ public class TestContainer {
drainDispatcherEvents();
}
- public Map<Path, String> localizeResources() throws URISyntaxException {
+ // Localize resources
+ // Skip some resources so as to consider them failed
+ public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
+ int skipRsrcCount) throws URISyntaxException {
Path cache = new Path("file:///cache");
Map<Path, String> localPaths = new HashMap<Path, String>();
+ int counter = 0;
for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
- assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+ if (counter++ < skipRsrcCount) {
+ continue;
+ }
+ if (checkLocalizingState) {
+ assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+ }
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, rsrc.getKey());
@@ -483,6 +599,42 @@ public class TestContainer {
drainDispatcherEvents();
return localPaths;
}
+
+
+ public Map<Path, String> localizeResources() throws URISyntaxException {
+ return doLocalizeResources(true, 0);
+ }
+
+ public void localizeResourcesFromInvalidState(int skipRsrcCount)
+ throws URISyntaxException {
+ doLocalizeResources(false, skipRsrcCount);
+ }
+
+ public void failLocalizeSpecificResource(String rsrcKey)
+ throws URISyntaxException {
+ LocalResource rsrc = localResources.get(rsrcKey);
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ Exception e = new Exception("Fake localization error");
+ c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e));
+ drainDispatcherEvents();
+ }
+
+ // fail to localize some resources
+ public void failLocalizeResources(int failRsrcCount)
+ throws URISyntaxException {
+ int counter = 0;
+ for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
+ if (counter >= failRsrcCount) {
+ break;
+ }
+ ++counter;
+ LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+ Exception e = new Exception("Fake localization error");
+ c.handle(new ContainerResourceFailedEvent(c.getContainerID(),
+ req, e));
+ }
+ drainDispatcherEvents();
+ }
public void launchContainer() {
c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
@@ -508,9 +660,13 @@ public class TestContainer {
public void containerKilledOnRequest() {
c.handle(new ContainerExitEvent(cId,
- ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED
.getExitCode()));
drainDispatcherEvents();
}
+
+ public int getLocalResourceCount() {
+ return localResources.size();
+ }
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Wed Nov 2 05:34:31 2011
@@ -33,7 +33,7 @@ import java.util.Set;
import junit.framework.Assert;
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.util.Conve
import org.junit.Test;
import static org.junit.Assert.*;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
@@ -146,7 +147,7 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
- Configuration conf = new Configuration();
+ Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -330,7 +331,7 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
- Configuration conf = new Configuration();
+ Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -355,7 +356,8 @@ public class TestResourceLocalizationSer
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
- DeletionService delService = new DeletionService(exec);
+ DeletionService delServiceReal = new DeletionService(exec);
+ DeletionService delService = spy(delServiceReal);
delService.init(null);
delService.start();
@@ -407,12 +409,14 @@ public class TestResourceLocalizationSer
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
- Thread.sleep(500);
+ Thread.sleep(1000);
dispatcher.await();
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString();
- verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
- eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+ ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
+ verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
+ eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+ Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
@@ -454,10 +458,13 @@ public class TestResourceLocalizationSer
};
dispatcher.await();
verify(containerBus).handle(argThat(matchesContainerLoc));
+
+ // Verify deletion of localization token.
+ verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {
- delService.stop();
- dispatcher.stop();
spyService.stop();
+ dispatcher.stop();
+ delService.stop();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Wed Nov 2 05:34:31 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.junit.Test;
@@ -82,7 +83,7 @@ public class TestResourceRetention {
for (int i = 0; i < nRsrcs; ++i) {
final LocalResourceRequest req = new LocalResourceRequest(
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
- LocalResourceType.FILE);
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC);
final long ts = timestamp + i * tsstep;
final Path p = new Path("file:///local/" + user + "/rsrc" + i);
LocalizedResource rsrc = new LocalizedResource(req, null) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
@@ -28,8 +34,10 @@ import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import junit.framework.Assert;
@@ -41,6 +49,7 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -50,24 +59,31 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
-@Ignore
+//@Ignore
public class TestLogAggregationService extends BaseContainerManagerTest {
+ private Map<ApplicationAccessType, String> acls = createAppAcls();
+
static {
LOG = LogFactory.getLog(TestLogAggregationService.class);
}
@@ -91,17 +107,24 @@ public class TestLogAggregationService e
}
@Test
+ @SuppressWarnings("unchecked")
public void testLocalFileDeletionAfterUpload() throws IOException {
this.delSrvc = new DeletionService(createContainerExecutor());
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
LogAggregationService logAggregationService =
- new LogAggregationService(this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
+
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
@@ -109,25 +132,24 @@ public class TestLogAggregationService e
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
- ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
- ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(application1);
- appAttemptId.setAttemptId(1);
- ContainerId container11 =
- BuilderUtils.newContainerId(recordFactory, application1, appAttemptId, 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(application1, 1);
+ ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container11, 0));
+ new LogHandlerContainerFinishedEvent(container11, 0));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
+
String containerIdStr = ConverterUtils.toString(container11);
File containerLogDir = new File(app1LogDir, containerIdStr);
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
@@ -136,17 +158,36 @@ public class TestLogAggregationService e
Assert.assertFalse(app1LogDir.exists());
- Assert.assertTrue(new File(logAggregationService
- .getRemoteNodeLogFileForApp(application1).toUri().getPath()).exists());
+ Path logFilePath =
+ logAggregationService.getRemoteNodeLogFileForApp(application1,
+ this.user);
+ Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
+ logFilePath.toUri().getPath()).exists());
+
+ dispatcher.await();
+ ArgumentCaptor<ApplicationEvent> eventCaptor =
+ ArgumentCaptor.forClass(ApplicationEvent.class);
+ verify(appEventHandler).handle(eventCaptor.capture());
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+ eventCaptor.getValue().getType());
+ assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
+ .getApplicationID());
+
}
@Test
+ @SuppressWarnings("unchecked")
public void testNoContainerOnNode() {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
LogAggregationService logAggregationService =
- new LogAggregationService(this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
@@ -157,29 +198,44 @@ public class TestLogAggregationService e
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
- ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
- Assert
- .assertFalse(new File(logAggregationService
- .getRemoteNodeLogFileForApp(application1).toUri().getPath())
- .exists());
+ Assert.assertFalse(new File(logAggregationService
+ .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
+ .exists());
+
+ dispatcher.await();
+ ArgumentCaptor<ApplicationEvent> eventCaptor =
+ ArgumentCaptor.forClass(ApplicationEvent.class);
+ verify(appEventHandler).handle(eventCaptor.capture());
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+ eventCaptor.getValue().getType());
+ verify(appEventHandler).handle(eventCaptor.capture());
+ assertEquals(application1, eventCaptor.getValue()
+ .getApplicationID());
}
@Test
+ @SuppressWarnings("unchecked")
public void testMultipleAppsLogAggregation() throws IOException {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
LogAggregationService logAggregationService =
- new LogAggregationService(this.context, this.delSrvc);
+ new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
@@ -190,92 +246,80 @@ public class TestLogAggregationService e
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
- .handle(new LogAggregatorAppStartedEvent(
+ .handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
- ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
- ApplicationAttemptId appAttemptId1 =
- recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId1.setApplicationId(application1);
- ContainerId container11 =
- BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 1);
+ ApplicationAttemptId appAttemptId1 =
+ BuilderUtils.newApplicationAttemptId(application1, 1);
+ ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container11, 0));
+ new LogHandlerContainerFinishedEvent(container11, 0));
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
- ApplicationAttemptId appAttemptId2 =
- recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId1.setApplicationId(application2);
+ ApplicationAttemptId appAttemptId2 =
+ BuilderUtils.newApplicationAttemptId(application2, 1);
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
- logAggregationService.handle(new LogAggregatorAppStartedEvent(
+ logAggregationService.handle(new LogHandlerAppStartedEvent(
application2, this.user, null,
- ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
+ ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
- ContainerId container21 =
- BuilderUtils.newContainerId(recordFactory, application2,
- appAttemptId2, 1);
+ ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
+
writeContainerLogs(app2LogDir, container21);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container21, 0));
+ new LogHandlerContainerFinishedEvent(container21, 0));
+
+ ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
- ContainerId container12 =
- BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1,
- 2);
writeContainerLogs(app1LogDir, container12);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container12, 0));
+ new LogHandlerContainerFinishedEvent(container12, 0));
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
- ApplicationAttemptId appAttemptId3 =
- recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId1.setApplicationId(application3);
+ ApplicationAttemptId appAttemptId3 =
+ BuilderUtils.newApplicationAttemptId(application3, 1);
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
- logAggregationService.handle(new LogAggregatorAppStartedEvent(
- application3, this.user, null,
- ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
-
- ContainerId container31 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
- 1);
+ logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
+ this.user, null,
+ ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
+
+
+ ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container31, 0));
+ new LogHandlerContainerFinishedEvent(container31, 0));
- ContainerId container32 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
- 2);
+ ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
+ new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
- ContainerId container22 =
- BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2,
- 2);
+ ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container22, 0));
+ new LogHandlerContainerFinishedEvent(container22, 0));
- ContainerId container33 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
- 3);
+ ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33);
logAggregationService.handle(
- new LogAggregatorContainerFinishedEvent(container33, 0));
+ new LogHandlerContainerFinishedEvent(container33, 0));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application2));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application3));
- logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@@ -286,6 +330,22 @@ public class TestLogAggregationService e
new ContainerId[] { container21 });
verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container31, container32 });
+
+ dispatcher.await();
+ ArgumentCaptor<ApplicationEvent> eventCaptor =
+ ArgumentCaptor.forClass(ApplicationEvent.class);
+
+ verify(appEventHandler, times(3)).handle(eventCaptor.capture());
+ List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
+ Set<ApplicationId> appIds = new HashSet<ApplicationId>();
+ for (ApplicationEvent cap : capturedEvents) {
+ assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+ eventCaptor.getValue().getType());
+ appIds.add(cap.getApplicationID());
+ }
+ assertTrue(appIds.contains(application1));
+ assertTrue(appIds.contains(application2));
+ assertTrue(appIds.contains(application3));
}
private void writeContainerLogs(File appLogDir, ContainerId containerId)
@@ -306,7 +366,11 @@ public class TestLogAggregationService e
ContainerId[] expectedContainerIds) throws IOException {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(this.conf,
- logAggregationService.getRemoteNodeLogFileForApp(appId));
+ logAggregationService.getRemoteNodeLogFileForApp(appId, this.user));
+
+ Assert.assertEquals(this.user, reader.getApplicationOwner());
+ verifyAcls(reader.getApplicationAcls());
+
try {
Map<String, Map<String, String>> logMap =
new HashMap<String, Map<String, String>>();
@@ -382,6 +446,7 @@ public class TestLogAggregationService e
this.containerManager.start();
+
File scriptFile = new File(tmpDir, "scriptFile.sh");
PrintWriter fileWriter = new PrintWriter(scriptFile);
fileWriter.write("\necho Hello World! Stdout! > "
@@ -400,13 +465,10 @@ public class TestLogAggregationService e
recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
- ApplicationAttemptId appAttemptId =
- recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(appId);
- appAttemptId.setAttemptId(1);
- ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
- cId.setId(0);
- cId.setApplicationAttemptId(appAttemptId);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
+
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(this.user);
@@ -446,4 +508,27 @@ public class TestLogAggregationService e
.asList(appId)));
this.containerManager.stop();
}
+
+ private void verifyAcls(Map<ApplicationAccessType, String> logAcls) {
+ Assert.assertEquals(this.acls.size(), logAcls.size());
+ for (ApplicationAccessType appAccessType : this.acls.keySet()) {
+ Assert.assertEquals(this.acls.get(appAccessType),
+ logAcls.get(appAccessType));
+ }
+ }
+
+ private DrainDispatcher createDispatcher() {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(this.conf);
+ dispatcher.start();
+ return dispatcher;
+ }
+
+ private Map<ApplicationAccessType, String> createAppAcls() {
+ Map<ApplicationAccessType, String> appAcls =
+ new HashMap<ApplicationAccessType, String>();
+ appAcls.put(ApplicationAccessType.MODIFY_APP, "user group");
+ appAcls.put(ApplicationAccessType.VIEW_APP, "*");
+ return appAcls;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Wed Nov 2 05:34:31 2011
@@ -262,16 +262,17 @@ public class TestContainersMonitor exten
gcsRequest.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus();
- Assert.assertEquals(ExitCode.KILLED.getExitCode(),
+ Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus());
String expectedMsgPattern =
"Container \\[pid=" + pid + ",containerID=" + cId
- + "\\] is running beyond memory-limits. Current usage : "
- + "[0-9]*bytes. Limit : [0-9]*"
- + "bytes. Killing container. \nDump of the process-tree for "
- + cId + " : \n";
+ + "\\] is running beyond virtual memory limits. Current usage: "
+ + "[0-9.]+m?b of [0-9.]+m?b physical memory used; "
+ + "[0-9.]+m?b of [0-9.]+m?b virtual memory used. "
+ + "Killing container.\nDump of the process-tree for "
+ + cId + " :\n";
Pattern pat = Pattern.compile(expectedMsgPattern);
- Assert.assertEquals("Expected message patterns is: " + expectedMsgPattern
+ Assert.assertEquals("Expected message pattern is: " + expectedMsgPattern
+ "\n\nObserved message is: " + containerStatus.getDiagnostics(),
true, pat.matcher(containerStatus.getDiagnostics()).find());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -41,11 +44,11 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Mockito.*;
public class TestNMWebServer {
@@ -58,7 +61,7 @@ public class TestNMWebServer {
}
@Test
- public void testNMWebApp() throws InterruptedException, IOException {
+ public void testNMWebApp() throws IOException {
Context nmContext = new NodeManager.NMContext();
ResourceView resourceView = new ResourceView() {
@Override
@@ -70,8 +73,9 @@ public class TestNMWebServer {
return 0;
}
};
- WebServer server = new WebServer(nmContext, resourceView);
Configuration conf = new Configuration();
+ WebServer server = new WebServer(nmContext, resourceView,
+ new ApplicationACLsManager(conf));
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
server.init(conf);
server.start();
@@ -88,9 +92,8 @@ public class TestNMWebServer {
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
nmContext.getApplications().put(appId, app);
- ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(appId);
- appAttemptId.setAttemptId(1);
+ ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+ appId, 1);
ContainerId container1 =
BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 0);
ContainerId container2 =
@@ -104,7 +107,7 @@ public class TestNMWebServer {
launchContext.setContainerId(containerId);
launchContext.setUser(user);
Container container =
- new ContainerImpl(dispatcher, launchContext, null, metrics) {
+ new ContainerImpl(conf, dispatcher, launchContext, null, metrics) {
@Override
public ContainerState getContainerState() {
return ContainerState.RUNNING;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Nov 2 05:34:31 2011
@@ -16,15 +16,16 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>${yarn.version}</version>
+ <version>0.24.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <version>0.24.0-SNAPSHOT</version>
<name>hadoop-yarn-server-resourcemanager</name>
<properties>
- <install.file>${project.artifact.file}</install.file>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
</properties>
@@ -33,6 +34,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </dependency>
</dependencies>
<build>