Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/08/04 01:46:51 UTC

svn commit: r982087 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/security/ src/test/mapred/org/apache/hadoop/mapreduce/security/ src/tools/org/apache/hadoop/tools/

Author: ddas
Date: Tue Aug  3 23:46:51 2010
New Revision: 982087

URL: http://svn.apache.org/viewvc?rev=982087&view=rev
Log:
MAPREDUCE-1958. The MapReduce part corresponding to HADOOP-6873. Contributed by Boris Shkolnik & Owen O'Malley.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug  3 23:46:51 2010
@@ -223,6 +223,9 @@ Trunk (unreleased changes)
     of configuration properties "mapreduce.job.name" and "mapred.job.name".
     (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-1958. The MapReduce part corresponding to the HADOOP-6873.
+    (Boris Shkolnik & Owen O'Malley via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Tue Aug  3 23:46:51 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.proto
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -340,12 +342,7 @@ class JobSubmitter {
       TokenCache.obtainTokensForNamenodes(job.getCredentials(),
           new Path[] { submitJobDir }, conf);
       
-      // load the binary file, if the user has one
-      String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
-      if (binaryTokenFilename != null) {
-        job.getCredentials().readTokenStorageFile(
-            new Path("file:///" + binaryTokenFilename), conf);
-      }
+      populateTokenCache(conf, job.getCredentials());
 
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
@@ -364,7 +361,7 @@ class JobSubmitter {
       //
       // Now, actually submit the job (using the submit name)
       //
-      populateTokenCache(conf, job.getCredentials());
+      printTokens(jobId, job.getCredentials());
       status = submitClient.submitJob(
           jobId, submitJobDir.toString(), job.getCredentials());
       if (status != null) {
@@ -411,6 +408,21 @@ class JobSubmitter {
   }
   
 
+  
+  @SuppressWarnings("unchecked")
+  private void printTokens(JobID jobId,
+      Credentials credentials) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Printing tokens for job: " + jobId);
+      for(Token<?> token: credentials.getAllTokens()) {
+        if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
+          LOG.debug("Submitting with " +
+              DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) token));
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private <T extends InputSplit>
   int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
@@ -494,11 +506,18 @@ class JobSubmitter {
     }
   }
   
-  // get secret keys and tokens and store them into TokenCache
   @SuppressWarnings("unchecked")
-  private void populateTokenCache(Configuration conf, Credentials credentials)
-      throws IOException {
-    // create TokenStorage object with user secretKeys
+  private void readTokensFromFiles(Configuration conf, Credentials credentials)
+  throws IOException {
+    // add tokens and secrets coming from a token storage file
+    String binaryTokenFilename =
+      conf.get("mapreduce.job.credentials.binary");
+    if (binaryTokenFilename != null) {
+      Credentials binary = Credentials.readTokenStorageFile(
+          new Path("file:///" + binaryTokenFilename), conf);
+      credentials.addAll(binary);
+    }
+    // add secret keys coming from a json file
     String tokensFileName = conf.get("mapreduce.job.credentials.json");
     if(tokensFileName != null) {
       LOG.info("loading user's secret keys from " + tokensFileName);
@@ -523,10 +542,17 @@ class JobSubmitter {
       if(json_error)
         LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
     }
-    
+  }
+
+  //get secret keys and tokens and store them into TokenCache
+  @SuppressWarnings("unchecked")
+  private void populateTokenCache(Configuration conf, Credentials credentials) 
+  throws IOException{
+    readTokensFromFiles(conf, credentials);
     // add the delegation tokens from configuration
     String [] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
-    LOG.info("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+    LOG.debug("adding the following namenodes' delegation tokens:" + 
+        Arrays.toString(nameNodes));
     if(nameNodes != null) {
       Path [] ps = new Path[nameNodes.length];
       for(int i=0; i< nameNodes.length; i++) {
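
The hunks above factor token loading into readTokensFromFiles(), run it before the job files are staged, and demote the namenode-token log line to debug; printTokens() adds debug-only logging of HDFS delegation tokens at submit time. For reference, a minimal standalone sketch of the binary-file loading path (class and method names here are illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;

    public class BinaryTokenLoaderSketch {
      // Load a user-supplied binary token file and merge it into the job's
      // credentials. readTokenStorageFile() is now a static factory that
      // returns a fresh Credentials object, and addAll() merges it, rather
      // than the old pattern of reading into the job's credentials in place.
      static void loadBinaryTokens(Configuration conf, Credentials credentials)
          throws IOException {
        String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
        if (binaryTokenFilename != null) {
          Credentials binary = Credentials.readTokenStorageFile(
              new Path("file:///" + binaryTokenFilename), conf);
          credentials.addAll(binary);
        }
      }
    }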

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Aug  3 23:46:51 2010
@@ -106,74 +106,40 @@ public class TokenCache {
     // get jobtracker principal id (for the renewer)
     KerberosName jtKrbName = 
       new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
-    Text delegTokenRenewer = new Text(jtKrbName.getShortName());
-
-    boolean notReadFile = true;
-
-    // TODO: Connecting to the namenode is not required in the case,
-    // where we already have the credentials in the file
-    if(fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
-      URI uri = dfs.getUri();
-      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+    
+    String delegTokenRenewer = jtKrbName.getShortName();
+    boolean readFile = true;
 
-      // see if we already have the token
-      Token<DelegationTokenIdentifier> token = 
-        TokenCache.getDelegationToken(credentials, fs_addr); 
-      if(token != null) {
-        LOG.debug("DT for " + token.getService()  + " is already present");
-        return;
-      }
-      if (notReadFile) { //read the file only once
+    String fsName = fs.getCanonicalServiceName();
+    if (TokenCache.getDelegationToken(credentials, fsName) == null) {
+      //TODO: Need to come up with a better place to put
+      //this block of code to do with reading the file
+      if (readFile) {
+        readFile = false;
         String binaryTokenFilename =
           conf.get("mapreduce.job.credentials.binary");
         if (binaryTokenFilename != null) {
-          credentials.readTokenStorageFile(new Path("file:///" +  
-              binaryTokenFilename), conf);
+          Credentials binary;
+          try {
+            binary = Credentials.readTokenStorageFile(
+                new Path("file:///" +  binaryTokenFilename), conf);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          credentials.addAll(binary);
         }
-        notReadFile = false;
-        token = 
-          TokenCache.getDelegationToken(credentials, fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
+        if (TokenCache.getDelegationToken(credentials, fsName) != null) {
+          LOG.debug("DT for " + fsName  + " is already present");
           return;
         }
       }
-      // get the token
-      token = dfs.getDelegationToken(delegTokenRenewer);
-      if(token==null) 
-        throw new IOException("Token from " + fs_addr + " is null");
-
-      token.setService(new Text(fs_addr));
-      credentials.addToken(new Text(fs_addr), token);
-      LOG.info("Got dt for " + uri + ";uri="+ fs_addr + 
-          ";t.service="+token.getService());
-    } else if (fs instanceof HftpFileSystem) {
-      HftpFileSystem hfs = (HftpFileSystem)fs;
-      URI uri = hfs.getUri();
-      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-      Token<DelegationTokenIdentifier> token = 
-        TokenCache.getDelegationToken(credentials, fs_addr); 
-      if(token != null) {
-        LOG.debug("DT for " + token.getService()  + " is already present");
-        return;
-      }
-      //the initialize method of hftp, called via FileSystem.get() done
-      //earlier gets a delegation token
-
-      Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
-      if (t != null) {
-        credentials.addToken(new Text(fs_addr), t);
-
-        // in this case port in fs_addr is port for hftp request, but
-        // token's port is for RPC
-        // to find the correct DT we need to know the mapping between Hftp port 
-        // and RPC one. hence this new setting in the conf.
-        String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
-        conf.set(key, t.getService().toString());
-        LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "=" 
-            + t.getService());
-
+      Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+      if (token != null) {
+        Text fsNameText = new Text(fsName);
+        token.setService(fsNameText);
+        credentials.addToken(fsNameText, token);
+        LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 
+            ";t.service="+token.getService());
       }
     }
   }
@@ -213,14 +179,14 @@ public class TokenCache {
   public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-    
-    Credentials ts = new Credentials();
-    ts.readTokenStorageFile(localJobTokenFile, conf);
+
+    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath() 
-        +"; num of sec keys  = " + ts.numberOfSecretKeys() + " Number of tokens " + 
-        ts.numberOfTokens());
+      LOG.debug("Task: Loaded jobTokenFile from: "+
+          localJobTokenFile.toUri().getPath() 
+          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
+          " Number of tokens " +  ts.numberOfTokens());
     }
     return ts;
   }
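
The rewrite above drops the per-filesystem instanceof branches for DistributedFileSystem and HftpFileSystem in favor of the generic FileSystem API: getCanonicalServiceName() supplies the key under which the token is cached, and getDelegationToken(renewer) fetches it. A minimal sketch of that pattern, with illustrative names:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class GenericTokenFetchSketch {
      // Obtain a delegation token for any FileSystem, keyed by its
      // canonical service name, with no instanceof checks needed.
      static void obtainToken(FileSystem fs, Credentials credentials,
          String renewer) throws IOException {
        String fsName = fs.getCanonicalServiceName();
        if (credentials.getToken(new Text(fsName)) != null) {
          return; // a token for this service is already cached
        }
        Token<?> token = fs.getDelegationToken(renewer);
        if (token != null) {
          Text service = new Text(fsName);
          token.setService(service);  // rewrite service to the canonical name
          credentials.addToken(service, token);
        }
      }
    }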

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Tue Aug  3 23:46:51 2010
@@ -271,7 +271,6 @@ public class TestTokenCache {
   
   @Test
   public void testGetTokensForNamenodes() throws IOException {
-    FileSystem fs = dfsCluster.getFileSystem();
     
     Credentials credentials = new Credentials();
     TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
@@ -305,13 +304,15 @@ public class TestTokenCache {
 
     DelegationTokenSecretManager dtSecretManager = 
       dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+    String renewer = "renewer";
+    jConf.set(JTConfig.JT_USER_NAME,renewer);
     DelegationTokenIdentifier dtId = 
-      new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+      new DelegationTokenIdentifier(new Text("user"), new Text(renewer), null);
     final Token<DelegationTokenIdentifier> t = 
       new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
 
     final URI uri = new URI("hftp://host:2222/file1");
-    String fs_addr = 
+    final String fs_addr = 
       SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
     t.setService(new Text(fs_addr));
 
@@ -329,12 +330,19 @@ public class TestTokenCache {
       public Token<DelegationTokenIdentifier>  answer(InvocationOnMock invocation)
       throws Throwable {
         return t;
-      }}).when(hfs).getDelegationToken();
-
-
+      }}).when(hfs).getDelegationToken(renewer);
+    
+    //when(hfs.getCanonicalServiceName).thenReturn(fs_addr);
+    Mockito.doAnswer(new Answer<String>(){
+      @Override
+      public String answer(InvocationOnMock invocation)
+      throws Throwable {
+        return fs_addr;
+      }}).when(hfs).getCanonicalServiceName();
+    
     Credentials credentials = new Credentials();
     Path p = new Path(uri.toString());
-    System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+    System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr + "; rn=" + renewer);
     TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
 
     Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
@@ -350,11 +358,6 @@ public class TestTokenCache {
       }
       assertTrue("didn't find token for " + p, found);
     }
-    // also check the conf value
-    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
-    String confKey = jConf.get(key);
-    assertEquals("jconf key for HFTP DT is not correct", confKey,
-        t.getService().toString());
   }
 
 }
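
Because obtainTokensForNamenodesInternal() now calls getCanonicalServiceName() and getDelegationToken(renewer), the mocked HftpFileSystem must stub both. A condensed sketch of equivalent stubbing using Mockito's doReturn form (names ours; the test itself uses doAnswer):

    import org.apache.hadoop.hdfs.HftpFileSystem;
    import org.apache.hadoop.security.token.Token;
    import org.mockito.Mockito;

    public class HftpStubSketch {
      // Stub the two calls obtainTokensForNamenodesInternal() now makes on
      // the mock: the canonical service name and the token fetch itself.
      static void stub(HftpFileSystem hfs, Token<?> t,
          String fsAddr, String renewer) throws Exception {
        Mockito.doReturn(t).when(hfs).getDelegationToken(renewer);
        Mockito.doReturn(fsAddr).when(hfs).getCanonicalServiceName();
      }
    }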

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=982087&r1=982086&r2=982087&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Tue Aug  3 23:46:51 2010
@@ -740,10 +740,6 @@ public class DistCp implements Tool {
     List<IOException> rslt = new ArrayList<IOException>();
     List<Path> unglobbed = new LinkedList<Path>();
     
-    // get tokens for all the required FileSystems..
-    // also set the renewer as the JobTracker for the hftp case
-    jobConf.set(HftpFileSystem.HFTP_RENEWER, 
-        jobConf.get(JobTracker.JT_USER_NAME, ""));
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
     TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
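
The removed lines are obsolete because TokenCache now reads the renewer principal from JT_USER_NAME itself (see the TokenCache.java hunk above); DistCp only needs to hand over its source paths. A minimal sketch of such a call site, with illustrative paths:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.security.TokenCache;

    public class ObtainTokensSketch {
      // Gather delegation tokens for every source filesystem up front, so
      // tasks can authenticate to the namenodes without Kerberos credentials.
      static void prefetch(JobConf jobConf) throws IOException {
        Path[] srcs = { new Path("hdfs://nn1:8020/src"),
                        new Path("hftp://nn2:50070/src") };
        TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), srcs, jobConf);
      }
    }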