You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/09/10 00:07:36 UTC
svn commit: r1167374 - in /hadoop/common/branches/branch-0.20-security: ./
src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/security/
src/core/org/apache/hadoop/security/token/ src/hdfs/org/apache/hadoop/hdfs/
src/hdfs/org/apache/hadoop/hdfs/s...
Author: omalley
Date: Fri Sep 9 22:07:35 2011
New Revision: 1167374
URL: http://svn.apache.org/viewvc?rev=1167374&view=rev
Log:
MAPREDUCE-2764. Allow JobTracker to renew and cancel arbitrary token types,
including delegation tokens obtained via hftp. (omalley)
Added:
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/build.xml
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep 9 22:07:35 2011
@@ -25,6 +25,9 @@ Release 0.20.205.0 - unreleased
HDFS-1520. Lightweight NameNode operation recoverLease to trigger
lease recovery. (Hairong Kuang via dhruba)
+ MAPREDUCE-2764. Allow JobTracker to renew and cancel arbitrary token types,
+ including delegation tokens obtained via hftp. (omalley)
+
BUG FIXES
MAPREDUCE-2324. Removed usage of broken
Modified: hadoop/common/branches/branch-0.20-security/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/build.xml?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security/build.xml Fri Sep 9 22:07:35 2011
@@ -704,6 +704,13 @@
<attribute name="Implementation-Vendor" value="Apache"/>
</section>
</manifest>
+ <service type="org.apache.hadoop.security.token.TokenRenewer">
+ <provider classname="org.apache.hadoop.hdfs.DFSClient$Renewer"/>
+ <provider classname="org.apache.hadoop.mapred.JobClient$Renewer"/>
+ <provider classname="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer"/>
+ <provider classname="org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier$Renewer"/>
+ <provider classname="org.apache.hadoop.hdfs.HftpFileSystem$TokenManager"/>
+ </service>
<fileset dir="${conf.dir}" includes="${jar.properties.list}" />
<fileset file="${jar.extra.properties.list}" />
<zipfileset dir="${build.webapps}" prefix="webapps"/>
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileSystem.java Fri Sep 9 22:07:35 2011
@@ -89,6 +89,19 @@ public abstract class FileSystem extends
*/
private Set<Path> deleteOnExit = new TreeSet<Path>();
+ /**
+ * This method adds a file system for testing so that we can find it later.
+ * It is only for testing.
+ * @param uri the uri to store it under
+ * @param conf the configuration to store it under
+ * @param fs the file system to store
+ * @throws IOException
+ */
+ public static void addFileSystemForTesting(URI uri, Configuration conf,
+ FileSystem fs) throws IOException {
+ CACHE.map.put(new Cache.Key(uri, conf), fs);
+ }
+
public static FileSystem get(final URI uri, final Configuration conf,
final String user)
throws IOException, InterruptedException {
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/SecurityUtil.java Fri Sep 9 22:07:35 2011
@@ -270,7 +270,8 @@ public class SecurityUtil {
* @return "ip:port"
*/
static String buildDTAuthority(InetSocketAddress addr) {
- return buildDTAuthority(addr.getAddress().getHostAddress(), addr.getPort());
+ String host= addr.getAddress().getHostAddress();
+ return buildDTAuthority(host, addr.getPort());
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/Token.java Fri Sep 9 22:07:35 2011
@@ -22,9 +22,14 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import java.util.ServiceLoader;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -36,10 +41,12 @@ import org.apache.hadoop.io.WritableUtil
* The client-side form of the token.
*/
public class Token<T extends TokenIdentifier> implements Writable {
+ public static final Log LOG = LogFactory.getLog(Token.class);
private byte[] identifier;
private byte[] password;
private Text kind;
private Text service;
+ private TokenRenewer renewer;
/**
* Construct a token given a token identifier and a secret manager for the
@@ -79,6 +86,17 @@ public class Token<T extends TokenIdenti
}
/**
+ * Clone a token.
+ * @param other the token to clone
+ */
+ public Token(Token<T> other) {
+ this.identifier = other.identifier;
+ this.password = other.password;
+ this.kind = other.kind;
+ this.service = other.service;
+ }
+
+ /**
* Get the token identifier
* @return the token identifier
*/
@@ -103,6 +121,16 @@ public class Token<T extends TokenIdenti
}
/**
+ * Set the token kind. This is only intended to be used by services that
+ * wrap another service's token, such as HFTP wrapping HDFS.
+ * @param newKind
+ */
+ @InterfaceAudience.Private
+ public void setKind(Text newKind) {
+ kind = newKind;
+ }
+
+ /**
* Get the service on which the token is supposed to be used
* @return the service name
*/
@@ -242,4 +270,88 @@ public class Token<T extends TokenIdenti
buffer.append(service.toString());
return buffer.toString();
}
+
+ private static ServiceLoader<TokenRenewer> renewers =
+ ServiceLoader.load(TokenRenewer.class);
+
+ private synchronized TokenRenewer getRenewer() throws IOException {
+ if (renewer != null) {
+ return renewer;
+ }
+ renewer = TRIVIAL_RENEWER;
+ for (TokenRenewer canidate: renewers) {
+ if (canidate.handleKind(this.kind)) {
+ renewer = canidate;
+ return renewer;
+ }
+ }
+ LOG.warn("No TokenRenewer defined for token kind " + this.kind);
+ return renewer;
+ }
+
+ /**
+ * Is this token managed so that it can be renewed or cancelled?
+ * @return true, if it can be renewed and cancelled.
+ */
+ public boolean isManaged() throws IOException {
+ return getRenewer().isManaged(this);
+ }
+
+ /**
+ * Renew this delegation token
+ * @return the new expiration time
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public long renew(Configuration conf
+ ) throws IOException, InterruptedException {
+ return getRenewer().renew(this, conf);
+ }
+
+ /**
+ * Cancel this delegation token
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void cancel(Configuration conf
+ ) throws IOException, InterruptedException {
+ getRenewer().cancel(this, conf);
+ }
+
+ /**
+ * A trivial renewer for token kinds that aren't managed. Sub-classes need
+ * to implement getKind for their token kind.
+ */
+ public static class TrivialRenewer extends TokenRenewer {
+
+ // define the kind for this renewer
+ protected Text getKind() {
+ return null;
+ }
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return kind.equals(getKind());
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) {
+ return false;
+ }
+
+ @Override
+ public long renew(Token<?> token, Configuration conf) {
+ throw new UnsupportedOperationException("Token renewal is not supported "+
+ " for " + token.kind + " tokens");
+ }
+
+ @Override
+ public void cancel(Token<?> token, Configuration conf) throws IOException,
+ InterruptedException {
+ throw new UnsupportedOperationException("Token cancel is not supported " +
+ " for " + token.kind + " tokens");
+ }
+
+ }
+ private static final TokenRenewer TRIVIAL_RENEWER = new TrivialRenewer();
}
Added: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java?rev=1167374&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/security/token/TokenRenewer.java Fri Sep 9 22:07:35 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This is the interface for plugins that handle tokens.
+ */
+public abstract class TokenRenewer {
+
+ /**
+ * Does this renewer handle this kind of token?
+ * @param kind the kind of the token
+ * @return true if this renewer can renew it
+ */
+ public abstract boolean handleKind(Text kind);
+
+ /**
+ * Is the given token managed? Only managed tokens may be renewed or
+ * cancelled.
+ * @param token the token being checked
+ * @return true if the token may be renewed or cancelled
+ * @throws IOException
+ */
+ public abstract boolean isManaged(Token<?> token) throws IOException;
+
+ /**
+ * Renew the given token.
+ * @return the new expiration time
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract long renew(Token<?> token,
+ Configuration conf
+ ) throws IOException, InterruptedException;
+
+ /**
+ * Cancel the given token
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract void cancel(Token<?> token,
+ Configuration conf
+ ) throws IOException, InterruptedException;
+}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 9 22:07:35 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
import org.apache.hadoop.hdfs.protocol.*;
@@ -42,6 +43,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.*;
import org.apache.commons.logging.*;
@@ -298,17 +300,33 @@ public class DFSClient implements FSCons
return result;
}
+ /**
+ * Renew a delegation token
+ * @param token the token to renew
+ * @return the new expiration time
+ * @throws InvalidToken
+ * @throws IOException
+ * @deprecated Use Token.renew instead.
+ */
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
try {
- LOG.info("Renewing " + stringifyToken(token));
- return namenode.renewDelegationToken(token);
+ return token.renew(conf);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("caught interrupted", ie);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
}
}
+ /**
+ * Cancel a delegation token
+ * @param token the token to cancel
+ * @throws InvalidToken
+ * @throws IOException
+ * @deprecated Use Token.cancel instead.
+ */
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
try {
@@ -320,6 +338,55 @@ public class DFSClient implements FSCons
}
}
+ @InterfaceAudience.Private
+ public static class Renewer extends TokenRenewer {
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token, Configuration conf) throws IOException {
+ Token<DelegationTokenIdentifier> delToken =
+ (Token<DelegationTokenIdentifier>) token;
+ LOG.info("Renewing " + stringifyToken(delToken));
+ ClientProtocol nn =
+ createRPCNamenode(NameNode.getAddress(token.getService().toString()),
+ conf, UserGroupInformation.getCurrentUser());
+ try {
+ return nn.renewDelegationToken(delToken);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token, Configuration conf) throws IOException {
+ Token<DelegationTokenIdentifier> delToken =
+ (Token<DelegationTokenIdentifier>) token;
+ LOG.info("Cancelling " + stringifyToken(delToken));
+ ClientProtocol nn =
+ createRPCNamenode(NameNode.getAddress(token.getService().toString()),
+ conf, UserGroupInformation.getCurrentUser());
+ try {
+ nn.cancelDelegationToken(delToken);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ }
+
/**
* Report corrupt blocks that were discovered by the client.
*/
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Sep 9 22:07:35 2011
@@ -607,10 +607,15 @@ public class DistributedFileSystem exten
* @param token delegation token obtained earlier
* @return the new expiration time
* @throws IOException
+ * @deprecated Use Token.renew instead.
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
- return dfs.renewDelegationToken(token);
+ try {
+ return token.renew(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
}
/**
@@ -618,10 +623,15 @@ public class DistributedFileSystem exten
*
* @param token delegation token
* @throws IOException
+ * @deprecated Use Token.cancel instead.
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
- dfs.cancelDelegationToken(token);
+ try {
+ token.cancel(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Sep 9 22:07:35 2011
@@ -36,6 +36,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -60,6 +61,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Progressable;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
@@ -79,16 +81,19 @@ public class HftpFileSystem extends File
HttpURLConnection.setFollowRedirects(true);
}
- private static final int DEFAULT_PORT = 50470;
+ public static final int DEFAULT_PORT = 50470;
+ public static final Text TOKEN_KIND = new Text("HFTP delegation");
protected InetSocketAddress nnAddr;
protected UserGroupInformation ugi;
private String nnHttpUrl;
- private URI hdfsURI;
+ private Text hdfsServiceName;
+ private URI hftpURI;
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
- private Token<DelegationTokenIdentifier> delegationToken;
+ private Token<?> delegationToken;
+ private Token<?> renewToken;
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
public static final SimpleDateFormat getDateFormat() {
@@ -111,10 +116,9 @@ public class HftpFileSystem extends File
@Override
public String getCanonicalServiceName() {
- return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+ return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
}
- @SuppressWarnings("unchecked")
@Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
@@ -123,14 +127,20 @@ public class HftpFileSystem extends File
this.ugi = UserGroupInformation.getCurrentUser();
nnAddr = NetUtils.createSocketAddr(name.toString());
- StringBuilder sb = new StringBuilder("https://");
+ StringBuilder sb = new StringBuilder();
sb.append(NetUtils.normalizeHostName(name.getHost()));
sb.append(":");
sb.append(conf.getInt("dfs.https.port", DEFAULT_PORT));
- nnHttpUrl = sb.toString();
+ String tail = sb.toString();
+ nnHttpUrl = "https://" + tail;
+ try {
+ hftpURI = new URI("hftp://" + tail);
+ } catch (URISyntaxException ue) {
+ throw new IOException("bad uri for hdfs", ue);
+ }
String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
- SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
+ SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
LOG.debug("Trying to find DT for " + name + " using key=" + key +
"; conf=" + conf.get(key, ""));
String nnServiceName = conf.get(key);
@@ -139,42 +149,70 @@ public class HftpFileSystem extends File
nnPort = NetUtils.createSocketAddr(nnServiceName,
NameNode.DEFAULT_PORT).getPort();
}
-
sb = new StringBuilder("hdfs://");
sb.append(nnAddr.getHostName());
sb.append(":");
sb.append(nnPort);
try {
- hdfsURI = new URI(sb.toString());
+ URI hdfsURI = new URI(sb.toString());
+ hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
+ nnPort));
} catch (URISyntaxException ue) {
throw new IOException("bad uri for hdfs", ue);
}
-
if (UserGroupInformation.isSecurityEnabled()) {
-
- //try finding a token for this namenode (esp applicable for tasks
- //using hftp). If there exists one, just set the delegationField
- String canonicalName = getCanonicalServiceName();
+ String hftpServiceName = getCanonicalServiceName();
for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
- if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind())&
- t.getService().toString().equals(canonicalName)) {
- LOG.debug("Found existing DT for " + name);
- delegationToken = (Token<DelegationTokenIdentifier>) t;
- break;
+ Text kind = t.getKind();
+ if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)){
+ if (hdfsServiceName.equals(t.getService())) {
+ setDelegationToken(t);
+ break;
+ }
+ } else if (TOKEN_KIND.equals(kind)) {
+ if (hftpServiceName.equals(normalizeService(t.getService()
+ .toString()))) {
+ setDelegationToken(t);
+ break;
+ }
}
}
//since we don't already have a token, go get one over https
if (delegationToken == null) {
- delegationToken =
- (Token<DelegationTokenIdentifier>) getDelegationToken(null);
+ setDelegationToken(getDelegationToken(null));
renewer.addTokenToRenew(this);
+ LOG.debug("Created new DT for " + delegationToken.getService());
+ } else {
+ LOG.debug("Found existing DT for " + delegationToken.getService());
}
}
}
-
+
+ private String normalizeService(String service) {
+ int colonIndex = service.indexOf(':');
+ if (colonIndex == -1) {
+ throw new IllegalArgumentException("Invalid service for hftp token: " +
+ service);
+ }
+ String hostname =
+ NetUtils.normalizeHostName(service.substring(0, colonIndex));
+ String port = service.substring(colonIndex + 1);
+ return hostname + ":" + port;
+ }
+
+ private <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+ renewToken = token;
+ // emulate the 203 usage of the tokens
+ // by setting the kind and service as if they were hdfs tokens
+ delegationToken = new Token<T>(token);
+ delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+ delegationToken.setService(hdfsServiceName);
+ }
+
@Override
- public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+ public synchronized Token<?> getDelegationToken(final String renewer
+ ) throws IOException {
try {
//Renew TGT if needed
ugi.checkTGTAndReloginFromKeytab();
@@ -193,7 +231,6 @@ public class HftpFileSystem extends File
for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
LOG.debug("Got dt for " + getUri() + ";t.service="
+t.getService());
- t.setService(new Text(getCanonicalServiceName()));
return t;
}
return null;
@@ -636,7 +673,8 @@ public class HftpFileSystem extends File
@Override
public int compareTo(Delayed o) {
if (o.getClass() != RenewAction.class) {
- throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
+ throw new IllegalArgumentException
+ ("Illegal comparision to non-RenewAction");
}
RenewAction other = (RenewAction) o;
return timestamp < other.timestamp ? -1 :
@@ -657,31 +695,20 @@ public class HftpFileSystem extends File
* @return
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public boolean renew() throws IOException, InterruptedException {
final HftpFileSystem fs = weakFs.get();
if (fs != null) {
synchronized (fs) {
- fs.ugi.checkTGTAndReloginFromKeytab();
- fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
-
- @Override
- public Void run() throws Exception {
- try {
- DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
- fs.delegationToken);
- } catch (IOException ie) {
- try {
- fs.delegationToken =
- (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
- } catch (IOException ie2) {
- throw new IOException("Can't renew or get new delegation token ",
- ie);
- }
- }
- return null;
- }
- });
+ try {
+ fs.renewToken.renew(fs.getConf());
+ } catch (IOException ie) {
+ try {
+ fs.setDelegationToken(fs.getDelegationToken(null));
+ } catch (IOException ie2) {
+ throw new IOException("Can't renew or get new delegation token ",
+ ie);
+ }
+ }
}
}
return fs != null;
@@ -717,7 +744,7 @@ public class HftpFileSystem extends File
}
public void addTokenToRenew(HftpFileSystem fs) {
- queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
+ queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(), fs));
}
public void run() {
@@ -747,4 +774,44 @@ public class HftpFileSystem extends File
static {
renewer.start();
}
+
+ @InterfaceAudience.Private
+ public static class TokenManager extends TokenRenewer {
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return kind.equals(TOKEN_KIND);
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ // use https to renew the token
+ return
+ DelegationTokenFetcher.renewDelegationToken
+ ("https://" + token.getService().toString(),
+ (Token<DelegationTokenIdentifier>) token);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ // use https to cancel the token
+ DelegationTokenFetcher.cancelDelegationToken
+ ("https://" + token.getService().toString(),
+ (Token<DelegationTokenIdentifier>) token);
+ }
+
+ }
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Sep 9 22:07:35 2011
@@ -24,10 +24,12 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
public class BlockTokenIdentifier extends TokenIdentifier {
@@ -182,4 +184,12 @@ public class BlockTokenIdentifier extend
return cache;
}
+
+ @InterfaceAudience.Private
+ public static class Renewer extends Token.TrivialRenewer {
+ @Override
+ protected Text getKind() {
+ return KIND_NAME;
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Fri Sep 9 22:07:35 2011
@@ -19,23 +19,24 @@ package org.apache.hadoop.hdfs.tools;
import java.io.BufferedReader;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
@@ -46,71 +47,112 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.GenericOptionsParser;
/**
* Fetch a DelegationToken from the current Namenode and store it in the
* specified file.
*/
public class DelegationTokenFetcher {
- private static final String USAGE =
- "fetchdt retrieves delegation tokens (optionally over http)\n" +
- "and writes them to specified file.\n" +
- "Usage: fetchdt [--webservice <namenode http addr>] <output filename>";
- private final DistributedFileSystem dfs;
- private final UserGroupInformation ugi;
- private final DataOutputStream out;
- private final Configuration conf;
private static final Log LOG =
LogFactory.getLog(DelegationTokenFetcher.class);
+ private static final String WEBSERVICE = "webservice";
+ private static final String CANCEL = "cancel";
+ private static final String RENEW = "renew";
static {
// Enable Kerberos sockets
System.setProperty("https.cipherSuites", "TLS_KRB5_WITH_3DES_EDE_CBC_SHA");
}
+ private static void printUsage(PrintStream err) throws IOException {
+ err.println("fetchdt retrieves delegation tokens from the NameNode");
+ err.println();
+ err.println("fetchdt <opts> <token file>");
+ err.println("Options:");
+ err.println(" --webservice <url> Url to contact NN on");
+ err.println(" --cancel Cancel the delegation token");
+ err.println(" --renew Renew the delegation token");
+ err.println();
+ GenericOptionsParser.printGenericCommandUsage(err);
+ System.exit(1);
+ }
+
+ private static Collection<Token<?>>
+ readTokens(Path file, Configuration conf) throws IOException{
+ Credentials creds = Credentials.readTokenStorageFile(file, conf);
+ return creds.getAllTokens();
+ }
+
/**
* Command-line interface
*/
public static void main(final String [] args) throws Exception {
+ final Configuration conf = new Configuration();
+ Options fetcherOptions = new Options();
+ fetcherOptions.addOption(WEBSERVICE, true,
+ "HTTPS url to reach the NameNode at");
+ fetcherOptions.addOption(CANCEL, false, "cancel the token");
+ fetcherOptions.addOption(RENEW, false, "renew the token");
+ GenericOptionsParser parser =
+ new GenericOptionsParser(conf, fetcherOptions, args);
+ CommandLine cmd = parser.getCommandLine();
+
+ // get options
+ final String webUrl =
+ cmd.hasOption(WEBSERVICE) ? cmd.getOptionValue(WEBSERVICE) : null;
+ final boolean cancel = cmd.hasOption(CANCEL);
+ final boolean renew = cmd.hasOption(RENEW);
+ String[] remaining = parser.getRemainingArgs();
+
+ // check option validity
+ if (cancel && renew) {
+ System.err.println("ERROR: Only specify cancel or renew.");
+ printUsage(System.err);
+ }
+ if (remaining.length != 1 || remaining[0].charAt(0) == '-') {
+ System.err.println("ERROR: Must specify exacltly one token file");
+ printUsage(System.err);
+ }
+ // default to using the local file system
+ FileSystem local = FileSystem.getLocal(conf);
+ final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
+
// Login the current user
- UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Object>() {
+ final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- if(args.length == 3 && "--webservice".equals(args[0])) {
- getDTfromRemoteIntoFile(args[1], args[2]);
- return null;
- }
- // avoid annoying mistake
- if(args.length == 1 && "--webservice".equals(args[0])) {
- System.out.println(USAGE);
- return null;
- }
- if(args.length != 1 || args[0].isEmpty()) {
- System.out.println(USAGE);
- return null;
- }
-
- DataOutputStream out = null;
-
- try {
- Configuration conf = new Configuration();
- DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
- out = new DataOutputStream(new FileOutputStream(args[0]));
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- new DelegationTokenFetcher(dfs, out, ugi, conf).go();
-
- out.flush();
- System.out.println("Succesfully wrote token of size " +
- out.size() + " bytes to "+ args[0]);
- } catch (IOException ioe) {
- System.out.println("Exception encountered:\n" +
- StringUtils.stringifyException(ioe));
- } finally {
- if(out != null) out.close();
+ if (cancel) {
+ for(Token<?> token: readTokens(tokenFile, conf)) {
+ if (token.isManaged()) {
+ token.cancel(conf);
+ System.out.println("Cancelled token for " + token.getService());
+ }
+ }
+ } else if (renew) {
+ for(Token<?> token: readTokens(tokenFile, conf)) {
+ if (token.isManaged()) {
+ token.renew(conf);
+ System.out.println("Renewed token for " + token.getService());
+ }
+ }
+ } else {
+ if (webUrl != null) {
+ getDTfromRemote(webUrl, null).
+ writeTokenStorageFile(tokenFile, conf);
+ System.out.println("Fetched token via http for " + webUrl);
+ } else {
+ FileSystem fs = FileSystem.get(conf);
+ Token<?> token = fs.getDelegationToken(ugi.getShortUserName());
+ Credentials cred = new Credentials();
+ cred.addToken(token.getService(), token);
+ cred.writeTokenStorageFile(tokenFile, conf);
+ System.out.println("Fetched token for " + fs.getUri() + " into " +
+ tokenFile);
+ }
}
return null;
}
@@ -118,35 +160,6 @@ public class DelegationTokenFetcher {
}
- public DelegationTokenFetcher(DistributedFileSystem dfs,
- DataOutputStream out, UserGroupInformation ugi, Configuration conf) {
- checkNotNull("dfs", dfs); this.dfs = dfs;
- checkNotNull("out", out); this.out = out;
- checkNotNull("ugi", ugi); this.ugi = ugi;
- checkNotNull("conf",conf); this.conf = conf;
- }
-
- private void checkNotNull(String s, Object o) {
- if(o == null) throw new IllegalArgumentException(s + " cannot be null.");
- }
-
- public void go() throws IOException {
- String fullName = ugi.getUserName();
- String shortName = ugi.getShortUserName();
- Token<DelegationTokenIdentifier> token =
- dfs.getDelegationToken(new Text(fullName));
-
- // Reconstruct the ip:port of the Namenode
- URI uri = FileSystem.getDefaultUri(conf);
- String nnAddress =
- InetAddress.getByName(uri.getHost()).getHostAddress() + ":" + uri.getPort();
- token.setService(new Text(nnAddress));
-
- Credentials ts = new Credentials();
- ts.addToken(new Text(shortName), token);
- ts.writeTokenStorageToStream(out);
- }
-
/**
* Utility method to obtain a delegation token over http
* @param nnHttpAddr Namenode http addr, such as http://namenode:50070
@@ -172,6 +185,12 @@ public class DelegationTokenFetcher {
Credentials ts = new Credentials();
dis = new DataInputStream(in);
ts.readFields(dis);
+ for(Token<?> token: ts.getAllTokens()) {
+ token.setKind(HftpFileSystem.TOKEN_KIND);
+ token.setService(new Text(SecurityUtil.buildDTServiceName
+ (remoteURL.toURI(),
+ HftpFileSystem.DEFAULT_PORT)));
+ }
return ts;
} catch (Exception e) {
throw new IOException("Unable to obtain remote token", e);
@@ -278,20 +297,4 @@ public class DelegationTokenFetcher {
throw ie;
}
}
-
- /**
- * Utility method to obtain a delegation token over http
- * @param nnHttpAddr Namenode http addr, such as http://namenode:50070
- * @param filename Name of file to store token in
- */
- static private void getDTfromRemoteIntoFile(String nnAddr, String filename)
- throws IOException {
- Credentials ts = getDTfromRemote(nnAddr, null);
-
- DataOutputStream file = new DataOutputStream(new FileOutputStream(filename));
- ts.writeTokenStorageToStream(file);
- file.flush();
- System.out.println("Successfully wrote token of " + file.size()
- + " bytes to " + filename);
- }
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Sep 9 22:07:35 2011
@@ -43,6 +43,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
@@ -76,6 +77,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -471,7 +473,7 @@ public class JobClient extends Configure
}
}
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+ private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr,
@@ -479,6 +481,41 @@ public class JobClient extends Configure
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}
+ @InterfaceAudience.Private
+ public static class Renewer extends TokenRenewer {
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token, Configuration conf
+ ) throws IOException, InterruptedException {
+ InetSocketAddress addr =
+ NetUtils.createSocketAddr(token.getService().toString());
+ JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+ return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token, Configuration conf
+ ) throws IOException, InterruptedException {
+ InetSocketAddress addr =
+ NetUtils.createSocketAddr(token.getService().toString());
+ JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+ jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ }
+
/**
* Build a job client, connect to the indicated job tracker.
*
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Sep 9 22:07:35 2011
@@ -19,10 +19,6 @@
package org.apache.hadoop.mapreduce.security.token;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
@@ -38,17 +34,10 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
@@ -62,14 +51,14 @@ public class DelegationTokenRenewal {
*
*/
private static class DelegationTokenToRenew {
- public final Token<DelegationTokenIdentifier> token;
+ public final Token<?> token;
public final JobID jobId;
public final Configuration conf;
public long expirationDate;
public TimerTask timerTask;
public DelegationTokenToRenew(
- JobID jId, Token<DelegationTokenIdentifier> t,
+ JobID jId, Token<?> t,
Configuration newConf, long newExpirationDate) {
token = t;
jobId = jId;
@@ -117,10 +106,9 @@ public class DelegationTokenRenewal {
private static class DelegationTokenCancelThread extends Thread {
private static class TokenWithConf {
- Token<DelegationTokenIdentifier> token;
+ Token<?> token;
Configuration conf;
- TokenWithConf(Token<DelegationTokenIdentifier> token,
- Configuration conf) {
+ TokenWithConf(Token<?> token, Configuration conf) {
this.token = token;
this.conf = conf;
}
@@ -132,7 +120,7 @@ public class DelegationTokenRenewal {
super("Delegation Token Canceler");
setDaemon(true);
}
- public void cancelToken(Token<DelegationTokenIdentifier> token,
+ public void cancelToken(Token<?> token,
Configuration conf) {
TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
while (!queue.offer(tokenWithConf)) {
@@ -147,17 +135,24 @@ public class DelegationTokenRenewal {
}
public void run() {
+ TokenWithConf tokenWithConf = null;
while (true) {
- TokenWithConf tokenWithConf = null;
try {
tokenWithConf = queue.take();
- DistributedFileSystem dfs = getDFSForToken(tokenWithConf.token,
- tokenWithConf.conf);
+ final TokenWithConf current = tokenWithConf;
if (LOG.isDebugEnabled()) {
- LOG.debug("Canceling token " + tokenWithConf.token.getService() +
- " for dfs=" + dfs);
+ LOG.debug("Canceling token " + tokenWithConf.token.getService());
}
- dfs.cancelDelegationToken(tokenWithConf.token);
+ // need to use doAs so that http can find the kerberos tgt
+ UserGroupInformation.getLoginUser()
+ .doAs(new PrivilegedExceptionAction<Void>(){
+
+ @Override
+ public Void run() throws Exception {
+ current.token.cancel(current.conf);
+ return null;
+ }
+ });
} catch (IOException e) {
LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
StringUtils.stringifyException(e));
@@ -176,90 +171,28 @@ public class DelegationTokenRenewal {
delegationTokens.add(t);
}
- // kind of tokens we currently renew
- private static final Text kindHdfs =
- DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
-
- @SuppressWarnings("unchecked")
public static synchronized void registerDelegationTokensForRenewal(
- JobID jobId, Credentials ts, Configuration conf) {
+ JobID jobId, Credentials ts, Configuration conf) throws IOException {
if(ts==null)
return; //nothing to add
- Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+ Collection <Token<?>> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
- for(Token<? extends TokenIdentifier> t : tokens) {
- // currently we only check for HDFS delegation tokens
- // later we can add more different types.
- if(! t.getKind().equals(kindHdfs)) {
- continue;
- }
- Token<DelegationTokenIdentifier> dt =
- (Token<DelegationTokenIdentifier>)t;
-
+ for(Token<?> t : tokens) {
// first renew happens immediately
- DelegationTokenToRenew dtr =
- new DelegationTokenToRenew(jobId, dt, conf, now);
+ if (t.isManaged()) {
+ DelegationTokenToRenew dtr =
+ new DelegationTokenToRenew(jobId, t, conf, now);
- addTokenToList(dtr);
+ addTokenToList(dtr);
- setTimerForTokenRenewal(dtr, true);
- LOG.info("registering token for renewal for service =" + dt.getService()+
- " and jobID = " + jobId);
- }
- }
-
- protected static long renewDelegationTokenOverHttps(
- final Token<DelegationTokenIdentifier> token, final Configuration conf)
- throws InterruptedException, IOException{
- final String httpAddress = getHttpAddressForToken(token, conf);
-
- Long expDate = (Long) UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<Long>() {
- public Long run() throws IOException {
- return DelegationTokenFetcher.renewDelegationToken(httpAddress, token);
- }
- });
- LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate);
- return expDate;
- }
-
- private static long renewDelegationToken(DelegationTokenToRenew dttr)
- throws Exception {
- long newExpirationDate=System.currentTimeMillis()+3600*1000;
- Token<DelegationTokenIdentifier> token = dttr.token;
- Configuration conf = dttr.conf;
-
- if(token.getKind().equals(kindHdfs)) {
- try {
- try {
- DistributedFileSystem dfs = getDFSForToken(token, conf);
- newExpirationDate = dfs.renewDelegationToken(token);
- } catch(VersionMismatchException vme) {
- // if there is a version mismatch we try over https
- LOG.info("Delegation token renew for t=" + token.getService() +
- " failed with VersionMissmaptch:" + vme.toString()+". Trying over https");
- renewDelegationTokenOverHttps(token, conf);
- }
- }catch (InvalidToken ite) {
- LOG.warn("invalid token - not scheduling for renew: " + ite.getLocalizedMessage());
- removeFailedDelegationToken(dttr);
- throw new IOException("failed to renew token", ite);
- } catch (AccessControlException ioe) {
- LOG.warn("failed to renew token:"+token, ioe);
- removeFailedDelegationToken(dttr);
- throw new IOException("failed to renew token", ioe);
- } catch (Exception e) {
- LOG.warn("failed to renew token:"+token, e);
- // returns default expiration date
+ setTimerForTokenRenewal(dtr, true);
+ LOG.info("registering token for renewal for service =" + t.getService()+
+ " and jobID = " + jobId);
}
- } else {
- throw new Exception("unknown token type to renew: "+token.getKind());
}
- return newExpirationDate;
}
-
/**
* Task - to renew a token
@@ -272,74 +205,36 @@ public class DelegationTokenRenewal {
@Override
public void run() {
- Token<DelegationTokenIdentifier> token = dttr.token;
- long newExpirationDate=0;
+ Token<?> token = dttr.token;
try {
- newExpirationDate = renewDelegationToken(dttr);
+ // need to use doAs so that http can find the kerberos tgt
+ dttr.expirationDate = UserGroupInformation.getLoginUser()
+ .doAs(new PrivilegedExceptionAction<Long>(){
+
+ @Override
+ public Long run() throws Exception {
+ return dttr.token.renew(dttr.conf);
+ }
+ });
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("renewing for:" + token.getService() + ";newED=" +
+ dttr.expirationDate);
+ }
+ setTimerForTokenRenewal(dttr, false);// set the next one
} catch (Exception e) {
- return; // message logged in renewDT method
+ LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
+ removeFailedDelegationToken(dttr);
}
- if (LOG.isDebugEnabled())
- LOG.debug("renewing for:"+token.getService()+";newED=" +
- newExpirationDate);
-
- // new expiration date
- dttr.expirationDate = newExpirationDate;
- setTimerForTokenRenewal(dttr, false);// set the next one
}
}
- private static String getHttpAddressForToken(
- Token<DelegationTokenIdentifier> token, final Configuration conf)
- throws IOException {
-
- String[] ipaddr = token.getService().toString().split(":");
-
- InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
- String dnsName = iaddr.getCanonicalHostName();
- String httpsPort = conf.get("dfs.https.port", "50470");
-
- // always use https (it is for security only)
- return "https://" + dnsName+":"+httpsPort;
- }
-
- private static DistributedFileSystem getDFSForToken(
- Token<DelegationTokenIdentifier> token, final Configuration conf)
- throws Exception {
- DistributedFileSystem dfs = null;
- try {
- //TODO: The service is usually an IPaddress:port. We convert
- //it to dns name and then obtain the filesystem just so that
- //we reuse the existing filesystem handle (that the jobtracker
- //might have for this namenode; the namenode is usually
- //specified as the dns name in the jobtracker).
- //THIS IS A WORKAROUND FOR NOW. NEED TO SOLVE THIS PROBLEM
- //IN A BETTER WAY.
- String[] ipaddr = token.getService().toString().split(":");
- InetAddress iaddr = InetAddress.getByName(ipaddr[0]);
- String dnsName = iaddr.getCanonicalHostName();
- final URI uri = new URI (SCHEME + "://" + dnsName+":"+ipaddr[1]);
- dfs = (DistributedFileSystem)
- UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<DistributedFileSystem>() {
- public DistributedFileSystem run() throws IOException {
- return (DistributedFileSystem) FileSystem.get(uri, conf);
- }
- });
-
-
- } catch (Exception e) {
- LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
- throw e;
- }
- return dfs;
- }
-
/**
* set task to renew the token
*/
- private static void setTimerForTokenRenewal(
- DelegationTokenToRenew token, boolean firstTime) {
+ private static
+ void setTimerForTokenRenewal(DelegationTokenToRenew token,
+ boolean firstTime) throws IOException {
// calculate timer time
long now = System.currentTimeMillis();
@@ -351,15 +246,11 @@ public class DelegationTokenRenewal {
renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration
}
- try {
- // need to create new task every time
- TimerTask tTask = new RenewalTimerTask(token);
- token.setTimerTask(tTask); // keep reference to the timer
-
- renewalTimer.schedule(token.timerTask, new Date(renewIn));
- } catch (Exception e) {
- LOG.warn("failed to schedule a task, token will not renew more", e);
- }
+ // need to create new task every time
+ TimerTask tTask = new RenewalTimerTask(token);
+ token.setTimerTask(tTask); // keep reference to the timer
+
+ renewalTimer.schedule(token.timerTask, new Date(renewIn));
}
/**
@@ -372,12 +263,7 @@ public class DelegationTokenRenewal {
// cancel a token
private static void cancelToken(DelegationTokenToRenew t) {
- Token<DelegationTokenIdentifier> token = t.token;
- Configuration conf = t.conf;
-
- if(token.getKind().equals(kindHdfs)) {
- dtCancelThread.cancelToken(token, conf);
- }
+ dtCancelThread.cancelToken(t.token, t.conf);
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Fri Sep 9 22:07:35 2011
@@ -22,7 +22,9 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +33,7 @@ import org.apache.hadoop.security.UserGr
*/
public class JobTokenIdentifier extends TokenIdentifier {
private Text jobid;
- final static Text KIND_NAME = new Text("mapreduce.job");
+ public final static Text KIND_NAME = new Text("mapreduce.job");
/**
* Default constructor
@@ -82,4 +84,12 @@ public class JobTokenIdentifier extends
public void write(DataOutput out) throws IOException {
jobid.write(out);
}
+
+ @InterfaceAudience.Private
+ public static class Renewer extends Token.TrivialRenewer {
+ @Override
+ protected Text getKind() {
+ return KIND_NAME;
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java Fri Sep 9 22:07:35 2011
@@ -28,8 +28,9 @@ import org.apache.hadoop.security.token.
//@InterfaceAudience.Private
public class DelegationTokenIdentifier
extends AbstractDelegationTokenIdentifier {
-static final Text MAPREDUCE_DELEGATION_KIND =
- new Text("MAPREDUCE_DELEGATION_TOKEN");
+
+ public static final Text MAPREDUCE_DELEGATION_KIND =
+ new Text("MAPREDUCE_DELEGATION_TOKEN");
/**
* Create an empty delegation token identifier for reading into.
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java?rev=1167374&r1=1167373&r2=1167374&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/TestDelegationTokenFetcher.java Fri Sep 9 22:07:35 2011
@@ -17,42 +17,41 @@
*/
package org.apache.hadoop.tools;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Before;
import org.junit.Test;
public class TestDelegationTokenFetcher {
private DistributedFileSystem dfs;
- private DataOutputStream out;
- private UserGroupInformation ugi;
private Configuration conf;
+ private URI uri;
+ private static final String SERVICE_VALUE = "localhost:2005";
@Before
- public void init() {
+ public void init() throws URISyntaxException, IOException {
dfs = mock(DistributedFileSystem.class);
- out = mock(DataOutputStream.class);
- ugi = mock(UserGroupInformation.class);
conf = new Configuration();
+ uri = new URI("hdfs://" + SERVICE_VALUE);
+ FileSystem.addFileSystemForTesting(uri, conf, dfs);
}
/**
@@ -61,68 +60,36 @@ public class TestDelegationTokenFetcher
*/
@Test
public void expectedTokenIsRetrievedFromDFS() throws Exception {
- final String LONG_NAME = "TheDoctor@TARDIS";
- final String SHORT_NAME = "TheDoctor";
- final String SERVICE_VALUE = "localhost:2005";
- URI uri = new URI("hdfs://" + SERVICE_VALUE);
- FileSystem.setDefaultUri(conf, uri);
-
- // Mock out the user's long and short names.
- when(ugi.getUserName()).thenReturn(LONG_NAME);
- when(ugi.getShortUserName()).thenReturn(SHORT_NAME);
-
+ final byte[] ident = new byte[]{1,2,3,4};
+ final byte[] pw = new byte[]{42};
+ final Text kind = new Text("MY-KIND");
+ final Text service = new Text(uri.toString());
+
// Create a token for the fetcher to fetch, wire NN to return it when asked
// for this particular user.
- Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
- when(dfs.getDelegationToken(eq(new Text(LONG_NAME)))).thenReturn(t);
-
- // Now, actually let the TokenFetcher go fetch the token.
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- out = new DataOutputStream(baos);
- new DelegationTokenFetcher(dfs, out, ugi, conf).go();
-
- // now read the data back in and verify correct values
- Credentials ts = new Credentials();
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ Token<DelegationTokenIdentifier> t =
+ new Token<DelegationTokenIdentifier>(ident, pw, kind, service);
+ when(dfs.getDelegationToken((String) null)).thenReturn(t);
+ when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L);
- ts.readTokenStorageStream(dis);
- Token<? extends TokenIdentifier> newToken = ts.getToken(new Text(SHORT_NAME));
-
- assertEquals("Should only be one token in storage", ts.numberOfTokens(), 1);
- assertEquals("Service value should have survived",
- "127.0.0.1:2005", newToken.getService().toString());
+ DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(),
+ "file.dta"});
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path p = new Path(fs.getWorkingDirectory(), "file.dta");
+ Credentials creds = Credentials.readTokenStorageFile(p , conf);
+ Iterator<Token<?>> itr = creds.getAllTokens().iterator();
+ // make sure we got back exactly the 1 token we expected
+ assertTrue(itr.hasNext());
+ assertEquals(t, itr.next());
+ assertTrue(!itr.hasNext());
+
+ DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(),
+ "--renew", "file.dta"});
+
+ DelegationTokenFetcher.main(new String[]{"-fs", uri.toString(),
+ "--cancel", "file.dta"});
+ verify(dfs).renewDelegationToken(eq(t));
+ verify(dfs).cancelDelegationToken(eq(t));
}
- private void checkWithNullParam(String s) {
- try {
- new DelegationTokenFetcher(dfs, out, ugi, conf);
- } catch (IllegalArgumentException iae) {
- assertEquals("Expected exception message not received",
- s + " cannot be null.", iae.getMessage());
- return; // received expected exception. We're good.
- }
- fail("null parameter should have failed.");
- }
-
- @Test
- public void dfsCannotBeNull() {
- dfs = null;
- String s = "dfs";
- checkWithNullParam(s);
- }
-
- @Test
- public void dosCannotBeNull() {
- out = null;
- String s = "out";
- checkWithNullParam(s);
- }
-
- @Test
- public void ugiCannotBeNull() {
- ugi = null;
- String s = "ugi";
- checkWithNullParam(s);
- }
}