You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2013/05/20 21:32:51 UTC
svn commit: r1484570 - in
/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Author: cos
Date: Mon May 20 19:32:51 2013
New Revision: 1484570
URL: http://svn.apache.org/r1484570
Log:
MAPREDUCE-5240 inside of FileOutputCommitter the initialized Credentials cache appears to be empty
Modified:
hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt Mon May 20 19:32:51 2013
@@ -28,6 +28,9 @@ Release 2.0.4-alpha - UNRELEASED
MAPREDUCE-5094. Disabled memory monitoring by default in MiniMRYarnCluster
to avoid some downstream tests failing. (Siddharth Seth via vinodkv)
+ MAPREDUCE-5240. inside of FileOutputCommitter the initialized Credentials
+ cache appears to be empty (vinodkv, rvs via cos)
+
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon May 20 19:32:51 2013
@@ -1214,7 +1214,7 @@ public class MRAppMaster extends Composi
Integer.parseInt(nodeHttpPortString), appSubmitTime);
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
- YarnConfiguration conf = new YarnConfiguration(new JobConf());
+ JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name());
@@ -1261,11 +1261,14 @@ public class MRAppMaster extends Composi
}
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
- final YarnConfiguration conf, String jobUserName) throws IOException,
+ final JobConf conf, String jobUserName) throws IOException,
InterruptedException {
UserGroupInformation.setConfiguration(conf);
+ Credentials credentials =
+ UserGroupInformation.getCurrentUser().getCredentials();
UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
+ conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Mon May 20 19:32:51 2013
@@ -21,28 +21,51 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -50,13 +73,20 @@ import org.junit.Test;
public class TestMRAppMaster {
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
static String stagingDir = "staging/";
+ private static FileContext localFS = null;
+ private static final File testDir = new File("target",
+ TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile();
@BeforeClass
- public static void setup() {
+ public static void setup() throws AccessControlException,
+ FileNotFoundException, IllegalArgumentException, IOException {
//Do not error out if metrics are inited multiple times
DefaultMetricsSystem.setMiniClusterMode(true);
File dir = new File(stagingDir);
stagingDir = dir.getAbsolutePath();
+ localFS = FileContext.getLocalFSFileContext();
+ localFS.delete(new Path(testDir.getAbsolutePath()), true);
+ testDir.mkdir();
}
@Before
@@ -81,7 +111,7 @@ public class TestMRAppMaster {
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis());
- YarnConfiguration conf = new YarnConfiguration();
+ JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
@@ -94,7 +124,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
- YarnConfiguration conf = new YarnConfiguration();
+ JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -107,7 +137,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -128,7 +158,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
- YarnConfiguration conf = new YarnConfiguration();
+ JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -142,7 +172,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -163,7 +193,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
- YarnConfiguration conf = new YarnConfiguration();
+ JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -177,7 +207,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -198,7 +228,7 @@ public class TestMRAppMaster {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
- YarnConfiguration conf = new YarnConfiguration();
+ JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -212,7 +242,7 @@ public class TestMRAppMaster {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
- System.currentTimeMillis(), false);
+ System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -228,39 +258,155 @@ public class TestMRAppMaster {
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
appMaster.stop();
}
+
+ // A dirty hack to modify the env of the current JVM itself - Dirty, but
+ // should be okay for testing.
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static void setNewEnvironmentHack(Map<String, String> newenv)
+ throws Exception {
+ try {
+ Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
+ Field field = cl.getDeclaredField("theEnvironment");
+ field.setAccessible(true);
+ Map<String, String> env = (Map<String, String>) field.get(null);
+ env.clear();
+ env.putAll(newenv);
+ Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
+ ciField.setAccessible(true);
+ Map<String, String> cienv = (Map<String, String>) ciField.get(null);
+ cienv.clear();
+ cienv.putAll(newenv);
+ } catch (NoSuchFieldException e) {
+ Class[] classes = Collections.class.getDeclaredClasses();
+ Map<String, String> env = System.getenv();
+ for (Class cl : classes) {
+ if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+ Field field = cl.getDeclaredField("m");
+ field.setAccessible(true);
+ Object obj = field.get(env);
+ Map<String, String> map = (Map<String, String>) obj;
+ map.clear();
+ map.putAll(newenv);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMRAppMasterCredentials() throws Exception {
+
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+
+ // Simulate credentials passed to AM via client->RM->NM
+ Credentials credentials = new Credentials();
+ byte[] identifier = "MyIdentifier".getBytes();
+ byte[] password = "MyPassword".getBytes();
+ Text kind = new Text("MyTokenKind");
+ Text service = new Text("host:port");
+ Token<? extends TokenIdentifier> myToken =
+ new Token<TokenIdentifier>(identifier, password, kind, service);
+ Text tokenAlias = new Text("myToken");
+ credentials.addToken(tokenAlias, myToken);
+ Text keyAlias = new Text("mySecretKeyAlias");
+ credentials.addSecretKey(keyAlias, "mySecretKey".getBytes());
+ Token<? extends TokenIdentifier> storedToken =
+ credentials.getToken(tokenAlias);
+
+ JobConf conf = new JobConf();
+
+ Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
+ Map<String, String> newEnv = new HashMap<String, String>();
+ newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
+ .toUri().getPath());
+ setNewEnvironmentHack(newEnv);
+ credentials.writeTokenStorageFile(tokenFilePath, conf);
+
+ ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
+ ApplicationAttemptId applicationAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId containerId =
+ BuilderUtils.newContainerId(applicationAttemptId, 546);
+ String userName = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ // Create staging dir, so MRAppMaster doesn't barf.
+ File stagingDir =
+ new File(MRApps.getStagingAreaDir(conf, userName).toString());
+ stagingDir.mkdirs();
+
+ // Set login-user to null as that is how real world MRApp starts with.
+ // This is null is the reason why token-file is read by UGI.
+ UserGroupInformation.setLoginUser(null);
+
+ MRAppMasterTest appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false, true);
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+
+ // Now validate the credentials
+ Credentials appMasterCreds = conf.getCredentials();
+ Assert.assertNotNull(appMasterCreds);
+ Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys());
+ Assert.assertEquals(1, appMasterCreds.numberOfTokens());
+
+ // Validate the tokens
+ Token<? extends TokenIdentifier> usedToken =
+ appMasterCreds.getToken(tokenAlias);
+ Assert.assertNotNull(usedToken);
+ Assert.assertEquals(storedToken, usedToken);
+
+ // Validate the keys
+ byte[] usedKey = appMasterCreds.getSecretKey(keyAlias);
+ Assert.assertNotNull(usedKey);
+ Assert.assertEquals("mySecretKey", new String(usedKey));
+
+ // The credentials should also be added to conf so that OuputCommitter can
+ // access it
+ Credentials confCredentials = conf.getCredentials();
+ Assert.assertEquals(1, confCredentials.numberOfSecretKeys());
+ Assert.assertEquals(1, confCredentials.numberOfTokens());
+ Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias));
+ Assert.assertEquals("mySecretKey",
+ new String(confCredentials.getSecretKey(keyAlias)));
+ }
}
class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
- private boolean overrideInitAndStart;
+ private boolean overrideInit;
+ private boolean overrideStart;
ContainerAllocator mockContainerAllocator;
+ CommitterEventHandler mockCommitterEventHandler;
+ RMHeartbeatHandler mockRMHeartbeatHandler;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime) {
- this(applicationAttemptId, containerId, host, port, httpPort, submitTime,
- true);
+ this(applicationAttemptId, containerId, host, port, httpPort,
+ submitTime, true, true);
}
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
- long submitTime, boolean overrideInitAndStart) {
+ long submitTime, boolean overrideInit, boolean overrideStart) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
- this.overrideInitAndStart = overrideInitAndStart;
+ this.overrideInit = overrideInit;
+ this.overrideStart = overrideStart;
mockContainerAllocator = mock(ContainerAllocator.class);
+ mockCommitterEventHandler = mock(CommitterEventHandler.class);
+ mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class);
}
@Override
public void init(Configuration conf) {
- if (overrideInitAndStart) {
- this.conf = conf;
- } else {
+ if (!overrideInit) {
super.init(conf);
}
+ this.conf = conf;
}
-
- @Override
+
+ @Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
@@ -268,7 +414,7 @@ class MRAppMasterTest extends MRAppMaste
throw new YarnException(e);
}
}
-
+
@Override
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
@@ -276,8 +422,19 @@ class MRAppMasterTest extends MRAppMaste
}
@Override
+ protected EventHandler<CommitterEvent> createCommitterEventHandler(
+ AppContext context, OutputCommitter committer) {
+ return mockCommitterEventHandler;
+ }
+
+ @Override
+ protected RMHeartbeatHandler getRMHeartbeatHandler() {
+ return mockRMHeartbeatHandler;
+ }
+
+ @Override
public void start() {
- if (overrideInitAndStart) {
+ if (overrideStart) {
try {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
stagingDirPath = MRApps.getStagingAreaDir(conf, user);