Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:19:59 UTC
svn commit: r1077487 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/fs/ core/org/apache/hadoop/security/
hdfs/org/apache/hadoop/hdfs/ hdfs/org/apache/hadoop/hdfs/server/namenode/
hdfs/org/apache/hadoop/hdfs/tools/...
Author: omalley
Date: Fri Mar 4 04:19:59 2011
New Revision: 1077487
URL: http://svn.apache.org/viewvc?rev=1077487&view=rev
Log:
commit 767074ea6d22994f05496f4f58d49fd33799c7f1
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Thu Jun 3 14:45:29 2010 -0700
HDFS-1007 from https://issues.apache.org/jira/secure/attachment/12446280/hdfs-1007-long-running-hftp-client.patch
+++ b/YAHOO-CHANGES.txt
+ HDFS-1007. Makes long-running servers that use hftp work (their
+ delegation tokens are now fetched and renewed automatically). Also
+ refactors delegation-token handling in the MR code.
+ (oom & ddas)
+
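For context, a minimal sketch (not part of this patch) of the generic token API the change introduces; the host, port, and renewer below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.token.Token;

public class DelegationTokenDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // any FileSystem (hdfs://, hftp://, ...) can now be asked for a token
    FileSystem fs =
        new Path("hftp://namenode.example.com:50070/").getFileSystem(conf);
    // renewer = the principal allowed to renew; the base class returns
    // null for file systems that have no delegation tokens
    Token<?> token = fs.getDelegationToken("mapred");
    if (token != null) {
      // the service field holds the canonical name, e.g. "10.0.0.1:8020"
      System.out.println("got token for service " + token.getService());
    }
  }
}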
Added:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DelegationTokenServlet.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FSDataInputStream.java Fri Mar 4 04:19:59 2011
@@ -22,7 +22,7 @@ import java.io.*;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
public class FSDataInputStream extends DataInputStream
- implements Seekable, PositionedReadable {
+ implements Seekable, PositionedReadable, Closeable {
public FSDataInputStream(InputStream in)
throws IOException {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/FileSystem.java Fri Mar 4 04:19:59 2011
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.security.PrivilegedExceptionAction;
@@ -38,11 +37,12 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
@@ -145,6 +145,22 @@ public abstract class FileSystem extends
/** Returns a URI whose scheme and authority identify this FileSystem.*/
public abstract URI getUri();
+ /**
+ * Get the default port for this file system.
+ * @return the default port or 0 if there isn't one
+ */
+ protected int getDefaultPort() {
+ return 0;
+ }
+
+ /**
+ * Get a canonical name for this file system.
+ * @return a URI string that uniquely identifies this file system
+ */
+ public String getCanonicalServiceName() {
+ return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
+ }
+
/** @deprecated call #getUri() instead.*/
public String getName() { return getUri().toString(); }
@@ -1112,6 +1128,15 @@ public abstract class FileSystem extends
.makeQualified(this);
}
+ /**
+ * Get a new delegation token for this file system.
+ * @param renewer the account name that is allowed to renew the token.
+ * @return a new delegation token
+ * @throws IOException
+ */
+ public Token<?> getDelegationToken(String renewer) throws IOException {
+ return null;
+ }
/**
* Set the current working directory for the given file system. All relative
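A hedged illustration of what the new getCanonicalServiceName() computes (localhost so the lookup actually resolves; 8020 stands in for NameNode.DEFAULT_PORT):

import java.net.URI;
import org.apache.hadoop.security.SecurityUtil;

public class ServiceNameDemo {
  public static void main(String[] args) {
    // a URI with no explicit port falls back to the default port,
    // which is what the new getDefaultPort() hook supplies
    System.out.println(
        SecurityUtil.buildDTServiceName(URI.create("hdfs://localhost"), 8020));
    // prints something like 127.0.0.1:8020
  }
}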
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/Credentials.java Fri Mar 4 04:19:59 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -119,15 +120,19 @@ public class Credentials implements Writ
* @param conf
* @throws IOException
*/
- public void readTokenStorageFile(Path filename,
- Configuration conf) throws IOException {
- FSDataInputStream in = filename.getFileSystem(conf).open(filename);
+ public static Credentials readTokenStorageFile(Path filename,
+ Configuration conf
+ ) throws IOException {
+ FSDataInputStream in = null;
+ Credentials credentials = new Credentials();
try {
- readTokenStorageStream(in);
+ in = filename.getFileSystem(conf).open(filename);
+ credentials.readTokenStorageStream(in);
+ in.close();
+ return credentials;
} catch(IOException ioe) {
+ IOUtils.cleanup(LOG, in);
throw new IOException("Exception reading " + filename, ioe);
- } finally {
- in.close();
}
}
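Call sites migrate from the removed instance method to the new static factory; a sketch with a placeholder path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;

public class ReadTokensDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // before: new Credentials().readTokenStorageFile(path, conf) mutated
    // an instance; now the factory builds and returns the credentials
    Credentials creds = Credentials.readTokenStorageFile(
        new Path("file:///tmp/tokens.bin"), conf);
    System.out.println(creds.numberOfTokens() + " tokens loaded");
  }
}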
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar 4 04:19:59 2011
@@ -460,8 +460,8 @@ public class UserGroupInformation {
if (fileLocation != null && isSecurityEnabled()) {
// load the token storage file and put all of the tokens into the
// user.
- Credentials cred = new Credentials();
- cred.readTokenStorageFile(new Path("file:///" + fileLocation), conf);
+ Credentials cred = Credentials.readTokenStorageFile(
+ new Path("file:///" + fileLocation), conf);
for (Token<?> token: cred.getAllTokens()) {
loginUser.addToken(token);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 04:19:59 2011
@@ -265,14 +265,34 @@ public class DFSClient implements FSCons
}
}
+ /** A test method for printing out tokens */
+ public static String stringifyToken(Token<DelegationTokenIdentifier> token
+ ) throws IOException {
+ DelegationTokenIdentifier ident = new DelegationTokenIdentifier();
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ ident.readFields(in);
+ String str = ident.getKind() + " token " + ident.getSequenceNumber() +
+ " for " + ident.getUser().getShortUserName();
+ if (token.getService().getLength() > 0) {
+ return (str + " on " + token.getService());
+ } else {
+ return str;
+ }
+ }
+
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
- return namenode.getDelegationToken(renewer);
+ Token<DelegationTokenIdentifier> result =
+ namenode.getDelegationToken(renewer);
+ LOG.info("Created " + stringifyToken(result));
+ return result;
}
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
try {
+ LOG.info("Renewing " + stringifyToken(token));
return namenode.renewDelegationToken(token);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
@@ -283,6 +303,7 @@ public class DFSClient implements FSCons
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
try {
+ LOG.info("Cancelling " + stringifyToken(token));
namenode.cancelDelegationToken(token);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
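With the new logging, token operations show up in the client log roughly as follows (values invented, format per stringifyToken above):

Created HDFS_DELEGATION_TOKEN token 7 for alice on 10.0.0.1:8020
Renewing HDFS_DELEGATION_TOKEN token 7 for alice on 10.0.0.1:8020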
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Mar 4 04:19:59 2011
@@ -537,7 +537,22 @@ public class DistributedFileSystem exten
) throws IOException {
dfs.setTimes(getPathName(p), mtime, atime);
}
-
+
+ @Override
+ protected int getDefaultPort() {
+ return NameNode.DEFAULT_PORT;
+ }
+
+ @Override
+ public
+ Token<DelegationTokenIdentifier> getDelegationToken(String renewer
+ ) throws IOException {
+ Token<DelegationTokenIdentifier> result =
+ dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
+ result.setService(new Text(getCanonicalServiceName()));
+ return result;
+ }
+
/**
* Delegation Token Operations
* These are DFS only operations.
@@ -549,7 +564,9 @@ public class DistributedFileSystem exten
* @param renewer Name of the designated renewer for the token
* @return Token<DelegationTokenIdentifier>
* @throws IOException
+ * @deprecated use {@link #getDelegationToken(String)}
*/
+ @Deprecated
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
return dfs.getDelegationToken(renewer);
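The caller-side migration is mechanical; a hedged sketch (renewer name illustrative):

import java.io.IOException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;

public class DfsTokenMigration {
  static Token<DelegationTokenIdentifier> fetch(DistributedFileSystem dfs)
      throws IOException {
    // old, now @Deprecated: the caller had to fill in the service field
    // itself (compare the TokenCache hunk further down)
    Token<DelegationTokenIdentifier> old =
        dfs.getDelegationToken(new Text("mapred"));
    // new: the service field comes back pre-set to the canonical name
    return dfs.getDelegationToken("mapred");
  }
}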
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Mar 4 04:19:59 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.ref.WeakReference;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -33,6 +34,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.TimeZone;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -45,6 +49,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -75,13 +80,16 @@ public class HftpFileSystem extends File
HttpURLConnection.setFollowRedirects(true);
}
+ private static final int DEFAULT_PORT = 50470;
+
protected InetSocketAddress nnAddr;
protected UserGroupInformation ugi;
+ private String nnHttpUrl;
+ private URI hdfsURI;
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
- private Token<? extends TokenIdentifier> delegationToken;
- public static final String HFTP_RENEWER = "fs.hftp.renewer";
+ private Token<DelegationTokenIdentifier> delegationToken;
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
public static final SimpleDateFormat getDateFormat() {
@@ -98,6 +106,17 @@ public class HftpFileSystem extends File
};
@Override
+ protected int getDefaultPort() {
+ return DEFAULT_PORT;
+ }
+
+ @Override
+ public String getCanonicalServiceName() {
+ return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
super.initialize(name, conf);
@@ -105,66 +124,82 @@ public class HftpFileSystem extends File
this.ugi = UserGroupInformation.getCurrentUser();
nnAddr = NetUtils.createSocketAddr(name.toString());
+ StringBuilder sb = new StringBuilder("https://");
+ sb.append(NetUtils.normalizeHostName(name.getHost()));
+ sb.append(":");
+ sb.append(conf.getInt("dfs.https.port", DEFAULT_PORT));
+ nnHttpUrl = sb.toString();
+
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
+ SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
+ LOG.debug("Trying to find DT for " + name + " using key=" + key +
+ "; conf=" + conf.get(key, ""));
+ String nnServiceName = conf.get(key);
+ int nnPort = NameNode.DEFAULT_PORT;
+ if (nnServiceName != null) {
+ nnPort = NetUtils.createSocketAddr(nnServiceName,
+ NameNode.DEFAULT_PORT).getPort();
+ }
+
+ sb = new StringBuilder("hdfs://");
+ sb.append(nnAddr.getHostName());
+ sb.append(":");
+ sb.append(nnPort);
+ try {
+ hdfsURI = new URI(sb.toString());
+ } catch (URISyntaxException ue) {
+ throw new IOException("bad uri for hdfs", ue);
+ }
if (UserGroupInformation.isSecurityEnabled()) {
- // configuration has the actual service name for this url. Build the key
- // and get it.
- final String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
- SecurityUtil.buildDTServiceName(name, NameNode.DEFAULT_PORT);
-
- LOG.debug("Trying to find DT for " + name + " using key=" + key +
- "; conf=" + conf.get(key, ""));
- Text nnServiceNameText = new Text(conf.get(key, ""));
- Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
//try finding a token for this namenode (esp applicable for tasks
//using hftp). If there exists one, just set the delegationField
- for (Token<? extends TokenIdentifier> t : tokens) {
- if ((t.getService()).equals(nnServiceNameText)) {
+ String canonicalName = getCanonicalServiceName();
+ for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
+ if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
+ t.getService().toString().equals(canonicalName)) {
LOG.debug("Found existing DT for " + name);
- delegationToken = t;
- return;
+ delegationToken = (Token<DelegationTokenIdentifier>) t;
+ break;
}
}
//since we don't already have a token, go get one over https
- try {
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
- public Object run() throws IOException {
- StringBuffer sb = new StringBuffer();
- //try https (on http we NEVER get a delegation token)
- String nnHttpUrl = "https://" +
- (sb.append(NetUtils.normalizeHostName(name.getHost()))
- .append(":").append(conf.getInt("dfs.https.port", 50470))).
- toString();
- Credentials c;
- try {
- c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl,
- conf.get(HFTP_RENEWER));
- } catch (Exception e) {
- LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
- " using https.");
- LOG.debug("error was ", e);
- //Maybe the server is in unsecure mode (that's bad but okay)
- return null;
- }
- for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
- //the service field is already set and so setService
- //is not required
- delegationToken = t;
- LOG.debug("Got dt for " + getUri() + ";t.service="
- +t.getService());
- }
- return null;
- }
- });
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ if (delegationToken == null) {
+ delegationToken =
+ (Token<DelegationTokenIdentifier>) getDelegationToken(null);
+ renewer.addTokenToRenew(this);
}
}
}
- public Token<? extends TokenIdentifier> getDelegationToken() {
- return delegationToken;
+ @Override
+ public Token<?> getDelegationToken(final String renewer) throws IOException {
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+ public Token<?> run() throws IOException {
+ Credentials c;
+ try {
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+ } catch (Exception e) {
+ LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
+ " using https.");
+ LOG.debug("error was ", e);
+ //Maybe the server is in unsecure mode (that's bad but okay)
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
+ LOG.debug("Got dt for " + getUri() + ";t.service="
+ +t.getService());
+ t.setService(new Text(getCanonicalServiceName()));
+ return t;
+ }
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -214,10 +249,12 @@ public class HftpFileSystem extends File
protected String updateQuery(String query) throws IOException {
String tokenString = null;
if (UserGroupInformation.isSecurityEnabled()) {
- if (delegationToken != null) {
- tokenString = delegationToken.encodeToUrlString();
- return (query + JspHelper.SET_DELEGATION + tokenString);
- } // else we are talking to an unsecure cluster
+ synchronized (this) {
+ if (delegationToken != null) {
+ tokenString = delegationToken.encodeToUrlString();
+ return (query + JspHelper.SET_DELEGATION + tokenString);
+ } // else we are talking to an insecure cluster
+ }
}
return query;
}
@@ -537,4 +574,145 @@ public class HftpFileSystem extends File
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
return cs != null? cs: super.getContentSummary(f);
}
+
+ /**
+ * An action that will renew and replace the hftp file system's delegation
+ * tokens automatically.
+ */
+ private static class RenewAction implements Delayed {
+ // when should the renew happen
+ private long timestamp;
+ // a weak reference to the file system so that it can be garbage collected
+ private final WeakReference<HftpFileSystem> weakFs;
+
+ RenewAction(long timestamp, HftpFileSystem fs) {
+ this.timestamp = timestamp;
+ this.weakFs = new WeakReference<HftpFileSystem>(fs);
+ }
+
+ /**
+ * Get the delay until this event should happen.
+ */
+ @Override
+ public long getDelay(TimeUnit unit) {
+ long millisLeft = timestamp - System.currentTimeMillis();
+ return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Compare two events in the same queue.
+ */
+ @Override
+ public int compareTo(Delayed o) {
+ if (o.getClass() != RenewAction.class) {
+ throw new IllegalArgumentException("Illegal comparison to non-RenewAction");
+ }
+ RenewAction other = (RenewAction) o;
+ return timestamp < other.timestamp ? -1 :
+ (timestamp == other.timestamp ? 0 : 1);
+ }
+
+ /**
+ * Set a new time for the renewal. Can only be called when the action
+ * is not in the queue.
+ * @param newTime the new time
+ */
+ public void setNewTime(long newTime) {
+ timestamp = newTime;
+ }
+
+ /**
+ * Renew or replace the delegation token for this file system.
+ * @return
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public boolean renew() throws IOException, InterruptedException {
+ final HftpFileSystem fs = weakFs.get();
+ if (fs != null) {
+ synchronized (fs) {
+ fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
+
+ @Override
+ public Void run() throws Exception {
+ try {
+ DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
+ fs.delegationToken);
+ } catch (IOException ie) {
+ try {
+ fs.delegationToken =
+ (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
+ } catch (IOException ie2) {
+ throw new IOException("Can't renew or get new delegation token ",
+ ie);
+ }
+ }
+ return null;
+ }
+ });
+ }
+ }
+ return fs != null;
+ }
+
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ HftpFileSystem fs = weakFs.get();
+ if (fs == null) {
+ return "evaporated token renew";
+ }
+ synchronized (fs) {
+ result.append(fs.delegationToken);
+ }
+ result.append(" renew in ");
+ result.append(getDelay(TimeUnit.SECONDS));
+ result.append(" secs");
+ return result.toString();
+ }
+ }
+
+ /**
+ * A daemon thread that waits for the next file system to renew.
+ */
+ private static class RenewerThread extends Thread {
+ private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
+ // wait for 95% of a day between renewals
+ private final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
+
+ public RenewerThread() {
+ super("HFTP Delegation Token Renewer");
+ setDaemon(true);
+ }
+
+ public void addTokenToRenew(HftpFileSystem fs) {
+ queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
+ }
+
+ public void run() {
+ RenewAction action = null;
+ while (true) {
+ try {
+ action = queue.take();
+ if (action.renew()) {
+ action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
+ queue.add(action);
+ }
+ action = null;
+ } catch (InterruptedException ie) {
+ return;
+ } catch (Exception ie) {
+ if (action != null) {
+ LOG.warn("Failure to renew token " + action, ie);
+ } else {
+ LOG.warn("Failure in renew queue", ie);
+ }
+ }
+ }
+ }
+ }
+
+ private static RenewerThread renewer = new RenewerThread();
+ static {
+ renewer.start();
+ }
}
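The renewer is a textbook DelayQueue consumer; a stripped-down, self-contained sketch of the same pattern with a short interval:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
  static class Renewal implements Delayed {
    long timestamp;
    Renewal(long ts) { timestamp = ts; }
    public long getDelay(TimeUnit unit) {
      return unit.convert(timestamp - System.currentTimeMillis(),
                          TimeUnit.MILLISECONDS);
    }
    public int compareTo(Delayed o) {
      long other = ((Renewal) o).timestamp;
      return timestamp < other ? -1 : (timestamp == other ? 0 : 1);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<Renewal> queue = new DelayQueue<Renewal>();
    queue.add(new Renewal(System.currentTimeMillis() + 1000));
    // take() blocks until the head's delay expires -- this is what lets
    // the single daemon thread sleep between renewals
    Renewal r = queue.take();
    System.out.println("renew fired; re-queue with a new timestamp");
    r.timestamp = System.currentTimeMillis() + 1000;
    queue.add(r);
  }
}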
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/CancelDelegationTokenServlet.java Fri Mar 4 04:19:59 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Cancel delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class CancelDelegationTokenServlet extends DfsServlet {
+ private static final Log LOG = LogFactory.getLog(CancelDelegationTokenServlet.class);
+ public static final String PATH_SPEC = "/cancelDelegationToken";
+ public static final String TOKEN = "token";
+
+ @Override
+ protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+ throws ServletException, IOException {
+ final UserGroupInformation ugi;
+ final ServletContext context = getServletContext();
+ final Configuration conf =
+ (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ try {
+ ugi = getUGI(req, conf);
+ } catch(IOException ioe) {
+ LOG.info("Request for token received with no authentication from "
+ + req.getRemoteAddr(), ioe);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "Unable to identify or authenticate user");
+ return;
+ }
+ final NameNode nn = (NameNode) context.getAttribute("name.node");
+ String tokenString = req.getParameter(TOKEN);
+ if (tokenString == null) {
+ resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
+ "Token to renew not specified");
+ }
+ final Token<DelegationTokenIdentifier> token =
+ new Token<DelegationTokenIdentifier>();
+ token.decodeFromUrlString(tokenString);
+
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ nn.cancelDelegationToken(token);
+ return null;
+ }
+ });
+ } catch(Exception e) {
+ LOG.info("Exception while cancelling token. Re-throwing. ", e);
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ e.getMessage());
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/GetDelegationTokenServlet.java Fri Mar 4 04:19:59 2011
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Serve delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class GetDelegationTokenServlet extends DfsServlet {
+ private static final Log LOG = LogFactory.getLog(GetDelegationTokenServlet.class);
+ public static final String PATH_SPEC = "/getDelegationToken";
+ public static final String RENEWER = "renewer";
+
+ @Override
+ protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+ throws ServletException, IOException {
+ final UserGroupInformation ugi;
+ final ServletContext context = getServletContext();
+ final Configuration conf =
+ (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ try {
+ ugi = getUGI(req, conf);
+ } catch(IOException ioe) {
+ LOG.info("Request for token received with no authentication from "
+ + req.getRemoteAddr(), ioe);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "Unable to identify or authenticate user");
+ return;
+ }
+ LOG.info("Sending token: {" + ugi.getUserName() + "," + req.getRemoteAddr() +"}");
+ final NameNode nn = (NameNode) context.getAttribute("name.node");
+ String renewer = req.getParameter(RENEWER);
+ final String renewerFinal = (renewer == null) ?
+ req.getUserPrincipal().getName() : renewer;
+
+ DataOutputStream dos = null;
+ try {
+ dos = new DataOutputStream(resp.getOutputStream());
+ final DataOutputStream dosFinal = dos; // for doAs block
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+
+ Token<DelegationTokenIdentifier> token =
+ nn.getDelegationToken(new Text(renewerFinal));
+ String s = NameNode.getAddress(conf).getAddress().getHostAddress()
+ + ":" + NameNode.getAddress(conf).getPort();
+ token.setService(new Text(s));
+ Credentials ts = new Credentials();
+ ts.addToken(new Text(ugi.getShortUserName()), token);
+ ts.write(dosFinal);
+ dosFinal.close();
+ return null;
+ }
+ });
+
+ } catch(Exception e) {
+ LOG.info("Exception while sending token. Re-throwing. ", e);
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ } finally {
+ if(dos != null) dos.close();
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar 4 04:19:59 2011
@@ -291,7 +291,15 @@ public class NameNode implements ClientP
httpServer.setAttribute("name.system.image", getFSImage());
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
httpServer.addInternalServlet("getDelegationToken",
- DelegationTokenServlet.PATH_SPEC, DelegationTokenServlet.class, true);
+ GetDelegationTokenServlet.PATH_SPEC,
+ GetDelegationTokenServlet.class, true);
+ httpServer.addInternalServlet("renewDelegationToken",
+ RenewDelegationTokenServlet.PATH_SPEC,
+ RenewDelegationTokenServlet.class, true);
+ httpServer.addInternalServlet("cancelDelegationToken",
+ CancelDelegationTokenServlet.PATH_SPEC,
+ CancelDelegationTokenServlet.class,
+ true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
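With these three servlets registered, the NameNode's HTTPS port answers requests shaped like the following (host and port illustrative; tokens go through Token.encodeToUrlString()):

https://namenode:50470/getDelegationToken?renewer=mapred
https://namenode:50470/renewDelegationToken?token=<encoded-token>
https://namenode:50470/cancelDelegationToken?token=<encoded-token>

The first writes a Credentials blob to the response, the second prints the new expiration time as a decimal long, and the third returns an empty 200 on success.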
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java?rev=1077487&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/RenewDelegationTokenServlet.java Fri Mar 4 04:19:59 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.PrivilegedExceptionAction;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Renew delegation tokens over http for use in hftp.
+ */
+@SuppressWarnings("serial")
+public class RenewDelegationTokenServlet extends DfsServlet {
+ private static final Log LOG = LogFactory.getLog(RenewDelegationTokenServlet.class);
+ public static final String PATH_SPEC = "/renewDelegationToken";
+ public static final String TOKEN = "token";
+
+ @Override
+ protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
+ throws ServletException, IOException {
+ final UserGroupInformation ugi;
+ final ServletContext context = getServletContext();
+ final Configuration conf =
+ (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+ try {
+ ugi = getUGI(req, conf);
+ } catch(IOException ioe) {
+ LOG.info("Request for token received with no authentication from "
+ + req.getRemoteAddr(), ioe);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN,
+ "Unable to identify or authenticate user");
+ return;
+ }
+ final NameNode nn = (NameNode) context.getAttribute("name.node");
+ String tokenString = req.getParameter(TOKEN);
+ if (tokenString == null) {
+ resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
+ "Token to renew not specified");
+ }
+ final Token<DelegationTokenIdentifier> token =
+ new Token<DelegationTokenIdentifier>();
+ token.decodeFromUrlString(tokenString);
+
+ try {
+ long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
+ public Long run() throws Exception {
+ return nn.renewDelegationToken(token);
+ }
+ });
+ PrintStream os = new PrintStream(resp.getOutputStream());
+ os.println(result);
+ os.close();
+ } catch(Exception e) {
+ LOG.info("Exception while renewing token. Re-throwing. ", e);
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ e.getMessage());
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Fri Mar 4 04:19:59 2011
@@ -17,22 +17,30 @@
*/
package org.apache.hadoop.hdfs.tools;
+import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.DelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
+import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
@@ -54,6 +62,13 @@ public class DelegationTokenFetcher {
private final UserGroupInformation ugi;
private final DataOutputStream out;
private final Configuration conf;
+ private static final Log LOG =
+ LogFactory.getLog(DelegationTokenFetcher.class);
+
+ static {
+ // Enable Kerberos sockets
+ System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
+ }
/**
* Command-line interface
@@ -136,36 +151,99 @@ public class DelegationTokenFetcher {
* Utility method to obtain a delegation token over http
* @param nnHttpAddr Namenode http addr, such as http://namenode:50070
*/
- static public Credentials getDTfromRemote(String nnAddr, String renewer)
- throws IOException {
- // Enable Kerberos sockets
- System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
- DataInputStream dis = null;
-
- try {
- StringBuffer url = new StringBuffer();
- if (renewer != null) {
- url.append(nnAddr).append(DelegationTokenServlet.PATH_SPEC).append("?").
- append(DelegationTokenServlet.RENEWER).append("=").append(renewer);
- } else {
- url.append(nnAddr).append(DelegationTokenServlet.PATH_SPEC);
- }
- System.out.println("Retrieving token from: " + url);
- URL remoteURL = new URL(url.toString());
- SecurityUtil.fetchServiceTicket(remoteURL);
- URLConnection connection = remoteURL.openConnection();
-
- InputStream in = connection.getInputStream();
- Credentials ts = new Credentials();
- dis = new DataInputStream(in);
- ts.readFields(dis);
- return ts;
- } catch (Exception e) {
- throw new IOException("Unable to obtain remote token", e);
- } finally {
- if(dis != null) dis.close();
- }
- }
+ static public Credentials getDTfromRemote(String nnAddr,
+ String renewer) throws IOException {
+ DataInputStream dis = null;
+
+ try {
+ StringBuffer url = new StringBuffer();
+ if (renewer != null) {
+ url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC).append("?").
+ append(GetDelegationTokenServlet.RENEWER).append("=").append(renewer);
+ } else {
+ url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
+ }
+ System.out.println("Retrieving token from: " + url);
+ URL remoteURL = new URL(url.toString());
+ SecurityUtil.fetchServiceTicket(remoteURL);
+ URLConnection connection = remoteURL.openConnection();
+
+ InputStream in = connection.getInputStream();
+ Credentials ts = new Credentials();
+ dis = new DataInputStream(in);
+ ts.readFields(dis);
+ return ts;
+ } catch (Exception e) {
+ throw new IOException("Unable to obtain remote token", e);
+ } finally {
+ if(dis != null) dis.close();
+ }
+ }
+
+ /**
+ * Renew a Delegation Token.
+ * @param nnAddr the NameNode's address
+ * @param tok the token to renew
+ * @return the Date that the token will expire next.
+ * @throws IOException
+ */
+ static public long renewDelegationToken(String nnAddr,
+ Token<DelegationTokenIdentifier> tok
+ ) throws IOException {
+ StringBuilder buf = new StringBuilder();
+ buf.append(nnAddr);
+ buf.append(RenewDelegationTokenServlet.PATH_SPEC);
+ buf.append("?");
+ buf.append(RenewDelegationTokenServlet.TOKEN);
+ buf.append("=");
+ buf.append(tok.encodeToUrlString());
+ BufferedReader in = null;
+ try {
+ URL url = new URL(buf.toString());
+ SecurityUtil.fetchServiceTicket(url);
+ URLConnection connection = url.openConnection();
+ in = new BufferedReader(new InputStreamReader
+ (connection.getInputStream()));
+ long result = Long.parseLong(in.readLine());
+ in.close();
+ return result;
+ } catch (IOException ie) {
+ IOUtils.cleanup(LOG, in);
+ throw ie;
+ }
+ }
+
+ /**
+ * Cancel a Delegation Token.
+ * @param nnAddr the NameNode's address
+ * @param tok the token to cancel
+ * @throws IOException
+ */
+ static public void cancelDelegationToken(String nnAddr,
+ Token<DelegationTokenIdentifier> tok
+ ) throws IOException {
+ StringBuilder buf = new StringBuilder();
+ buf.append(nnAddr);
+ buf.append(CancelDelegationTokenServlet.PATH_SPEC);
+ buf.append("?");
+ buf.append(CancelDelegationTokenServlet.TOKEN);
+ buf.append("=");
+ buf.append(tok.encodeToUrlString());
+ BufferedReader in = null;
+ try {
+ URL url = new URL(buf.toString());
+ SecurityUtil.fetchServiceTicket(url);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new IOException("Error cancelling token:" +
+ connection.getResponseMessage());
+ }
+ } catch (IOException ie) {
+ IOUtils.cleanup(LOG, in);
+ throw ie;
+ }
+ }
+
/**
* Utility method to obtain a delegation token over http
* @param nnHttpAddr Namenode http addr, such as http://namenode:50070
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar 4 04:19:59 2011
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@@ -761,13 +762,7 @@ public class JobClient extends Configure
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
JobStatus status = null;
try {
- // load the binary file, if the user has one
- String binaryTokenFilename =
- jobCopy.get("mapreduce.job.credentials.binary");
- if (binaryTokenFilename != null) {
- jobCopy.getCredentials().readTokenStorageFile
- (new Path("file:///" + binaryTokenFilename), jobCopy);
- }
+ populateTokenCache(jobCopy, jobCopy.getCredentials());
copyAndConfigureFiles(jobCopy, submitJobDir);
@@ -819,7 +814,7 @@ public class JobClient extends Configure
//
// Now, actually submit the job (using the submit name)
//
- populateTokenCache(jobCopy, jobCopy.getCredentials());
+ printTokens(jobId, jobCopy.getCredentials());
status = jobSubmitClient.submitJob(
jobId, submitJobDir.toString(), jobCopy.getCredentials());
if (status != null) {
@@ -839,6 +834,20 @@ public class JobClient extends Configure
}
@SuppressWarnings("unchecked")
+ private void printTokens(JobID jobId,
+ Credentials credentials) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Printing tokens for job: " + jobId);
+ for(Token<?> token: credentials.getAllTokens()) {
+ if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
+ LOG.debug("Submitting with " +
+ DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) token));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
@@ -1894,11 +1903,19 @@ public class JobClient extends Configure
System.exit(res);
}
- //get secret keys and tokens and store them into TokenCache
@SuppressWarnings("unchecked")
- private void populateTokenCache(Configuration conf, Credentials credentials)
- throws IOException{
- // create TokenStorage object with user secretKeys
+ private void readTokensFromFiles(Configuration conf, Credentials credentials
+ ) throws IOException {
+ // add tokens and secrets coming from a token storage file
+ String binaryTokenFilename =
+ conf.get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ Credentials binary =
+ Credentials.readTokenStorageFile(new Path("file:///" +
+ binaryTokenFilename), conf);
+ credentials.addAll(binary);
+ }
+ // add secret keys coming from a json file
String tokensFileName = conf.get("mapreduce.job.credentials.json");
if(tokensFileName != null) {
LOG.info("loading user's secret keys from " + tokensFileName);
@@ -1923,11 +1940,18 @@ public class JobClient extends Configure
if(json_error)
LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
}
-
+ }
+
+ //get secret keys and tokens and store them into TokenCache
+ @SuppressWarnings("unchecked")
+ private void populateTokenCache(Configuration conf, Credentials credentials)
+ throws IOException{
+ readTokensFromFiles(conf, credentials);
// add the delegation tokens from configuration
String [] nameNodes = conf.getStrings(JobContext.JOB_NAMENODES);
- LOG.debug("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
+ LOG.debug("adding the following namenodes' delegation tokens:" +
+ Arrays.toString(nameNodes));
if(nameNodes != null) {
Path [] ps = new Path[nameNodes.length];
for(int i=0; i< nameNodes.length; i++) {
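The two job-level credential files consulted by readTokensFromFiles are driven by configuration; a sketch with placeholder paths:

import org.apache.hadoop.mapred.JobConf;

public class CredentialFilesDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // binary token file, loaded via Credentials.readTokenStorageFile
    conf.set("mapreduce.job.credentials.binary", "/tmp/jobtokens.bin");
    // JSON file of user secret keys
    conf.set("mapreduce.job.credentials.json", "/tmp/secrets.json");
  }
}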
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar 4 04:19:59 2011
@@ -80,80 +80,45 @@ public class TokenCache {
}
static void obtainTokensForNamenodesInternal(Credentials credentials,
- Path [] ps, Configuration conf)
- throws IOException {
+ Path [] ps,
+ Configuration conf
+ ) throws IOException {
// get jobtracker principal id (for the renewer)
KerberosName jtKrbName = new KerberosName(conf.get(JobTracker.JT_USER_NAME, ""));
- Text delegTokenRenewer = new Text(jtKrbName.getShortName());
- boolean notReadFile = true;
+ String delegTokenRenewer = jtKrbName.getShortName();
+ boolean readFile = true;
for(Path p: ps) {
- //TODO: Connecting to the namenode is not required in the case,
- //where we already have the credentials in the file
FileSystem fs = FileSystem.get(p.toUri(), conf);
- if(fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- URI uri = fs.getUri();
- String fs_addr =
- SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
-
- // see if we already have the token
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- if (notReadFile) { //read the file only once
+ String fsName = fs.getCanonicalServiceName();
+ if (TokenCache.getDelegationToken(credentials, fsName) == null) {
+ //TODO: Need to come up with a better place to put
+ //this block of code to do with reading the file
+ if (readFile) {
+ readFile = false;
String binaryTokenFilename =
conf.get("mapreduce.job.credentials.binary");
if (binaryTokenFilename != null) {
- credentials.readTokenStorageFile(new Path("file:///" +
- binaryTokenFilename), conf);
+ Credentials binary;
+ try {
+ binary = Credentials.readTokenStorageFile(new Path("file:///" +
+ binaryTokenFilename), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ credentials.addAll(binary);
}
- notReadFile = false;
- token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
+ if (TokenCache.getDelegationToken(credentials, fsName) != null) {
+ LOG.debug("DT for " + fsName + " is already present");
continue;
}
}
- // get the token
- token = dfs.getDelegationToken(delegTokenRenewer);
- if(token==null) {
- LOG.warn("Token from " + fs_addr + " is null");
- continue;
- }
-
- token.setService(new Text(fs_addr));
- credentials.addToken(new Text(fs_addr), token);
- LOG.info("Got dt for " + p + ";uri="+ fs_addr +
- ";t.service="+token.getService());
- } else if (fs instanceof HftpFileSystem) {
- String fs_addr =
- SecurityUtil.buildDTServiceName(fs.getUri(), NameNode.DEFAULT_PORT);
- Token<DelegationTokenIdentifier> token =
- TokenCache.getDelegationToken(credentials, fs_addr);
- if(token != null) {
- LOG.debug("DT for " + token.getService() + " is already present");
- continue;
- }
- //the initialize method of hftp, called via FileSystem.get() done
- //earlier gets a delegation token
- Token<? extends TokenIdentifier> t = ((HftpFileSystem) fs).getDelegationToken();
- if (t != null) {
- credentials.addToken(new Text(fs_addr), t);
-
- // in this case port in fs_addr is port for hftp request, but
- // token's port is for RPC
- // to find the correct DT we need to know the mapping between Hftp port
- // and RPC one. hence this new setting in the conf.
- URI uri = ((HftpFileSystem) fs).getUri();
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
- SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT);
- conf.set(key, t.getService().toString());
- LOG.info("GOT dt for " + p + " and stored in conf as " + key + "="
- + t.getService());
+ Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+ if (token != null) {
+ Text fsNameText = new Text(fsName);
+ token.setService(fsNameText);
+ credentials.addToken(fsNameText, token);
+ LOG.info("Got dt for " + p + ";uri="+ fsName +
+ ";t.service="+token.getService());
}
}
}
@@ -195,12 +160,14 @@ public class TokenCache {
throws IOException {
Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
- Credentials ts = new Credentials();
- ts.readTokenStorageFile(localJobTokenFile, conf);
+ Credentials ts =
+ Credentials.readTokenStorageFile(localJobTokenFile, conf);
if(LOG.isDebugEnabled()) {
- LOG.debug("Task: Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath()
- +"; num of sec keys = " + ts.numberOfSecretKeys() + " Number of tokens " +
+ LOG.debug("Task: Loaded jobTokenFile from: "+
+ localJobTokenFile.toUri().getPath()
+ +"; num of sec keys = " + ts.numberOfSecretKeys() +
+ " Number of tokens " +
ts.numberOfTokens());
}
return ts;
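A hedged example of the public entry point that DistCp (next hunk) relies on; paths invented:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;

public class ObtainTokensDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Credentials creds = new Credentials();
    // one token per distinct canonical service name is fetched and cached
    Path[] srcs = { new Path("hdfs://nnA:8020/data"),
                    new Path("hftp://nnB:50070/data") };
    TokenCache.obtainTokensForNamenodes(creds, srcs, conf);
  }
}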
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1077487&r1=1077486&r2=1077487&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCp.java Fri Mar 4 04:19:59 2011
@@ -627,10 +627,6 @@ public class DistCp implements Tool {
throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
- // get tokens for all the required FileSystems..
- // also set the renewer as the JobTracker for the hftp case
- jobConf.set(HftpFileSystem.HFTP_RENEWER,
- jobConf.get(JobTracker.JT_USER_NAME, ""));
Path[] ps = new Path[srcPaths.size()];
ps = srcPaths.toArray(ps);
TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);