Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/19 22:53:10 UTC

svn commit: r965629 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/security/TokenCache.java src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java src/tools/org/apache/hadoop/tools/DistCp.java

Author: ddas
Date: Mon Jul 19 20:53:10 2010
New Revision: 965629

URL: http://svn.apache.org/viewvc?rev=965629&view=rev
Log:
MAPREDUCE-1935. Makes DistCp work in a secure environment. Contributed by Boris Shkolnik.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 19 20:53:10 2010
@@ -83,6 +83,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1848. Put number of speculative, data local, rack local 
     tasks in JobTracker metrics. (Scott Chen via dhruba)
 
+    MAPREDUCE-1935. Makes DistCp work in a secure environment.
+    (Boris Shkolnik via ddas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Mon Jul 19 20:53:10 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.secu
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,18 +30,18 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
@@ -111,8 +110,34 @@ public class TokenCache {
 
         token.setService(new Text(fs_addr));
         credentials.addToken(new Text(fs_addr), token);
-        LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr + 
+        LOG.info("Got dt for " + p + ";uri="+ fs_addr + 
             ";t.service="+token.getService());
+      } else if (fs instanceof HftpFileSystem) {
+        String fs_addr = buildDTServiceName(fs.getUri());
+        Token<DelegationTokenIdentifier> token = 
+          TokenCache.getDelegationToken(credentials, fs_addr); 
+        if(token != null) {
+          LOG.debug("DT for " + token.getService()  + " is already present");
+          continue;
+        }
+        // The initialize() method of HftpFileSystem, invoked earlier via
+        // FileSystem.get(), has already fetched a delegation token.
+
+        Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
+        if (t != null) {
+          credentials.addToken(new Text(fs_addr), t);
+
+          // The port in fs_addr is the hftp (HTTP) port, but the token's
+          // service carries the RPC port. To find the correct delegation
+          // token later, record the hftp-to-RPC service mapping in the
+          // conf under a per-filesystem key.
+          URI uri = ((HftpFileSystem) fs).getUri();
+          String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + buildDTServiceName(uri);
+          conf.set(key, t.getService().toString());
+          LOG.info("Got dt for " + p + " and stored in conf as " + key + "="
+              + t.getService());
+
+        }
       }
     }
   }
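
The HftpFileSystem branch above stores the token under the hftp address,
while the token's own service field names the NameNode's RPC address. A
minimal sketch of reading that mapping back, assuming hftpUri, conf and
credentials are in scope (sketch only, not part of the patch):

    // The token is keyed by the hftp address; the conf maps that
    // address to the RPC-side service the token actually carries.
    String hftpAddr = TokenCache.buildDTServiceName(hftpUri);
    Token<? extends TokenIdentifier> dt =
        credentials.getToken(new Text(hftpAddr));
    String rpcService =
        conf.get(HftpFileSystem.HFTP_SERVICE_NAME_KEY + hftpAddr);
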
@@ -184,7 +209,7 @@ public class TokenCache {
     return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
   }
   
-  static String buildDTServiceName(URI uri) {
+  public static String buildDTServiceName(URI uri) {
     int port = uri.getPort();
     if(port == -1) 
       port = NameNode.DEFAULT_PORT;
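
Widening buildDTServiceName to public lets callers outside the package
(the hftp branch above, and DistCp below) derive the same normalized
service name. A rough usage sketch with an illustrative host; per the
method body, a URI without a port falls back to NameNode.DEFAULT_PORT:

    // Both spellings should normalize to the same service name,
    // host resolution details aside (8020 is the default RPC port).
    String a = TokenCache.buildDTServiceName(URI.create("hdfs://nn.example.com"));
    String b = TokenCache.buildDTServiceName(URI.create("hdfs://nn.example.com:8020"));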

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Mon Jul 19 20:53:10 2010
@@ -20,16 +20,13 @@ package org.apache.hadoop.mapreduce.secu
 
 import java.io.IOException;
 import java.net.URI;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -43,10 +40,12 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 
 
 @InterfaceAudience.Private
@@ -106,13 +105,10 @@ public class DelegationTokenRenewal {
   
   //managing the list of tokens using Map
   // jobId=>List<tokens>
-  private static List<DelegationTokenToRenew> delegationTokens = 
-    Collections.synchronizedList(new ArrayList<DelegationTokenToRenew>());
+  private static Set<DelegationTokenToRenew> delegationTokens = 
+    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
   //adding token
   private static void addTokenToList(DelegationTokenToRenew t) {
-    //check to see if the token already exists in the list
-    if (delegationTokens.contains(t))
-      return;
     delegationTokens.add(t);
   }
   

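Replacing the synchronized List and its explicit contains() check with a
synchronized HashSet makes de-duplication implicit and drops the O(n)
scan on every add. That only holds if DelegationTokenToRenew defines
equality on the wrapped token; the class body is not shown in this diff,
but the assumed contract looks like:

    // Assumed equals/hashCode contract (sketch): equality delegates
    // to the wrapped delegation token so the HashSet rejects repeats.
    @Override
    public boolean equals(Object obj) {
      return obj instanceof DelegationTokenToRenew
          && token.equals(((DelegationTokenToRenew) obj).token);
    }

    @Override
    public int hashCode() {
      return token.hashCode();
    }
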
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Jul 19 20:53:10 2010
@@ -34,28 +34,25 @@ import java.util.Random;
 import java.util.Stack;
 import java.util.StringTokenizer;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -65,16 +62,15 @@ import org.apache.hadoop.mapred.InputSpl
 import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -739,19 +735,22 @@ public class DistCp implements Tool {
   }
 
   /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths,
-      JobConf jobConf) throws IOException {
+  private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths) 
+  throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     List<Path> unglobbed = new LinkedList<Path>();
     
     // get tokens for all the required FileSystems..
+    // also set the renewer as the JobTracker for the hftp case
+    jobConf.set(HftpFileSystem.HFTP_RENEWER, 
+        jobConf.get(JobTracker.JT_USER_NAME, ""));
     Path[] ps = new Path[srcPaths.size()];
     ps = srcPaths.toArray(ps);
-    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
     
     
     for (Path p : srcPaths) {
-      FileSystem fs = p.getFileSystem(conf);
+      FileSystem fs = p.getFileSystem(jobConf);
       FileStatus[] inputs = fs.globStatus(p);
       
       if(inputs != null && inputs.length > 0) {
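
Delegation tokens name their renewer when they are issued, so setting
HFTP_RENEWER to the JobTracker principal before obtainTokensForNamenodes
runs lets the JobTracker's DelegationTokenRenewal thread renew the hftp
tokens later. An end-to-end sketch of the flow, with illustrative hosts
and ports:

    // Hypothetical driver, not part of the patch: fetch tokens for
    // mixed hdfs:// and hftp:// sources with the JT as the renewer.
    JobConf job = new JobConf(DistCp.class);
    job.set(HftpFileSystem.HFTP_RENEWER,
        job.get(JobTracker.JT_USER_NAME, ""));  // JT Kerberos principal
    Path[] srcs = {
        new Path("hdfs://nn1.example.com:8020/src"),
        new Path("hftp://nn2.example.com:50070/src")  // hftp HTTP port
    };
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), srcs, job);
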
@@ -782,7 +781,7 @@ public class DistCp implements Tool {
 
     JobConf job = createJobConf(conf);
     
-    checkSrcPath(conf, args.srcs, job);
+    checkSrcPath(job, args.srcs);
     if (args.preservedAttributes != null) {
       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
     }