Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/23 02:50:14 UTC

svn commit: r966918 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/security/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/map...

Author: ddas
Date: Fri Jul 23 00:50:13 2010
New Revision: 966918

URL: http://svn.apache.org/viewvc?rev=966918&view=rev
Log:
MAPREDUCE-1566. Adds a configuration attribute using which job clients can specify a credentials file. The tokens from there will be passed to the job. Contributed by Jitendra Pandey and Owen O'Malley.
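
From the client side, the new attribute works like this: write a Credentials file to the local filesystem, then point mapreduce.job.credentials.binary at it before submitting. A minimal sketch, assuming an illustrative file path and a Credentials object already populated with delegation tokens (the key name and the writeTokenStorageFile/readTokenStorageFile calls are taken from the diffs below; the driver class itself is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;

    public class BinaryTokenFileExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Credentials creds = new Credentials();
        // ... obtain delegation tokens and add them to creds ...
        // persist the tokens in the binary token-storage format
        creds.writeTokenStorageFile(new Path("file:///tmp/tokens.bin"), conf);
        // point the job at the file; JobSubmitter prepends file:/// and
        // loads it with readTokenStorageFile() at submission time
        conf.set("mapreduce.job.credentials.binary", "/tmp/tokens.bin");
      }
    }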

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 00:50:13 2010
@@ -94,6 +94,10 @@ Trunk (unreleased changes)
 
     MAPREDUCE-1733. Makes pipes applications secure. (Jitendra Pandey via ddas)
 
+    MAPREDUCE-1566. Adds a configuration attribute using which job clients can
+    specify a credentials file. The tokens from there will be passed to the job.
+    (Jitendra Pandey and Owen O'Malley via ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul 23 00:50:13 2010
@@ -3687,8 +3687,10 @@ public class JobInProgress {
   private void generateAndStoreTokens() throws IOException{
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
     Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
-    // we need to create this file using the jobtracker's filesystem
-    FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
+
+    if (tokenStorage == null) {
+      tokenStorage = new Credentials();
+    }
     
     //create JobToken file and write token to it
     JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
@@ -3697,15 +3699,10 @@ public class JobInProgress {
         jobtracker.getJobTokenSecretManager());
     token.setService(identifier.getJobId());
     
-    // add this token to the tokenStorage
-    if(tokenStorage == null)
-      tokenStorage = new Credentials();
-    
     TokenCache.setJobToken(token, tokenStorage);
     
     // write TokenStorage out
-    tokenStorage.write(os);
-    os.close();
+    tokenStorage.writeTokenStorageFile(keysFile, conf);
     LOG.info("jobToken generated and stored with users keys in "
         + keysFile.toUri().getPath());
   }

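The net effect of the JobInProgress change: Credentials now owns creation and serialization of the token file, so the manual stream handling collapses to one call. Side by side, from the hunks above:

    // before: open a stream on the JobTracker's filesystem by hand
    FSDataOutputStream os = jobtracker.getFileSystem().create(keysFile);
    tokenStorage.write(os);
    os.close();

    // after: Credentials creates, writes, and closes the file itself
    tokenStorage.writeTokenStorageFile(keysFile, conf);
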
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Jul 23 00:50:13 2010
@@ -340,6 +340,13 @@ class JobSubmitter {
       TokenCache.obtainTokensForNamenodes(job.getCredentials(),
           new Path[] { submitJobDir }, conf);
       
+      // load the binary file, if the user has one
+      String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
+      if (binaryTokenFilename != null) {
+        job.getCredentials().readTokenStorageFile(
+            new Path("file:///" + binaryTokenFilename), conf);
+      }
+
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
 
@@ -368,7 +375,9 @@ class JobSubmitter {
     } finally {
       if (status == null) {
         LOG.info("Cleaning up the staging area " + submitJobDir);
-        jtFs.delete(submitJobDir, true);
+        if (jtFs != null && submitJobDir != null)
+          jtFs.delete(submitJobDir, true);
+
       }
     }
   }
@@ -490,7 +499,7 @@ class JobSubmitter {
   private void populateTokenCache(Configuration conf, Credentials credentials)
       throws IOException {
     // create TokenStorage object with user secretKeys
-    String tokensFileName = conf.get("tokenCacheFile");
+    String tokensFileName = conf.get("mapreduce.job.credentials.json");
     if(tokensFileName != null) {
       LOG.info("loading user's secret keys from " + tokensFileName);
       String localFileName = new Path(tokensFileName).toUri().getPath();

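Two client-visible points fall out of the JobSubmitter changes: the binary file is read from the local filesystem (note the file:/// prefix prepended to the configured path), and the old undocumented "tokenCacheFile" key is renamed, so callers update accordingly. Mirroring the test change at the end of this commit (the path here is illustrative):

    // before this commit
    jobConf.set("tokenCacheFile", "/path/to/tokens.json");
    // after this commit
    jobConf.set("mapreduce.job.credentials.json", "/path/to/tokens.json");
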
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul 23 00:50:13 2010
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -91,8 +90,10 @@ public class TokenCache {
     KerberosName jtKrbName = new KerberosName(conf.get(JTConfig.JT_USER_NAME,
         ""));
     Text delegTokenRenewer = new Text(jtKrbName.getShortName());
-    
+    boolean notReadFile = true;
     for(Path p: ps) {
+      // TODO: Connecting to the namenode is not required in the case,
+      // where we already have the credentials in the file
       FileSystem fs = FileSystem.get(p.toUri(), conf);
       if(fs instanceof DistributedFileSystem) {
         DistributedFileSystem dfs = (DistributedFileSystem)fs;
@@ -106,6 +107,21 @@ public class TokenCache {
           LOG.debug("DT for " + token.getService()  + " is already present");
           continue;
         }
+        if (notReadFile) { //read the file only once
+          String binaryTokenFilename =
+            conf.get("mapreduce.job.credentials.binary");
+          if (binaryTokenFilename != null) {
+            credentials.readTokenStorageFile(new Path("file:///" +  
+                binaryTokenFilename), conf);
+          }
+          notReadFile = false;
+          token = 
+            TokenCache.getDelegationToken(credentials, fs_addr); 
+          if(token != null) {
+            LOG.debug("DT for " + token.getService()  + " is already present");
+            continue;
+          }
+        }
         // get the token
         token = dfs.getDelegationToken(delegTokenRenewer);
         if(token==null) 
@@ -179,18 +195,16 @@ public class TokenCache {
   @InterfaceAudience.Private
   public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
-    Path localJobTokenFile = new Path (jobTokenFile);
-    FileSystem localFS = FileSystem.getLocal(conf);
-    FSDataInputStream in = localFS.open(localJobTokenFile);
+    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
     
     Credentials ts = new Credentials();
-    ts.readFields(in);
+    ts.readTokenStorageFile(localJobTokenFile, conf);
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
-        +"; num of sec keys  = " + ts.numberOfSecretKeys());
+        +"; num of sec keys  = " + ts.numberOfSecretKeys() + " Number of tokens " + 
+        ts.numberOfTokens());
     }
-    in.close();
     return ts;
   }
   /**

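Assembled from the loadTokens hunk above, the method after this commit reads roughly as follows; Credentials now handles both opening and parsing the local token file, so the explicit FSDataInputStream is gone:

    @InterfaceAudience.Private
    public static Credentials loadTokens(String jobTokenFile, JobConf conf)
        throws IOException {
      Path localJobTokenFile = new Path("file:///" + jobTokenFile);

      Credentials ts = new Credentials();
      ts.readTokenStorageFile(localJobTokenFile, conf);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Task: Loaded jobTokenFile from: "
            + localJobTokenFile.toUri().getPath()
            + "; num of sec keys  = " + ts.numberOfSecretKeys()
            + " Number of tokens " + ts.numberOfTokens());
      }
      return ts;
    }
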
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Jul 23 00:50:13 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
@@ -287,13 +288,9 @@ public class TestTaskTrackerLocalization
     File dir = new File(TEST_ROOT_DIR, jobId.toString());
     if(!dir.exists())
       assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
-    
-    File jobTokenFile = new File(dir, TokenCache.JOB_TOKEN_HDFS_FILE);
-    FileOutputStream fos = new FileOutputStream(jobTokenFile);
-    java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
-    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-    jt.write(out); // writing empty file, we don't the keys for this test 
-    out.close();
+    // writing empty file, we don't need the keys for this test
+    new Credentials().writeTokenStorageFile(new Path("file:///" + dir,
+        TokenCache.JOB_TOKEN_HDFS_FILE), new Configuration());
   }
 
   @Override

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java?rev=966918&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java Fri Jul 23 00:50:13 2010
@@ -0,0 +1,200 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestBinaryTokenFile {
+
+  // my sleep class
+  static class MySleepMapper extends SleepJob.SleepMapper {
+    /**
+     * attempts to access tokenCache as from client
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value, Context context)
+    throws IOException, InterruptedException {
+      // get token storage and a key
+      Credentials ts = context.getCredentials();
+      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
+      
+      
+      if(dts.size() != 2) { // one job token and one delegation token
+        throw new RuntimeException("tokens are not available"); // fail the test
+      }
+      
+      Token<? extends TokenIdentifier> dt = ts.getToken(new Text("Hdfs"));
+      
+      //Verify that dt is same as the token in the file
+      String tokenFile = context.getConfiguration().get(
+          "mapreduce.job.credentials.binary");
+      Credentials cred = new Credentials();
+      cred.readTokenStorageStream(new DataInputStream(new FileInputStream(
+          tokenFile)));
+      for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+        if (!dt.equals(t)) {
+          throw new RuntimeException(
+              "Delegation token in job is not same as the token passed in file."
+                  + " tokenInFile=" + t + ", dt=" + dt);
+        }
+      }
+      
+      super.map(key, value, context);
+    }
+  }
+  
+  class MySleepJob extends SleepJob {
+    @Override
+    public Job createJob(int numMapper, int numReducer, 
+        long mapSleepTime, int mapSleepCount, 
+        long reduceSleepTime, int reduceSleepCount) 
+    throws IOException {
+      Job job =  super.createJob(numMapper, numReducer,
+           mapSleepTime, mapSleepCount, 
+          reduceSleepTime, reduceSleepCount);
+      
+      job.setMapperClass(MySleepMapper.class);
+      //Populate tokens here because security is disabled.
+      setupBinaryTokenFile(job);
+      return job;
+    }
+    
+    private void setupBinaryTokenFile(Job job) {
+      // Credentials in the job will not have delegation tokens
+      // because security is disabled. Fetch delegation tokens
+      // and store them in the binary token file.
+      try {
+        Credentials cred1 = new Credentials();
+        Credentials cred2 = new Credentials();
+        TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
+            job.getConfiguration());
+        for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
+          cred2.addToken(new Text("Hdfs"), t);
+        }
+        DataOutputStream os = new DataOutputStream(new FileOutputStream(
+            binaryTokenFileName.toString()));
+        cred2.writeTokenStorageToStream(os);
+        os.close();
+        job.getConfiguration().set("mapreduce.job.credentials.binary",
+            binaryTokenFileName.toString());
+      } catch (IOException e) {
+        Assert.fail("Exception " + e);
+      }
+    }
+  }
+  
+  private static MiniMRCluster mrCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"));
+  private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
+  private static int numSlaves = 1;
+  private static JobConf jConf;
+  private static Path p1;
+  
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    jConf = new JobConf(conf);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+
+    dfsCluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
+    FileSystem fs = dfsCluster.getFileSystem();
+    
+    p1 = new Path("file1");
+    p1 = fs.makeQualified(p1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(mrCluster != null)
+      mrCluster.shutdown();
+    mrCluster = null;
+    if(dfsCluster != null)
+      dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+  
+  /**
+   * run a distributed job and verify that TokenCache is available
+   * @throws IOException
+   */
+  @Test
+  public void testBinaryTokenFile() throws IOException {
+    
+    System.out.println("running dist job");
+    
+    // make sure JT starts
+    jConf = mrCluster.createJobConf();
+    
+    // provide namenode names for the job to get the delegation tokens for
+    String nnUri = dfsCluster.getURI().toString();
+    jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
+    // job tracker principal id
+    jConf.set(JTConfig.JT_USER_NAME, "jt_id");
+    
+    // using argument to pass the file name
+    String[] args = { 
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+        };
+     
+    int res = -1;
+    try {
+      res = ToolRunner.run(jConf, new MySleepJob(), args);
+    } catch (Exception e) {
+      System.out.println("Job failed with " + e.getLocalizedMessage());
+      e.printStackTrace(System.out);
+      fail("Job failed");
+    }
+    assertEquals("dist job res is not 0", res, 0);
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=966918&r1=966917&r2=966918&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul 23 00:50:13 2010
@@ -248,7 +248,7 @@ public class TestTokenCache {
     System.out.println("running local job");
     // this is local job
     String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; 
-    jConf.set("tokenCacheFile", tokenFileName.toString());
+    jConf.set("mapreduce.job.credentials.json", tokenFileName.toString());
     
     int res = -1;
     try {