You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/24 00:56:43 UTC

svn commit: r967297 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/security/TokenCache.java src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Author: ddas
Date: Fri Jul 23 22:56:43 2010
New Revision: 967297

URL: http://svn.apache.org/viewvc?rev=967297&view=rev
Log:
MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the mapping that the tasks use at runtime for looking up delegation tokens. Contributed by Boris Shkolnik.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 23 22:56:43 2010
@@ -195,6 +195,10 @@ Trunk (unreleased changes)
 
     MAPREDUCE-1925. Fix failing TestRumenJobTraces. (Ravi Gummadi via cdouglas)
 
+    MAPREDUCE-1718. Fixes a bug in the construction of jobconf key for the
+    mapping that the tasks use at runtime for looking up delegation tokens.
+    (Boris Shkolnik via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jul 23 22:56:43 2010
@@ -36,9 +36,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosName;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,81 +86,98 @@ public class TokenCache {
     
   static void obtainTokensForNamenodesInternal(Credentials credentials,
       Path[] ps, Configuration conf) throws IOException {
+    for(Path p: ps) {
+      FileSystem fs = FileSystem.get(p.toUri(), conf);
+      obtainTokensForNamenodesInternal(fs, credentials, conf);
+    }
+  }
+  
+  /**
+   * get delegation token for a specific FS
+   * @param fs
+   * @param credentials
+   * @param p
+   * @param conf
+   * @throws IOException
+   */
+  static void obtainTokensForNamenodesInternal(FileSystem fs, 
+      Credentials credentials, Configuration conf) throws IOException {
+
     // get jobtracker principal id (for the renewer)
-    KerberosName jtKrbName = new KerberosName(conf.get(JTConfig.JT_USER_NAME,
-        ""));
+    KerberosName jtKrbName = 
+      new KerberosName(conf.get(JTConfig.JT_USER_NAME,""));
     Text delegTokenRenewer = new Text(jtKrbName.getShortName());
+
     boolean notReadFile = true;
-    for(Path p: ps) {
-      // TODO: Connecting to the namenode is not required in the case,
-      // where we already have the credentials in the file
-      FileSystem fs = FileSystem.get(p.toUri(), conf);
-      if(fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        URI uri = fs.getUri();
-        String fs_addr = buildDTServiceName(uri);
-        
-        // see if we already have the token
-        Token<DelegationTokenIdentifier> token = 
-          TokenCache.getDelegationToken(credentials, fs_addr); 
-        if(token != null) {
-          LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
-        }
-        if (notReadFile) { //read the file only once
-          String binaryTokenFilename =
-            conf.get("mapreduce.job.credentials.binary");
-          if (binaryTokenFilename != null) {
-            credentials.readTokenStorageFile(new Path("file:///" +  
-                binaryTokenFilename), conf);
-          }
-          notReadFile = false;
-          token = 
-            TokenCache.getDelegationToken(credentials, fs_addr); 
-          if(token != null) {
-            LOG.debug("DT for " + token.getService()  + " is already present");
-            continue;
-          }
+
+    // TODO: Connecting to the namenode is not required in the case,
+    // where we already have the credentials in the file
+    if(fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      URI uri = dfs.getUri();
+      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+
+      // see if we already have the token
+      Token<DelegationTokenIdentifier> token = 
+        TokenCache.getDelegationToken(credentials, fs_addr); 
+      if(token != null) {
+        LOG.debug("DT for " + token.getService()  + " is already present");
+        return;
+      }
+      if (notReadFile) { //read the file only once
+        String binaryTokenFilename =
+          conf.get("mapreduce.job.credentials.binary");
+        if (binaryTokenFilename != null) {
+          credentials.readTokenStorageFile(new Path("file:///" +  
+              binaryTokenFilename), conf);
         }
-        // get the token
-        token = dfs.getDelegationToken(delegTokenRenewer);
-        if(token==null) 
-          throw new IOException("Token from " + fs_addr + " is null");
-
-        token.setService(new Text(fs_addr));
-        credentials.addToken(new Text(fs_addr), token);
-        LOG.info("Got dt for " + p + ";uri="+ fs_addr + 
-            ";t.service="+token.getService());
-      } else if (fs instanceof HftpFileSystem) {
-        String fs_addr = buildDTServiceName(fs.getUri());
-        Token<DelegationTokenIdentifier> token = 
+        notReadFile = false;
+        token = 
           TokenCache.getDelegationToken(credentials, fs_addr); 
         if(token != null) {
           LOG.debug("DT for " + token.getService()  + " is already present");
-          continue;
+          return;
         }
-        //the initialize method of hftp, called via FileSystem.get() done
-        //earlier gets a delegation token
+      }
+      // get the token
+      token = dfs.getDelegationToken(delegTokenRenewer);
+      if(token==null) 
+        throw new IOException("Token from " + fs_addr + " is null");
+
+      token.setService(new Text(fs_addr));
+      credentials.addToken(new Text(fs_addr), token);
+      LOG.info("Got dt for " + uri + ";uri="+ fs_addr + 
+          ";t.service="+token.getService());
+    } else if (fs instanceof HftpFileSystem) {
+      HftpFileSystem hfs = (HftpFileSystem)fs;
+      URI uri = hfs.getUri();
+      String fs_addr = SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+      Token<DelegationTokenIdentifier> token = 
+        TokenCache.getDelegationToken(credentials, fs_addr); 
+      if(token != null) {
+        LOG.debug("DT for " + token.getService()  + " is already present");
+        return;
+      }
+      //the initialize method of hftp, called via FileSystem.get() done
+      //earlier gets a delegation token
 
-        Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
-        if (t != null) {
-          credentials.addToken(new Text(fs_addr), t);
-
-          // in this case port in fs_addr is port for hftp request, but
-          // token's port is for RPC
-          // to find the correct DT we need to know the mapping between Hftp port 
-          // and RPC one. hence this new setting in the conf.
-          URI uri = ((HftpFileSystem) fs).getUri();
-          String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+buildDTServiceName(uri);
-          conf.set(key, t.getService().toString());
-          LOG.info("GOT dt for " + p + " and stored in conf as " + key + "=" 
-              + t.getService());
+      Token<? extends TokenIdentifier> t = hfs.getDelegationToken();
+      if (t != null) {
+        credentials.addToken(new Text(fs_addr), t);
+
+        // in this case port in fs_addr is port for hftp request, but
+        // token's port is for RPC
+        // to find the correct DT we need to know the mapping between Hftp port 
+        // and RPC one. hence this new setting in the conf.
+        String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+fs_addr;
+        conf.set(key, t.getService().toString());
+        LOG.info("GOT dt for " + uri + " and stored in conf as " + key + "=" 
+            + t.getService());
 
-        }
       }
     }
   }
-  
+
   /**
    * file name used on HDFS for generated job token
    */
@@ -225,15 +242,4 @@ public class TokenCache {
   public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
     return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
-  
-  public static String buildDTServiceName(URI uri) {
-    int port = uri.getPort();
-    if(port == -1) 
-      port = NameNode.DEFAULT_PORT;
-    
-    // build the service name string "ip:port"
-    StringBuffer sb = new StringBuffer();
-    sb.append(NetUtils.normalizeHostName(uri.getHost())).append(":").append(port);
-    return sb.toString();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=967297&r1=967296&r2=967297&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jul 23 22:56:43 2010
@@ -22,9 +22,12 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -37,8 +40,11 @@ import org.apache.commons.codec.binary.B
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
@@ -46,18 +52,20 @@ import org.apache.hadoop.mapred.MiniMRCl
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Assert;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestTokenCache {
   private static final int NUM_OF_KEYS = 10;
@@ -79,7 +87,7 @@ public class TestTokenCache {
         dts_size = dts.size();
       
       
-      if(dts.size() != 2) { // one job token and one delegation token
+      if(dts_size != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
       }
       
@@ -270,7 +278,8 @@ public class TestTokenCache {
         p2 }, jConf);
 
     // this token is keyed by hostname:port key.
-    String fs_addr = TokenCache.buildDTServiceName(p1.toUri());
+    String fs_addr = 
+      SecurityUtil.buildDTServiceName(p1.toUri(), NameNode.DEFAULT_PORT);
     Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
         credentials, fs_addr);
     System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " +  nnt);
@@ -289,4 +298,63 @@ public class TestTokenCache {
       assertTrue("didn't find token for " + p1 ,found);
     }
   }
+  
+  @Test
+  public void testGetTokensForHftpFS() throws IOException, URISyntaxException {
+    HftpFileSystem hfs = mock(HftpFileSystem.class);
+
+    DelegationTokenSecretManager dtSecretManager = 
+      dfsCluster.getNamesystem().getDelegationTokenSecretManager();
+    DelegationTokenIdentifier dtId = 
+      new DelegationTokenIdentifier(new Text("user"), new Text("renewer"), null);
+    final Token<DelegationTokenIdentifier> t = 
+      new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
+
+    final URI uri = new URI("hftp://host:2222/file1");
+    String fs_addr = 
+      SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
+    t.setService(new Text(fs_addr));
+
+    //when(hfs.getUri()).thenReturn(uri);
+    Mockito.doAnswer(new Answer<URI>(){
+      @Override
+      public URI answer(InvocationOnMock invocation)
+      throws Throwable {
+        return uri;
+      }}).when(hfs).getUri();
+
+    //when(hfs.getDelegationToken()).thenReturn((Token<? extends TokenIdentifier>) t);
+    Mockito.doAnswer(new Answer<Token<DelegationTokenIdentifier>>(){
+      @Override
+      public Token<DelegationTokenIdentifier>  answer(InvocationOnMock invocation)
+      throws Throwable {
+        return t;
+      }}).when(hfs).getDelegationToken();
+
+
+    Credentials credentials = new Credentials();
+    Path p = new Path(uri.toString());
+    System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr);
+    TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf);
+
+    Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
+    assertEquals("number of tokens is not 1", 1, tns.size());
+
+    boolean found = false;
+    for(Token<? extends TokenIdentifier> tt: tns) {
+      System.out.println("token="+tt);
+      if(tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
+          tt.getService().equals(new Text(fs_addr))) {
+        found = true;
+        assertEquals("different token", tt, t);
+      }
+      assertTrue("didn't find token for " + p, found);
+    }
+    // also check the conf value
+    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + fs_addr;
+    String confKey = jConf.get(key);
+    assertEquals("jconf key for HFTP DT is not correct", confKey,
+        t.getService().toString());
+  }
+
 }