You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2013/05/20 21:32:51 UTC

svn commit: r1484570 - in /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Author: cos
Date: Mon May 20 19:32:51 2013
New Revision: 1484570

URL: http://svn.apache.org/r1484570
Log:
MAPREDUCE-5240 inside of FileOutputCommitter the initialized Credentials cache appears to be empty

Modified:
    hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java

Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt Mon May 20 19:32:51 2013
@@ -28,6 +28,9 @@ Release 2.0.4-alpha - UNRELEASED
     MAPREDUCE-5094. Disabled memory monitoring by default in MiniMRYarnCluster
     to avoid some downstream tests failing. (Siddharth Seth via vinodkv)
 
+    MAPREDUCE-5240. inside of FileOutputCommitter the initialized Credentials
+    cache appears to be empty (vinodkv, rvs via cos)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon May 20 19:32:51 2013
@@ -1214,7 +1214,7 @@ public class MRAppMaster extends Composi
               Integer.parseInt(nodeHttpPortString), appSubmitTime);
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
-      YarnConfiguration conf = new YarnConfiguration(new JobConf());
+      JobConf conf = new JobConf(new YarnConfiguration());
       conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
@@ -1261,11 +1261,14 @@ public class MRAppMaster extends Composi
   }
 
   protected static void initAndStartAppMaster(final MRAppMaster appMaster,
-      final YarnConfiguration conf, String jobUserName) throws IOException,
+      final JobConf conf, String jobUserName) throws IOException,
       InterruptedException {
     UserGroupInformation.setConfiguration(conf);
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
     UserGroupInformation appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
+    conf.getCredentials().addAll(credentials);
     appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {

Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1484570&r1=1484569&r2=1484570&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Mon May 20 19:32:51 2013
@@ -21,28 +21,51 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,13 +73,20 @@ import org.junit.Test;
 public class TestMRAppMaster {
   private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
   static String stagingDir = "staging/";
+  private static FileContext localFS = null;
+  private static final File testDir = new File("target",
+    TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile();
   
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws AccessControlException,
+      FileNotFoundException, IllegalArgumentException, IOException {
     //Do not error out if metrics are inited multiple times
     DefaultMetricsSystem.setMiniClusterMode(true);
     File dir = new File(stagingDir);
     stagingDir = dir.getAbsolutePath();
+    localFS = FileContext.getLocalFSFileContext();
+    localFS.delete(new Path(testDir.getAbsolutePath()), true);
+    testDir.mkdir();
   }
   
   @Before
@@ -81,7 +111,7 @@ public class TestMRAppMaster {
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
             System.currentTimeMillis());
-    YarnConfiguration conf = new YarnConfiguration();
+    JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
     assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
@@ -94,7 +124,7 @@ public class TestMRAppMaster {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
     String userName = "TestAppMasterUser";
-    YarnConfiguration conf = new YarnConfiguration();
+    JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -107,7 +137,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -128,7 +158,7 @@ public class TestMRAppMaster {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
     String userName = "TestAppMasterUser";
-    YarnConfiguration conf = new YarnConfiguration();
+    JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -142,7 +172,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -163,7 +193,7 @@ public class TestMRAppMaster {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
     String userName = "TestAppMasterUser";
-    YarnConfiguration conf = new YarnConfiguration();
+    JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -177,7 +207,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -198,7 +228,7 @@ public class TestMRAppMaster {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
     String userName = "TestAppMasterUser";
-    YarnConfiguration conf = new YarnConfiguration();
+    JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -212,7 +242,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -228,39 +258,155 @@ public class TestMRAppMaster {
     assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
     appMaster.stop();
   }
+
+  // A dirty hack to modify the env of the current JVM itself - Dirty, but
+  // should be okay for testing.
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static void setNewEnvironmentHack(Map<String, String> newenv)
+      throws Exception {
+    try {
+      Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
+      Field field = cl.getDeclaredField("theEnvironment");
+      field.setAccessible(true);
+      Map<String, String> env = (Map<String, String>) field.get(null);
+      env.clear();
+      env.putAll(newenv);
+      Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
+      ciField.setAccessible(true);
+      Map<String, String> cienv = (Map<String, String>) ciField.get(null);
+      cienv.clear();
+      cienv.putAll(newenv);
+    } catch (NoSuchFieldException e) {
+      Class[] classes = Collections.class.getDeclaredClasses();
+      Map<String, String> env = System.getenv();
+      for (Class cl : classes) {
+        if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+          Field field = cl.getDeclaredField("m");
+          field.setAccessible(true);
+          Object obj = field.get(env);
+          Map<String, String> map = (Map<String, String>) obj;
+          map.clear();
+          map.putAll(newenv);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testMRAppMasterCredentials() throws Exception {
+
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    // Simulate credentials passed to AM via client->RM->NM
+    Credentials credentials = new Credentials();
+    byte[] identifier = "MyIdentifier".getBytes();
+    byte[] password = "MyPassword".getBytes();
+    Text kind = new Text("MyTokenKind");
+    Text service = new Text("host:port");
+    Token<? extends TokenIdentifier> myToken =
+        new Token<TokenIdentifier>(identifier, password, kind, service);
+    Text tokenAlias = new Text("myToken");
+    credentials.addToken(tokenAlias, myToken);
+    Text keyAlias = new Text("mySecretKeyAlias");
+    credentials.addSecretKey(keyAlias, "mySecretKey".getBytes());
+    Token<? extends TokenIdentifier> storedToken =
+        credentials.getToken(tokenAlias);
+
+    JobConf conf = new JobConf();
+
+    Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
+    Map<String, String> newEnv = new HashMap<String, String>();
+    newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
+      .toUri().getPath());
+    setNewEnvironmentHack(newEnv);
+    credentials.writeTokenStorageFile(tokenFilePath, conf);
+
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
+    ApplicationAttemptId applicationAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId containerId =
+        BuilderUtils.newContainerId(applicationAttemptId, 546);
+    String userName = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // Create staging dir, so MRAppMaster doesn't barf.
+    File stagingDir =
+        new File(MRApps.getStagingAreaDir(conf, userName).toString());
+    stagingDir.mkdirs();
+
+    // Set login-user to null as that is how real world MRApp starts with.
+    // This is null is the reason why token-file is read by UGI.
+    UserGroupInformation.setLoginUser(null);
+
+    MRAppMasterTest appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+          System.currentTimeMillis(), false, true);
+    MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+
+    // Now validate the credentials
+    Credentials appMasterCreds = conf.getCredentials();
+    Assert.assertNotNull(appMasterCreds);
+    Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys());
+    Assert.assertEquals(1, appMasterCreds.numberOfTokens());
+
+    // Validate the tokens
+    Token<? extends TokenIdentifier> usedToken =
+        appMasterCreds.getToken(tokenAlias);
+    Assert.assertNotNull(usedToken);
+    Assert.assertEquals(storedToken, usedToken);
+
+    // Validate the keys
+    byte[] usedKey = appMasterCreds.getSecretKey(keyAlias);
+    Assert.assertNotNull(usedKey);
+    Assert.assertEquals("mySecretKey", new String(usedKey));
+
+    // The credentials should also be added to conf so that OuputCommitter can
+    // access it
+    Credentials confCredentials = conf.getCredentials();
+    Assert.assertEquals(1, confCredentials.numberOfSecretKeys());
+    Assert.assertEquals(1, confCredentials.numberOfTokens());
+    Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias));
+    Assert.assertEquals("mySecretKey",
+      new String(confCredentials.getSecretKey(keyAlias)));
+  }
 }
 
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
   private Configuration conf;
-  private boolean overrideInitAndStart;
+  private boolean overrideInit;
+  private boolean overrideStart;
   ContainerAllocator mockContainerAllocator;
+  CommitterEventHandler mockCommitterEventHandler;
+  RMHeartbeatHandler mockRMHeartbeatHandler;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       long submitTime) {
-    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
-        true);
+    this(applicationAttemptId, containerId, host, port, httpPort,
+        submitTime, true, true);
   }
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, boolean overrideInitAndStart) {
+      long submitTime, boolean overrideInit, boolean overrideStart) {
     super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
-    this.overrideInitAndStart = overrideInitAndStart;
+    this.overrideInit = overrideInit;
+    this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);
+    mockCommitterEventHandler = mock(CommitterEventHandler.class);
+    mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class);
   }
 
   @Override
   public void init(Configuration conf) {
-    if (overrideInitAndStart) {
-      this.conf = conf; 
-    } else {
+    if (!overrideInit) {
       super.init(conf);
     }
+    this.conf = conf;
   }
-  
-  @Override 
+
+  @Override
   protected void downloadTokensAndSetupUGI(Configuration conf) {
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
@@ -268,7 +414,7 @@ class MRAppMasterTest extends MRAppMaste
       throw new YarnException(e);
     }
   }
-  
+
   @Override
   protected ContainerAllocator createContainerAllocator(
       final ClientService clientService, final AppContext context) {
@@ -276,8 +422,19 @@ class MRAppMasterTest extends MRAppMaste
   }
 
   @Override
+  protected EventHandler<CommitterEvent> createCommitterEventHandler(
+      AppContext context, OutputCommitter committer) {
+    return mockCommitterEventHandler;
+  }
+
+  @Override
+  protected RMHeartbeatHandler getRMHeartbeatHandler() {
+    return mockRMHeartbeatHandler;
+  }
+
+  @Override
   public void start() {
-    if (overrideInitAndStart) {
+    if (overrideStart) {
       try {
         String user = UserGroupInformation.getCurrentUser().getShortUserName();
         stagingDirPath = MRApps.getStagingAreaDir(conf, user);