You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/07/19 22:53:10 UTC
svn commit: r965629 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
src/tools/org/apache/hadoop/tools/DistCp.java
Author: ddas
Date: Mon Jul 19 20:53:10 2010
New Revision: 965629
URL: http://svn.apache.org/viewvc?rev=965629&view=rev
Log:
MAPREDUCE-1935. Makes DistCp work in a secure environment. Contributed by Boris Shkolnik.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 19 20:53:10 2010
@@ -83,6 +83,9 @@ Trunk (unreleased changes)
MAPREDUCE-1848. Put number of speculative, data local, rack local
tasks in JobTracker metrics. (Scott Chen via dhruba)
+ MAPREDUCE-1935. Makes the Distcp to work in a secure environment.
+ (Boris Shkolnik via ddas)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Mon Jul 19 20:53:10 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.secu
import java.io.IOException;
import java.net.URI;
-import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,18 +30,18 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -111,8 +110,34 @@ public class TokenCache {
token.setService(new Text(fs_addr));
credentials.addToken(new Text(fs_addr), token);
- LOG.info("getting dt for " + p.toString() + ";uri="+ fs_addr +
+ LOG.info("Got dt for " + p + ";uri="+ fs_addr +
";t.service="+token.getService());
+ } else if (fs instanceof HftpFileSystem) {
+ String fs_addr = buildDTServiceName(fs.getUri());
+ Token<DelegationTokenIdentifier> token =
+ TokenCache.getDelegationToken(credentials, fs_addr);
+ if(token != null) {
+ LOG.debug("DT for " + token.getService() + " is already present");
+ continue;
+ }
+ //the initialize method of hftp, called via FileSystem.get() done
+ //earlier gets a delegation token
+
+ Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
+ if (t != null) {
+ credentials.addToken(new Text(fs_addr), t);
+
+ // in this case port in fs_addr is port for hftp request, but
+ // token's port is for RPC
+ // to find the correct DT we need to know the mapping between Hftp port
+ // and RPC one. hence this new setting in the conf.
+ URI uri = ((HftpFileSystem) fs).getUri();
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+buildDTServiceName(uri);
+ conf.set(key, t.getService().toString());
+ LOG.info("GOT dt for " + p + " and stored in conf as " + key + "="
+ + t.getService());
+
+ }
}
}
}
@@ -184,7 +209,7 @@ public class TokenCache {
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
- static String buildDTServiceName(URI uri) {
+ public static String buildDTServiceName(URI uri) {
int port = uri.getPort();
if(port == -1)
port = NameNode.DEFAULT_PORT;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Mon Jul 19 20:53:10 2010
@@ -20,16 +20,13 @@ package org.apache.hadoop.mapreduce.secu
import java.io.IOException;
import java.net.URI;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -43,10 +40,12 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@InterfaceAudience.Private
@@ -106,13 +105,10 @@ public class DelegationTokenRenewal {
//managing the list of tokens using Map
// jobId=>List<tokens>
- private static List<DelegationTokenToRenew> delegationTokens =
- Collections.synchronizedList(new ArrayList<DelegationTokenToRenew>());
+ private static Set<DelegationTokenToRenew> delegationTokens =
+ Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
//adding token
private static void addTokenToList(DelegationTokenToRenew t) {
- //check to see if the token already exists in the list
- if (delegationTokens.contains(t))
- return;
delegationTokens.add(t);
}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=965629&r1=965628&r2=965629&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Jul 19 20:53:10 2010
@@ -34,28 +34,25 @@ import java.util.Random;
import java.util.Stack;
import java.util.StringTokenizer;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -65,16 +62,15 @@ import org.apache.hadoop.mapred.InputSpl
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -739,19 +735,22 @@ public class DistCp implements Tool {
}
/** Sanity check for srcPath */
- private static void checkSrcPath(Configuration conf, List<Path> srcPaths,
- JobConf jobConf) throws IOException {
+ private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
+ throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
List<Path> unglobbed = new LinkedList<Path>();
// get tokens for all the required FileSystems..
+ // also set the renewer as the JobTracker for the hftp case
+ jobConf.set(HftpFileSystem.HFTP_RENEWER,
+ jobConf.get(JobTracker.JT_USER_NAME, ""));
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
- TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, conf);
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
for (Path p : srcPaths) {
- FileSystem fs = p.getFileSystem(conf);
+ FileSystem fs = p.getFileSystem(jobConf);
FileStatus[] inputs = fs.globStatus(p);
if(inputs != null && inputs.length > 0) {
@@ -782,7 +781,7 @@ public class DistCp implements Tool {
JobConf job = createJobConf(conf);
- checkSrcPath(conf, args.srcs, job);
+ checkSrcPath(job, args.srcs);
if (args.preservedAttributes != null) {
job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
}