You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC
svn commit: r1513258 [7/10] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Mon Aug 12 21:25:49 2013
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.File;
@@ -28,22 +30,26 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -57,10 +63,16 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -98,7 +110,7 @@ public class TestContainerLaunch extends
tempFile = Shell.appendScriptExtension(tmpDir, "temp");
String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
"echo \"hello\"";
- PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+ PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
FileUtil.setExecutable(shellFile, true);
writer.println(timeoutCommand);
writer.close();
@@ -132,7 +144,7 @@ public class TestContainerLaunch extends
assertEquals(shexc.getExitCode(), 0);
assert(shexc.getOutput().contains("hello"));
- symLinkFile = new File(tmpDir, badSymlink);
+ symLinkFile = new File(tmpDir, badSymlink);
}
finally {
// cleanup
@@ -151,6 +163,173 @@ public class TestContainerLaunch extends
}
}
+ // test the diagnostics are generated
+ @Test (timeout = 20000)
+ public void testInvalidSymlinkDiagnostics() throws IOException {
+
+ File shellFile = null;
+ File tempFile = null;
+ String symLink = Shell.WINDOWS ? "test.cmd" :
+ "test";
+ File symLinkFile = null;
+
+ try {
+ shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+ tempFile = Shell.appendScriptExtension(tmpDir, "temp");
+ String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
+ "echo \"hello\"";
+ PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+ FileUtil.setExecutable(shellFile, true);
+ writer.println(timeoutCommand);
+ writer.close();
+
+ Map<Path, List<String>> resources =
+ new HashMap<Path, List<String>>();
+ //This is an invalid path and should throw exception because of No such file.
+ Path invalidPath = new Path(shellFile.getAbsolutePath()+"randomPath");
+ resources.put(invalidPath, Arrays.asList(symLink));
+ FileOutputStream fos = new FileOutputStream(tempFile);
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> commands = new ArrayList<String>();
+ if (Shell.WINDOWS) {
+ commands.add("cmd");
+ commands.add("/c");
+ commands.add("\"" + symLink + "\"");
+ } else {
+ commands.add("/bin/sh ./\\\"" + symLink + "\\\"");
+ }
+ ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+ fos.flush();
+ fos.close();
+ FileUtil.setExecutable(tempFile, true);
+
+ Shell.ShellCommandExecutor shexc
+ = new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, tmpDir);
+ String diagnostics = null;
+ try {
+ shexc.execute();
+ Assert.fail("Should catch exception");
+ } catch(ExitCodeException e){
+ diagnostics = e.getMessage();
+ }
+ Assert.assertNotNull(diagnostics);
+ Assert.assertTrue(shexc.getExitCode() != 0);
+ symLinkFile = new File(tmpDir, symLink);
+ }
+ finally {
+ // cleanup
+ if (shellFile != null
+ && shellFile.exists()) {
+ shellFile.delete();
+ }
+ if (tempFile != null
+ && tempFile.exists()) {
+ tempFile.delete();
+ }
+ if (symLinkFile != null
+ && symLinkFile.exists()) {
+ symLinkFile.delete();
+ }
+ }
+ }
+
+ @Test (timeout = 20000)
+ public void testInvalidEnvSyntaxDiagnostics() throws IOException {
+
+ File shellFile = null;
+ try {
+ shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+ String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
+ "echo \"hello\"";
+ PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+ FileUtil.setExecutable(shellFile, true);
+ writer.println(timeoutCommand);
+ writer.close();
+ Map<Path, List<String>> resources =
+ new HashMap<Path, List<String>>();
+ FileOutputStream fos = new FileOutputStream(shellFile);
+
+ Map<String, String> env = new HashMap<String, String>();
+ // invalid env
+ env.put(
+ "APPLICATION_WORKFLOW_CONTEXT", "{\"workflowId\":\"609f91c5cd83\"," +
+ "\"workflowName\":\"\n\ninsert table " +
+ "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " );
+ List<String> commands = new ArrayList<String>();
+ ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+ fos.flush();
+ fos.close();
+
+ Shell.ShellCommandExecutor shexc
+ = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()}, tmpDir);
+ String diagnostics = null;
+ try {
+ shexc.execute();
+ Assert.fail("Should catch exception");
+ } catch(ExitCodeException e){
+ diagnostics = e.getMessage();
+ }
+ Assert.assertTrue(diagnostics.contains("command not found"));
+ Assert.assertTrue(shexc.getExitCode() != 0);
+ }
+ finally {
+ // cleanup
+ if (shellFile != null
+ && shellFile.exists()) {
+ shellFile.delete();
+ }
+ }
+ }
+
+ @Test (timeout = 20000)
+ public void testContainerLaunchStdoutAndStderrDiagnostics() throws IOException {
+
+ File shellFile = null;
+ try {
+ shellFile = Shell.appendScriptExtension(tmpDir, "hello");
+ // echo "hello" to stdout and "error" to stderr and exit code with 2;
+ String command = Shell.WINDOWS ? "@echo \"hello\"; @echo \"error\" 1>&2; exit 2;" :
+ "echo \"hello\"; echo \"error\" 1>&2; exit 2;";
+ PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+ FileUtil.setExecutable(shellFile, true);
+ writer.println(command);
+ writer.close();
+ Map<Path, List<String>> resources =
+ new HashMap<Path, List<String>>();
+ FileOutputStream fos = new FileOutputStream(shellFile);
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> commands = new ArrayList<String>();
+ commands.add(command);
+ ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
+ fos.flush();
+ fos.close();
+
+ Shell.ShellCommandExecutor shexc
+ = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()}, tmpDir);
+ String diagnostics = null;
+ try {
+ shexc.execute();
+ Assert.fail("Should catch exception");
+ } catch(ExitCodeException e){
+ diagnostics = e.getMessage();
+ }
+ // test stderr
+ Assert.assertTrue(diagnostics.contains("error"));
+ // test stdout
+ Assert.assertTrue(shexc.getOutput().contains("hello"));
+ Assert.assertTrue(shexc.getExitCode() == 2);
+ }
+ finally {
+ // cleanup
+ if (shellFile != null
+ && shellFile.exists()) {
+ shellFile.delete();
+ }
+ }
+ }
+
/**
* See if environment variable is forwarded using sanitizeEnv.
* @throws Exception
@@ -231,10 +410,14 @@ public class TestContainerLaunch extends
// set up the rest of the container
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(createContainerToken(cId));
- containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ createContainerToken(cId));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -297,18 +480,20 @@ public class TestContainerLaunch extends
Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid));
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(cId);
- containerManager.stopContainer(stopRequest);
+ // Now test the stop functionality.
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ containerManager.stopContainers(stopRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(gcsRequest).getStatus();
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode();
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
@@ -376,11 +561,15 @@ public class TestContainerLaunch extends
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
Token containerToken = createContainerToken(cId);
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(containerToken);
- containerManager.startContainer(startRequest);
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -391,21 +580,25 @@ public class TestContainerLaunch extends
processStartFile.exists());
// Now test the stop functionality.
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(cId);
- containerManager.stopContainer(stopRequest);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ containerManager.stopContainers(stopRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
// container stop sends a sigterm followed by a sigkill
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+
ContainerStatus containerStatus =
- containerManager.getContainerStatus(gcsRequest).getStatus();
- Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
- containerStatus.getExitStatus());
+ containerManager.getContainerStatuses(gcsRequest)
+ .getContainerStatuses().get(0);
+ int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
+ ExitCode.TERMINATED.getExitCode();
+ Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
// Now verify the contents of the file. Script generates a message when it
// receives a sigterm so we look for that. We cannot perform this check on
@@ -435,6 +628,32 @@ public class TestContainerLaunch extends
}
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testCallFailureWithNullLocalizedResources() {
+ Container container = mock(Container.class);
+ when(container.getContainerId()).thenReturn(ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(
+ System.currentTimeMillis(), 1), 1), 1));
+ ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+ when(clc.getCommands()).thenReturn(Collections.<String>emptyList());
+ when(container.getLaunchContext()).thenReturn(clc);
+ when(container.getLocalizedResources()).thenReturn(null);
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler eventHandler = new EventHandler() {
+ public void handle(Event event) {
+ Assert.assertTrue(event instanceof ContainerExitEvent);
+ ContainerExitEvent exitEvent = (ContainerExitEvent) event;
+ Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ exitEvent.getType());
+ }
+ };
+ when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+ ContainerLaunch launch = new ContainerLaunch(context, new Configuration(),
+ dispatcher, exec, null, container, dirsHandler);
+ launch.call();
+ }
+
protected Token createContainerToken(ContainerId cId) throws InvalidToken {
Resource r = BuilderUtils.newResource(1024, 1);
ContainerTokenIdentifier containerTokenIdentifier =
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalResourceStatus.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Aug 12 21:25:49 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -118,8 +119,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -132,31 +134,44 @@ public class TestResourceLocalizationSer
static final Path basedir =
new Path("target", TestResourceLocalizationService.class.getName());
static Server mockServer;
+
+ private Configuration conf;
+ private AbstractFileSystem spylfs;
+ private FileContext lfs;
@BeforeClass
- public static void setup() {
+ public static void setupClass() {
mockServer = mock(Server.class);
doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
}
+
+ @Before
+ public void setup() throws IOException {
+ conf = new Configuration();
+ spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ lfs = FileContext.getFileContext(spylfs, conf);
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+ String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+ }
+
+ @After
+ public void cleanup() {
+ conf = null;
+ }
@Test
public void testLocalizationInit() throws Exception {
- final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(new Configuration());
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = spy(new DeletionService(exec));
- delService.init(new Configuration());
+ delService.init(conf);
delService.start();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- FileContext lfs = FileContext.getFileContext(spylfs, conf);
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
-
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
@@ -164,6 +179,7 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
diskhandler.init(conf);
@@ -204,13 +220,6 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
- Configuration conf = new YarnConfiguration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
-
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
@@ -218,8 +227,7 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
- String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
@@ -394,13 +402,6 @@ public class TestResourceLocalizationSer
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
- Configuration conf = new YarnConfiguration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
-
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
// Making sure that we have only one local disk so that it will only be
@@ -413,8 +414,6 @@ public class TestResourceLocalizationSer
// Adding configuration to make sure there is only one file per
// directory
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
- String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -589,12 +588,6 @@ public class TestResourceLocalizationSer
@Test(timeout=20000)
@SuppressWarnings("unchecked") // mocked generics
public void testFailedPublicResource() throws Exception {
- Configuration conf = new YarnConfiguration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
@@ -602,8 +595,6 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
- String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
@@ -701,15 +692,6 @@ public class TestResourceLocalizationSer
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
- // mocked Resource Localization Service
- Configuration conf = new Configuration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- // We don't want files to be created
- doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
- anyBoolean());
-
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
@@ -718,10 +700,6 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
- // setting log directory.
- String logDir =
- lfs.makeQualified(new Path(basedir, "logdir ")).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
@@ -865,15 +843,6 @@ public class TestResourceLocalizationSer
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
- // mocked Resource Localization Service
- Configuration conf = new Configuration();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- // We don't want files to be created
- doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
- anyBoolean());
-
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
@@ -882,10 +851,6 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
- // setting log directory.
- String logDir =
- lfs.makeQualified(new Path(basedir, "logdir ")).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
@@ -1013,8 +978,8 @@ public class TestResourceLocalizationSer
String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId);
LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
- resourceStatus.setException(YarnServerBuilderUtils
- .newSerializedException(new YarnException("test")));
+ resourceStatus.setException(SerializedException
+ .newInstance(new YarnException("test")));
resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
resourceStatus.setResource(req);
status.addResourceStatus(resourceStatus);
@@ -1043,16 +1008,6 @@ public class TestResourceLocalizationSer
DrainDispatcher dispatcher1 = null;
String user = "testuser";
try {
- // Setting up ResourceLocalization service.
- Configuration conf = new Configuration();
- dispatcher1 = new DrainDispatcher();
- AbstractFileSystem spylfs =
- spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
- final FileContext lfs = FileContext.getFileContext(spylfs, conf);
- // We don't want files to be created
- doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
- anyBoolean());
-
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
@@ -1061,13 +1016,10 @@ public class TestResourceLocalizationSer
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
- // setting log directory.
- String logDir =
- lfs.makeQualified(new Path(basedir, "logdir ")).toString();
- conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher1 = new DrainDispatcher();
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Mon Aug 12 21:25:49 2013
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -809,14 +810,18 @@ public class TestLogAggregationService e
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(TestContainerManager.createContainerToken(
- cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- this.containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ TestContainerManager.createContainerToken(
+ cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ this.containerManager.startContainers(allRequests);
+
BaseContainerManagerTest.waitForContainerState(this.containerManager,
cId, ContainerState.COMPLETE);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Mon Aug 12 21:25:49 2013
@@ -40,8 +40,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -227,9 +228,6 @@ public class TestContainersMonitor exten
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
- StartContainerRequest startRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
ContainerTokenIdentifier containerIdentifier =
new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER);
@@ -237,8 +235,14 @@ public class TestContainersMonitor exten
BuilderUtils.newContainerToken(context.getNodeId(),
containerManager.getContext().getContainerTokenSecretManager()
.createPassword(containerIdentifier), containerIdentifier);
- startRequest.setContainerToken(containerToken);
- containerManager.startContainer(startRequest);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -260,11 +264,12 @@ public class TestContainersMonitor exten
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE, 60);
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(cId);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(gcsRequest).getStatus();
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus());
String expectedMsgPattern =
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-yarn-server-resourcemanager</name>
<properties>
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon Aug 12 21:25:49 2013
@@ -33,8 +33,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -58,11 +60,16 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -72,8 +79,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceBlacklistRequestException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -102,7 +107,6 @@ public class ApplicationMasterService ex
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.resync.setAMCommand(AMCommand.AM_RESYNC);
-// this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext;
}
@@ -116,10 +120,16 @@ public class ApplicationMasterService ex
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ Configuration serverConf = conf;
+ // Always enforce token-based auth for this server, whatever auth is configured.
+ serverConf = new Configuration(conf);
+ serverConf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
this.server =
rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
- conf, this.rmContext.getAMRMTokenSecretManager(),
- conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+ serverConf, this.rmContext.getAMRMTokenSecretManager(),
+ serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// Enable service authorization?
@@ -141,33 +151,61 @@ public class ApplicationMasterService ex
return this.bindAddress;
}
- private void authorizeRequest(ApplicationAttemptId appAttemptID)
- throws YarnException {
-
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
+ // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+ // currently sets only the required id, but iterate through anyway just to be
+ // sure.
+ private AMRMTokenIdentifier selectAMRMTokenIdentifier(
+ UserGroupInformation remoteUgi) throws IOException {
+ AMRMTokenIdentifier result = null;
+ Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+ for (TokenIdentifier tokenId : tokenIds) {
+ if (tokenId instanceof AMRMTokenIdentifier) {
+ result = (AMRMTokenIdentifier) tokenId;
+ break;
+ }
}
- String appAttemptIDStr = appAttemptID.toString();
+ return result;
+ }
+
+ private ApplicationAttemptId authorizeRequest()
+ throws YarnException {
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
- String msg = "Cannot obtain the user-name for ApplicationAttemptID: "
- + appAttemptIDStr + ". Got exception: "
- + StringUtils.stringifyException(e);
+ String msg =
+ "Cannot obtain the user-name for authorizing ApplicationMaster. "
+ + "Got exception: " + StringUtils.stringifyException(e);
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
- if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
- String msg = "Unauthorized request from ApplicationMaster. "
- + "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
- + " Found: " + appAttemptIDStr;
- LOG.warn(msg);
- throw RPCUtil.getRemoteException(msg);
+ boolean tokenFound = false;
+ String message = "";
+ AMRMTokenIdentifier appTokenIdentifier = null;
+ try {
+ appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+ if (appTokenIdentifier == null) {
+ tokenFound = false;
+ message = "No AMRMToken found for user " + remoteUgi.getUserName();
+ } else {
+ tokenFound = true;
+ }
+ } catch (IOException e) {
+ tokenFound = false;
+ message =
+ "Got exception while looking for AMRMToken for user "
+ + remoteUgi.getUserName();
}
+
+ if (!tokenFound) {
+ LOG.warn(message);
+ throw RPCUtil.getRemoteException(message);
+ }
+
+ return appTokenIdentifier.getApplicationAttemptId();
}
@Override
@@ -175,9 +213,7 @@ public class ApplicationMasterService ex
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = request
- .getApplicationAttemptId();
- authorizeRequest(applicationAttemptId);
+ ApplicationAttemptId applicationAttemptId = authorizeRequest();
ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
@@ -247,9 +283,7 @@ public class ApplicationMasterService ex
FinishApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = request
- .getApplicationAttemptId();
- authorizeRequest(applicationAttemptId);
+ ApplicationAttemptId applicationAttemptId = authorizeRequest();
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
@@ -297,8 +331,7 @@ public class ApplicationMasterService ex
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
- authorizeRequest(appAttemptId);
+ ApplicationAttemptId appAttemptId = authorizeRequest();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -354,7 +387,7 @@ public class ApplicationMasterService ex
// sanity check
try {
- SchedulerUtils.validateResourceRequests(ask,
+ RMServerUtils.validateResourceRequests(ask,
rScheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
@@ -362,12 +395,19 @@ public class ApplicationMasterService ex
}
try {
- SchedulerUtils.validateBlacklistRequest(blacklistRequest);
+ RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
+ try {
+ RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+ } catch (InvalidContainerReleaseException e) {
+ LOG.warn("Invalid container release by application " + appAttemptId, e);
+ throw e;
+ }
+
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Mon Aug 12 21:25:49 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -218,7 +219,7 @@ public class ClientRMService extends Abs
/**
* It gives response which includes application report if the application
- * present otherwise gives response with application report as null.
+ * is present; otherwise throws ApplicationNotFoundException.
*/
@Override
public GetApplicationReportResponse getApplicationReport(
@@ -235,10 +236,10 @@ public class ClientRMService extends Abs
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
- // If the RM doesn't have the application, provide the response with
- // application report as null and let the clients to handle.
- return recordFactory
- .newRecordInstance(GetApplicationReportResponse.class);
+ // If the RM doesn't have the application, throw
+ // ApplicationNotFoundException and let the client handle it.
+ throw new ApplicationNotFoundException("Application with id '"
+ + applicationId + "' doesn't exist in RM.");
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Mon Aug 12 21:25:49 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Mon Aug 12 21:25:49 2013
@@ -57,7 +57,7 @@ public class RMContextImpl implements RM
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer;
- private final AMRMTokenSecretManager appTokenSecretManager;
+ private final AMRMTokenSecretManager amRMTokenSecretManager;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
@@ -68,7 +68,7 @@ public class RMContextImpl implements RM
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer tokenRenewer,
- AMRMTokenSecretManager appTokenSecretManager,
+ AMRMTokenSecretManager amRMTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
@@ -78,7 +78,7 @@ public class RMContextImpl implements RM
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
this.tokenRenewer = tokenRenewer;
- this.appTokenSecretManager = appTokenSecretManager;
+ this.amRMTokenSecretManager = amRMTokenSecretManager;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
@@ -156,7 +156,7 @@ public class RMContextImpl implements RM
@Override
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
- return this.appTokenSecretManager;
+ return this.amRMTokenSecretManager;
}
@Override
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Mon Aug 12 21:25:49 2013
@@ -22,8 +22,17 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
/**
* Utility methods to aid serving RM data through the REST and RPC APIs
@@ -55,4 +64,52 @@ public class RMServerUtils {
}
return results;
}
+
+ /**
+ * Utility method to validate a list of resource requests, by ensuring that the
+ * requested memory/vcore is non-negative and not greater than max
+ */
+ public static void validateResourceRequests(List<ResourceRequest> ask,
+ Resource maximumResource) throws InvalidResourceRequestException {
+ for (ResourceRequest resReq : ask) {
+ SchedulerUtils.validateResourceRequest(resReq, maximumResource);
+ }
+ }
+
+ /*
+ * @throws <code>InvalidResourceBlacklistRequestException </code> if the
+ * resource is not able to be added to the blacklist.
+ */
+ public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest)
+ throws InvalidResourceBlacklistRequestException {
+ if (blacklistRequest != null) {
+ List<String> plus = blacklistRequest.getBlacklistAdditions();
+ if (plus != null && plus.contains(ResourceRequest.ANY)) {
+ throw new InvalidResourceBlacklistRequestException(
+ "Cannot add " + ResourceRequest.ANY + " to the blacklist!");
+ }
+ }
+ }
+
+ /**
+ * It will validate to make sure all the containers belong to the correct
+ * application attempt id. If not then it will throw
+ * {@link InvalidContainerReleaseException}
+ * @param containerReleaseList containers to be released as requested by
+ * application master.
+ * @param appAttemptId Application attempt Id
+ * @throws InvalidContainerReleaseException
+ */
+ public static void
+ validateContainerReleaseRequest(List<ContainerId> containerReleaseList,
+ ApplicationAttemptId appAttemptId)
+ throws InvalidContainerReleaseException {
+ for (ContainerId cId : containerReleaseList) {
+ if (!appAttemptId.equals(cId.getApplicationAttemptId())) {
+ throw new InvalidContainerReleaseException("Cannot release container : "
+ + cId.toString() + " not belonging to this application attempt : "
+ + appAttemptId);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Aug 12 21:25:49 2013
@@ -253,7 +253,7 @@ public class ResourceTrackerService exte
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
/* node does not exist */
- String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
+ String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
return resync;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -33,11 +35,15 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
@@ -46,11 +52,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The launch of the AM itself.
*/
@@ -69,8 +74,6 @@ public class AMLauncher implements Runna
private final RMAppAttempt application;
private final Configuration conf;
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
private final AMLauncherEventType eventType;
private final RMContext rmContext;
private final Container masterContainer;
@@ -103,22 +106,42 @@ public class AMLauncher implements Runna
+ " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
- StartContainerRequest request =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- request.setContainerLaunchContext(launchContext);
- request.setContainerToken(masterContainer.getContainerToken());
- containerMgrProxy.startContainer(request);
- LOG.info("Done launching container " + masterContainer
- + " for AM " + application.getAppAttemptId());
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(launchContext,
+ masterContainer.getContainerToken());
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+
+ StartContainersResponse response =
+ containerMgrProxy.startContainers(allRequests);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(masterContainerID)) {
+ Throwable t =
+ response.getFailedRequests().get(masterContainerID).deSerialize();
+ parseAndThrowException(t);
+ } else {
+ LOG.info("Done launching container " + masterContainer + " for AM "
+ + application.getAppAttemptId());
+ }
}
private void cleanup() throws IOException, YarnException {
connect();
ContainerId containerId = masterContainer.getId();
- StopContainerRequest stopRequest =
- recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(containerId);
- containerMgrProxy.stopContainer(stopRequest);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(containerId);
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ StopContainersResponse response =
+ containerMgrProxy.stopContainers(stopRequest);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(containerId)) {
+ Throwable t = response.getFailedRequests().get(containerId).deSerialize();
+ parseAndThrowException(t);
+ }
}
// Protected. For tests.
@@ -193,30 +216,28 @@ public class AMLauncher implements Runna
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
-
- if (UserGroupInformation.isSecurityEnabled()) {
- // TODO: Security enabled/disabled info should come from RM.
-
- Credentials credentials = new Credentials();
-
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- if (container.getTokens() != null) {
- // TODO: Don't do this kind of checks everywhere.
- dibb.reset(container.getTokens());
- credentials.readTokenStorageStream(dibb);
- }
- // Add application token
- Token<AMRMTokenIdentifier> amrmToken =
- application.getAMRMToken();
- if(amrmToken != null) {
- credentials.addToken(amrmToken.getService(), amrmToken);
- }
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
- dob.getLength()));
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ if (container.getTokens() != null) {
+ // TODO: Don't do this kind of checks everywhere.
+ dibb.reset(container.getTokens());
+ credentials.readTokenStorageStream(dibb);
}
+
+ // Add AMRMToken
+ Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+ if (amrmToken != null) {
+ credentials.addToken(amrmToken.getService(), amrmToken);
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+ }
+
+ @VisibleForTesting
+ protected Token<AMRMTokenIdentifier> getAMRMToken() {
+ return application.getAMRMToken();
}
@SuppressWarnings("unchecked")
@@ -257,4 +278,27 @@ public class AMLauncher implements Runna
break;
}
}
+
+ /**
+ * Rethrows a failure deserialized from a start/stop containers response.
+ * YarnException, InvalidToken and IOException instances are rethrown
+ * unchanged; any other type is wrapped in a new IOException.
+ *
+ * @param t the deserialized failure to rethrow
+ * @throws YarnException if t is a YarnException
+ * @throws IOException for every other failure, wrapped when necessary
+ */
+ private void parseAndThrowException(Throwable t) throws YarnException,
+ IOException {
+ if (t instanceof YarnException) {
+ throw (YarnException) t;
+ } else if (t instanceof InvalidToken) {
+ throw (InvalidToken) t;
+ } else if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ // A blind cast would hide the real failure behind a ClassCastException.
+ throw new IOException(t);
+ }
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Aug 12 21:25:49 2013
@@ -111,70 +111,66 @@ public class FileSystemRMStateStore exte
private void loadRMAppState(RMState rmState) throws Exception {
try {
- FileStatus[] childNodes = fs.listStatus(rmAppRoot);
List<ApplicationAttemptState> attempts =
- new ArrayList<ApplicationAttemptState>();
- for(FileStatus childNodeStatus : childNodes) {
- assert childNodeStatus.isFile();
- String childNodeName = childNodeStatus.getPath().getName();
- Path childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
- if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
- // application
- LOG.info("Loading application from node: " + childNodeName);
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
- new ApplicationStateDataPBImpl(
- ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState = new ApplicationState(
- appStateData.getSubmitTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser());
- // assert child node name is same as actual applicationId
- assert appId.equals(appState.context.getApplicationId());
- rmState.appState.put(appId, appState);
- } else if(childNodeName.startsWith(
- ApplicationAttemptId.appAttemptIdStrPrefix)) {
- // attempt
- LOG.info("Loading application attempt from node: " + childNodeName);
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(childNodeName);
- ApplicationAttemptStateDataPBImpl attemptStateData =
- new ApplicationAttemptStateDataPBImpl(
+ new ArrayList<ApplicationAttemptState>();
+
+ for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+ for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ byte[] childData =
+ readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ // application
+ LOG.info("Loading application from node: " + childNodeName);
+ ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
+ ApplicationStateDataPBImpl appStateData =
+ new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(childData));
+ ApplicationState appState =
+ new ApplicationState(appStateData.getSubmitTime(),
+ appStateData.getApplicationSubmissionContext(),
+ appStateData.getUser());
+ // assert child node name is same as actual applicationId
+ assert appId.equals(appState.context.getApplicationId());
+ rmState.appState.put(appId, appState);
+ } else if (childNodeName
+ .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ // attempt
+ LOG.info("Loading application attempt from node: " + childNodeName);
+ ApplicationAttemptId attemptId =
+ ConverterUtils.toApplicationAttemptId(childNodeName);
+ ApplicationAttemptStateDataPBImpl attemptStateData =
+ new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
- Credentials credentials = null;
- if(attemptStateData.getAppAttemptTokens() != null){
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
+ Credentials credentials = null;
+ if (attemptStateData.getAppAttemptTokens() != null) {
+ credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(attemptStateData.getAppAttemptTokens());
+ credentials.readTokenStorageStream(dibb);
+ }
+ ApplicationAttemptState attemptState =
+ new ApplicationAttemptState(attemptId,
+ attemptStateData.getMasterContainer(), credentials);
+
+ // assert child node name is same as application attempt id
+ assert attemptId.equals(attemptState.getAttemptId());
+ attempts.add(attemptState);
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
}
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials);
-
- // assert child node name is same as application attempt id
- assert attemptId.equals(attemptState.getAttemptId());
- attempts.add(attemptState);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
}
}
- // go through all attempts and add them to their apps
- for(ApplicationAttemptState attemptState : attempts) {
+ // go through all attempts and add them to their apps. Ideally, each
+ // attempt node must have a corresponding app node, because the remove
+ // directory operation removes both at the same time
+ for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
- if(appState != null) {
- appState.attempts.put(attemptState.getAttemptId(), attemptState);
- } else {
- // the application node may have been removed when the application
- // completed but the RM might have stopped before it could remove the
- // application attempt nodes
- LOG.info("Application node not found for attempt: "
- + attemptState.getAttemptId());
- deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
- }
+ assert appState != null;
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
} catch (Exception e) {
LOG.error("Failed to load state.", e);
@@ -188,6 +184,12 @@ public class FileSystemRMStateStore exte
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
+ if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+ rmState.rmSecretManagerState.dtSequenceNumber =
+ Integer.parseInt(childNodeName.split("_")[1]);
+ continue;
+ }
+
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData);
@@ -202,10 +204,7 @@ public class FileSystemRMStateStore exte
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
- } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
- rmState.rmSecretManagerState.dtSequenceNumber =
- Integer.parseInt(childNodeName.split("_")[1]);
- }else {
+ } else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
fsIn.close();
@@ -215,7 +214,9 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- Path nodeCreatePath = getNodePath(rmAppRoot, appId);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
+ fs.mkdirs(appDirPath);
+ Path nodeCreatePath = getNodePath(appDirPath, appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -232,7 +233,11 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
- Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ ApplicationAttemptId appAttemptId =
+ ConverterUtils.toApplicationAttemptId(attemptId);
+ Path appDirPath =
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ Path nodeCreatePath = getNodePath(appDirPath, attemptId);
LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -250,20 +255,9 @@ public class FileSystemRMStateStore exte
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
- Path nodeRemovePath = getNodePath(rmAppRoot, appId);
+ Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
- for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- removeApplicationAttemptState(attemptId.toString());
- }
- }
-
- public synchronized void removeApplicationAttemptState(String attemptId)
- throws Exception {
- Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
- LOG.info("Removing info for attempt: " + attemptId
- + " at: " + nodeRemovePath);
- deleteFile(nodeRemovePath);
}
@Override
@@ -329,6 +323,10 @@ public class FileSystemRMStateStore exte
deleteFile(nodeCreatePath);
}
+ private Path getAppDir(Path root, String appId) {
+ return getNodePath(root, appId);
+ }
+
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Aug 12 21:25:49 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -43,9 +44,9 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -376,11 +377,16 @@ public abstract class RMStateStore {
protected abstract void removeApplicationState(ApplicationState appState)
throws Exception;
+ // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
+ // YARN-986
+ public static final Text AM_RM_TOKEN_SERVICE = new Text(
+ "AM_RM_TOKEN_SERVICE");
+
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
if(appToken != null){
- credentials.addToken(appToken.getService(), appToken);
+ credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
}
Token<ClientToAMTokenIdentifier> clientToAMToken =
appAttempt.getClientToAMToken();