Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:56:35 UTC
svn commit: r1077252 [1/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/http/ core/org/apache/hadoop/ipc/
core/org/apache/hadoop/security/
core/org/apache/hadoop/security/token/delegation/ hdfs/org/apache/hadoop...
Author: omalley
Date: Fri Mar 4 03:56:33 2011
New Revision: 1077252
URL: http://svn.apache.org/viewvc?rev=1077252&view=rev
Log:
commit d4a276dbdcb8ca9485910d86480ad630330d1f02
Author: Devaraj Das <dd...@yahoo-inc.com>
Date: Sat Feb 27 04:04:26 2010 -0800
HDFS-992 from https://issues.apache.org/jira/secure/attachment/12437340/h992-BK-0.20-07.patch
+++ b/YAHOO-CHANGES.txt
+ HDFS-992. Refactors block access token implementation to conform to the
+ generic Token interface. (Kan Zhang via ddas)
+
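[Editor's note] The net effect of the refactoring: HDFS block access tokens become plain instances of the generic org.apache.hadoop.security.token.Token, parameterized by the new BlockTokenIdentifier, instead of the bespoke BlockAccessToken class removed below. A minimal sketch of the new API, not part of the patch (the intervals, block id, and user name are made up for illustration):

    import java.util.EnumSet;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    public class BlockTokenSketch {
      public static void main(String[] args) throws Exception {
        // Master mode, as the NameNode would run it: 10 min key update
        // interval, 10 min token lifetime.
        BlockTokenSecretManager sm =
            new BlockTokenSecretManager(true, 10 * 60 * 1000L, 10 * 60 * 1000L);
        Block block = new Block(123L, 0L, 0L);
        // The result is an ordinary Token<T>: it can be stored in a UGI,
        // serialized, and picked up by a TokenSelector like any other token.
        Token<BlockTokenIdentifier> token = sm.generateToken("alice", block,
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
        System.out.println(token);
      }
    }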
Added:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/AccessTokenHandler.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/BlockAccessKey.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/BlockAccessToken.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/SecurityTestUtil.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestAccessToken.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/TestUserGroupInformation.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/browseBlock.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/datanode/tail.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/http/HttpServer.java Fri Mar 4 03:56:33 2011
@@ -132,17 +132,12 @@ public class HttpServer implements Filte
webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
webServer.addHandler(webAppContext);
-<<<<<<< HEAD:src/core/org/apache/hadoop/http/HttpServer.java
- addDefaultApps(contexts, appDir);
+ addDefaultApps(contexts, appDir, conf);
defineFilter(webAppContext, "krb5Filter",
Krb5AndCertsSslSocketConnector.Krb5SslFilter.class.getName(),
null, null);
-=======
- addDefaultApps(contexts, appDir, conf);
-
->>>>>>> yahoo-hadoop-0.20.1xx:src/core/org/apache/hadoop/http/HttpServer.java
addGlobalFilter("safety", QuotingInputFilter.class.getName(), null);
final FilterInitializer[] initializers = getFilterInitializers(conf);
if (initializers != null) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar 4 03:56:33 2011
@@ -848,7 +848,13 @@ public abstract class Server {
if (authMethod == SaslRpcServer.AuthMethod.DIGEST) {
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
secretManager);
- return tokenId.getUser();
+ UserGroupInformation ugi = tokenId.getUser();
+ if (ugi == null) {
+ throw new AccessControlException(
+ "Can't retrieve username from tokenIdentifier.");
+ }
+ ugi.addTokenIdentifier(tokenId);
+ return ugi;
} else {
return UserGroupInformation.createRemoteUser(authorizedId);
}
@@ -1444,7 +1450,7 @@ public abstract class Server {
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
/** Starts the service. Must be called before any calls will be handled. */
- public synchronized void start() throws IOException {
+ public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/SaslRpcServer.java Fri Mar 4 03:56:33 2011
@@ -68,10 +68,10 @@ public class SaslRpcServer {
return Base64.decodeBase64(identifier.getBytes());
}
- public static TokenIdentifier getIdentifier(String id,
- SecretManager<TokenIdentifier> secretManager) throws InvalidToken {
+ public static <T extends TokenIdentifier> T getIdentifier(String id,
+ SecretManager<T> secretManager) throws InvalidToken {
byte[] tokenId = decodeIdentifier(id);
- TokenIdentifier tokenIdentifier = secretManager.createIdentifier();
+ T tokenIdentifier = secretManager.createIdentifier();
try {
tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
tokenId)));
@@ -201,11 +201,12 @@ public class SaslRpcServer {
ac.setAuthorized(false);
}
if (ac.isAuthorized()) {
- String username = getIdentifier(authzid, secretManager).getUser()
- .getUserName().toString();
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
+ String username = getIdentifier(authzid, secretManager).getUser()
+ .getUserName().toString();
LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ "canonicalized client ID: " + username);
+ }
ac.setAuthorizedID(authzid);
}
}
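[Editor's note] Parameterizing getIdentifier over the secret manager's identifier type is what lets the Server.java hunk above work without a cast: the identifier comes back concretely typed, ready to hand to UserGroupInformation. A hedged fragment (secretManager is assumed to be the BlockTokenSecretManager added later in this commit; exception handling elided):

    // T is inferred as BlockTokenIdentifier from SecretManager<T>.
    BlockTokenIdentifier tokenId =
        SaslRpcServer.getIdentifier(authzid, secretManager);
    UserGroupInformation ugi = tokenId.getUser();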
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar 4 03:56:33 2011
@@ -599,6 +599,28 @@ public class UserGroupInformation {
}
/**
+ * Add a TokenIdentifier to this UGI. The TokenIdentifier has typically been
+ * authenticated by the RPC layer as belonging to the user represented by this
+ * UGI.
+ *
+ * @param tokenId
+ * tokenIdentifier to be added
+ * @return true on successful add of new tokenIdentifier
+ */
+ public synchronized boolean addTokenIdentifier(TokenIdentifier tokenId) {
+ return subject.getPublicCredentials().add(tokenId);
+ }
+
+ /**
+ * Get the set of TokenIdentifiers belonging to this UGI
+ *
+ * @return the set of TokenIdentifiers belonging to this UGI
+ */
+ public synchronized Set<TokenIdentifier> getTokenIdentifiers() {
+ return subject.getPublicCredentials(TokenIdentifier.class);
+ }
+
+ /**
* Add a token to this UGI
*
* @param token Token to be added
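[Editor's note] Together with the Server.java change above, this pair of methods gives server-side code a trustworthy view of what the RPC layer authenticated: the identifiers land in the Subject's public credentials during the SASL handshake and can be read back inside a handler. A sketch of the read-back path (fragment only; checkReadAccess is a hypothetical helper and exceptions are elided):

    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // Every identifier here was verified against the secret manager during
    // DIGEST-MD5 authentication, so it can be trusted for access decisions.
    for (TokenIdentifier id : ugi.getTokenIdentifiers()) {
      if (id instanceof BlockTokenIdentifier) {
        checkReadAccess((BlockTokenIdentifier) id); // hypothetical
      }
    }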
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/DelegationKey.java Fri Mar 4 03:56:33 2011
@@ -70,9 +70,13 @@ public class DelegationKey implements Wr
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, keyId);
WritableUtils.writeVLong(out, expiryDate);
- byte[] keyBytes = key.getEncoded();
- WritableUtils.writeVInt(out, keyBytes.length);
- out.write(keyBytes);
+ if (key == null) {
+ WritableUtils.writeVInt(out, -1);
+ } else {
+ byte[] keyBytes = key.getEncoded();
+ WritableUtils.writeVInt(out, keyBytes.length);
+ out.write(keyBytes);
+ }
}
/**
@@ -81,8 +85,12 @@ public class DelegationKey implements Wr
keyId = WritableUtils.readVInt(in);
expiryDate = WritableUtils.readVLong(in);
int len = WritableUtils.readVInt(in);
- byte[] keyBytes = new byte[len];
- in.readFully(keyBytes);
- key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+ if (len == -1) {
+ key = null;
+ } else {
+ byte[] keyBytes = new byte[len];
+ in.readFully(keyBytes);
+ key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+ }
}
}
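[Editor's note] The vint length of -1 acts as an explicit null sentinel, so a DelegationKey with no key material (as the default BlockKey inside ExportedBlockKeys.DUMMY_KEYS is) now round-trips through the Writable machinery instead of throwing a NullPointerException in write(). A quick round-trip sketch, not from the patch, assuming the (keyId, expiryDate, key) constructor that BlockKey below delegates to:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.token.delegation.DelegationKey;

    DelegationKey k = new DelegationKey(7, 0L, null); // null SecretKey
    DataOutputBuffer out = new DataOutputBuffer();
    k.write(out);          // writes keyId, expiryDate, then vint -1
    DelegationKey copy = new DelegationKey();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);   // reads -1 and leaves the key null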
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 03:56:33 2011
@@ -29,8 +29,8 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -132,14 +132,19 @@ public class DFSClient implements FSCons
}
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
- DatanodeID datanodeid, Configuration conf) throws IOException {
+ DatanodeID datanodeid, Configuration conf,
+ Block block, Token<BlockTokenIdentifier> token) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
+ UserGroupInformation ticket = UserGroupInformation
+ .createRemoteUser(block.toString());
+ ticket.addToken(token);
return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
- ClientDatanodeProtocol.versionID, addr, conf);
+ ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
+ .getDefaultSocketFactory(conf));
}
/**
@@ -718,7 +723,7 @@ public class DFSClient implements FSCons
out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
out.writeLong(block.getBlockId());
out.writeLong(block.getGenerationStamp());
- lb.getAccessToken().write(out);
+ lb.getBlockToken().write(out);
out.flush();
final short reply = in.readShort();
@@ -1382,7 +1387,7 @@ public class DFSClient implements FSCons
checksumSize = this.checksum.getChecksumSize();
}
- public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken,
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken,
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
true);
@@ -1390,7 +1395,7 @@ public class DFSClient implements FSCons
/** Java Doc required */
public static BlockReader newBlockReader( Socket sock, String file, long blockId,
- BlockAccessToken accessToken,
+ Token<BlockTokenIdentifier> accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
@@ -1401,7 +1406,7 @@ public class DFSClient implements FSCons
public static BlockReader newBlockReader( Socket sock, String file,
long blockId,
- BlockAccessToken accessToken,
+ Token<BlockTokenIdentifier> accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
@@ -1433,7 +1438,7 @@ public class DFSClient implements FSCons
short status = in.readShort();
if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
- throw new InvalidAccessTokenException(
+ throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
@@ -1723,7 +1728,7 @@ public class DFSClient implements FSCons
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
Block blk = targetBlock.getBlock();
- BlockAccessToken accessToken = targetBlock.getAccessToken();
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
@@ -1732,7 +1737,7 @@ public class DFSClient implements FSCons
buffersize, verifyChecksum, clientName);
return chosenNode;
} catch (IOException ex) {
- if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+ if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
LOG.info("Will fetch a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + ex);
@@ -1951,7 +1956,7 @@ public class DFSClient implements FSCons
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
- BlockAccessToken accessToken = block.getAccessToken();
+ Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
int len = (int) (end - start + 1);
@@ -1973,7 +1978,7 @@ public class DFSClient implements FSCons
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
- if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+ if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
LOG.info("Will get a new access token and retry, "
+ "access token was invalid when connecting to " + targetAddr
+ " : " + e);
@@ -2217,7 +2222,7 @@ public class DFSClient implements FSCons
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
private Block block;
- private BlockAccessToken accessToken;
+ private Token<BlockTokenIdentifier> accessToken;
final private long blockSize;
private DataChecksum checksum;
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
@@ -2243,7 +2248,7 @@ public class DFSClient implements FSCons
private volatile boolean appendChunk = false; // appending to existing partial block
private long initialFileSize = 0; // at time of file open
- BlockAccessToken getAccessToken() {
+ Token<BlockTokenIdentifier> getAccessToken() {
return accessToken;
}
@@ -2685,7 +2690,7 @@ public class DFSClient implements FSCons
try {
// Pick the "least" datanode as the primary datanode to avoid deadlock.
primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf);
+ primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken);
newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
recoveryErrorCount++;
@@ -2741,7 +2746,7 @@ public class DFSClient implements FSCons
// newBlock should never be null and it should contain a newly
// generated access token.
block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
+ accessToken = newBlock.getBlockToken();
nodes = newBlock.getLocations();
this.hasError = false;
@@ -2837,7 +2842,7 @@ public class DFSClient implements FSCons
//
if (lastBlock != null) {
block = lastBlock.getBlock();
- accessToken = lastBlock.getAccessToken();
+ accessToken = lastBlock.getBlockToken();
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -2927,7 +2932,7 @@ public class DFSClient implements FSCons
long startTime = System.currentTimeMillis();
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
- accessToken = lb.getAccessToken();
+ accessToken = lb.getBlockToken();
nodes = lb.getLocations();
//
@@ -3014,7 +3019,7 @@ public class DFSClient implements FSCons
firstBadLink = Text.readString(blockReplyStream);
if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
- throw new InvalidAccessTokenException(
+ throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Mar 4 03:56:33 2011
@@ -21,10 +21,13 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
/** A client-datanode protocol for block recovery
*/
+@TokenInfo(BlockTokenSelector.class)
public interface ClientDatanodeProtocol extends VersionedProtocol {
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
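[Editor's note] This annotation is the client-side glue: when DFSClient builds a ClientDatanodeProtocol proxy (see the createClientDatanodeProtocolProxy hunk above), the RPC layer consults BlockTokenSelector to find the matching block token in the caller's UGI and uses it for SASL DIGEST-MD5 authentication to the datanode. The same hunk restated as a standalone fragment (addr, conf, block, and token are assumed to be in scope):

    UserGroupInformation ticket =
        UserGroupInformation.createRemoteUser(block.toString());
    ticket.addToken(token); // Token<BlockTokenIdentifier> issued by the NN
    ClientDatanodeProtocol datanode =
        (ClientDatanodeProtocol) RPC.getProxy(ClientDatanodeProtocol.class,
            ClientDatanodeProtocol.versionID, addr, ticket, conf,
            NetUtils.getDefaultSocketFactory(conf));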
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Fri Mar 4 03:56:33 2011
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hdfs.protocol;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.token.Token;
import java.io.*;
@@ -44,7 +45,7 @@ public class LocatedBlock implements Wri
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
- private BlockAccessToken accessToken = new BlockAccessToken();
+ private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
/**
*/
@@ -78,12 +79,12 @@ public class LocatedBlock implements Wri
}
}
- public BlockAccessToken getAccessToken() {
- return accessToken;
+ public Token<BlockTokenIdentifier> getBlockToken() {
+ return blockToken;
}
- public void setAccessToken(BlockAccessToken token) {
- this.accessToken = token;
+ public void setBlockToken(Token<BlockTokenIdentifier> token) {
+ this.blockToken = token;
}
/**
@@ -122,7 +123,7 @@ public class LocatedBlock implements Wri
// Writable
///////////////////////////////////////////
public void write(DataOutput out) throws IOException {
- accessToken.write(out);
+ blockToken.write(out);
out.writeBoolean(corrupt);
out.writeLong(offset);
b.write(out);
@@ -133,7 +134,7 @@ public class LocatedBlock implements Wri
}
public void readFields(DataInput in) throws IOException {
- accessToken.readFields(in);
+ blockToken.readFields(in);
this.corrupt = in.readBoolean();
offset = in.readLong();
this.b = new Block();
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockKey.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * Key used for generating and verifying block tokens
+ */
+public class BlockKey extends DelegationKey {
+
+ public BlockKey() {
+ super();
+ }
+
+ public BlockKey(int keyId, long expiryDate, SecretKey key) {
+ super(keyId, expiryDate, key);
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+public class BlockTokenIdentifier extends TokenIdentifier {
+ static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
+
+ private long expiryDate;
+ private int keyId;
+ private String userId;
+ private long blockId;
+ private EnumSet<AccessMode> modes;
+
+ public BlockTokenIdentifier() {
+ this(null, 0, EnumSet.noneOf(AccessMode.class));
+ }
+
+ public BlockTokenIdentifier(String userId, long blockId,
+ EnumSet<AccessMode> modes) {
+ this.userId = userId;
+ this.blockId = blockId;
+ this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+ }
+
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ @Override
+ public UserGroupInformation getUser() {
+ if (userId == null || "".equals(userId)) {
+ return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+ }
+ return UserGroupInformation.createRemoteUser(userId);
+ }
+
+ public long getExpiryDate() {
+ return expiryDate;
+ }
+
+ public void setExpiryDate(long expiryDate) {
+ this.expiryDate = expiryDate;
+ }
+
+ public int getKeyId() {
+ return this.keyId;
+ }
+
+ public void setKeyId(int keyId) {
+ this.keyId = keyId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public long getBlockId() {
+ return blockId;
+ }
+
+ public EnumSet<AccessMode> getAccessModes() {
+ return modes;
+ }
+
+ public String toString() {
+ return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+ + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+ + ", blockId=" + this.getBlockId() + ", access modes="
+ + this.getAccessModes() + ")";
+ }
+
+ static boolean isEqual(Object a, Object b) {
+ return a == null ? b == null : a.equals(b);
+ }
+
+ /** {@inheritDoc} */
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof BlockTokenIdentifier) {
+ BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
+ return this.expiryDate == that.expiryDate && this.keyId == that.keyId
+ && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+ && isEqual(this.modes, that.modes);
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ public int hashCode() {
+ return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
+ ^ (userId == null ? 0 : userId.hashCode());
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ expiryDate = WritableUtils.readVLong(in);
+ keyId = WritableUtils.readVInt(in);
+ userId = WritableUtils.readString(in);
+ blockId = WritableUtils.readVLong(in);
+ int length = WritableUtils.readVInt(in);
+ for (int i = 0; i < length; i++) {
+ modes.add(WritableUtils.readEnum(in, AccessMode.class));
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, expiryDate);
+ WritableUtils.writeVInt(out, keyId);
+ WritableUtils.writeString(out, userId);
+ WritableUtils.writeVLong(out, blockId);
+ WritableUtils.writeVInt(out, modes.size());
+ for (AccessMode aMode : modes) {
+ WritableUtils.writeEnum(out, aMode);
+ }
+ }
+}
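[Editor's note] Since the identifier is a Writable with value-based equals/hashCode, it survives a serialize/deserialize cycle intact, which is the property the token password computation depends on. A minimal round-trip sketch (values illustrative; run with -ea for the assert):

    import java.util.EnumSet;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    BlockTokenIdentifier id = new BlockTokenIdentifier("alice", 123L,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    DataOutputBuffer out = new DataOutputBuffer();
    id.write(out);
    BlockTokenIdentifier copy = new BlockTokenIdentifier();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);
    assert id.equals(copy) && id.hashCode() == copy.hashCode();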
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new block keys and export block keys to slaves,
+ * while slaves can only import and use block keys received from master. Both
+ * master and slave can generate and verify block tokens. Typically, master mode
+ * is used by NN and slave mode is used by DN.
+ */
+public class BlockTokenSecretManager extends
+ SecretManager<BlockTokenIdentifier> {
+ public static final Log LOG = LogFactory
+ .getLog(BlockTokenSecretManager.class);
+ public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
+
+ private final boolean isMaster;
+ /*
+ * keyUpdateInterval is the interval that NN updates its block keys. It should
+ * be set long enough so that all live DN's and Balancer should have sync'ed
+ * their block keys with NN at least once during each interval.
+ */
+ private final long keyUpdateInterval;
+ private volatile long tokenLifetime;
+ private int serialNo = new SecureRandom().nextInt();
+ private BlockKey currentKey;
+ private BlockKey nextKey;
+ private Map<Integer, BlockKey> allKeys;
+
+ public static enum AccessMode {
+ READ, WRITE, COPY, REPLACE
+ };
+
+ /**
+ * Constructor
+ *
+ * @param isMaster
+ * @param keyUpdateInterval
+ * @param tokenLifetime
+ * @throws IOException
+ */
+ public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+ long tokenLifetime) throws IOException {
+ this.isMaster = isMaster;
+ this.keyUpdateInterval = keyUpdateInterval;
+ this.tokenLifetime = tokenLifetime;
+ this.allKeys = new HashMap<Integer, BlockKey>();
+ generateKeys();
+ }
+
+ /** Initialize block keys */
+ private synchronized void generateKeys() {
+ if (!isMaster)
+ return;
+ /*
+ * Need to set estimated expiry dates for currentKey and nextKey so that if
+ * NN crashes, DN can still expire those keys. NN will stop using the newly
+ * generated currentKey after the first keyUpdateInterval, however it may
+ * still be used by DN and Balancer to generate new tokens before they get a
+ * chance to sync their keys with NN. Since we require keyUpdateInterval to be
+ * long enough so that all live DN's and Balancer will sync their keys with
+ * NN at least once during the period, the estimated expiry date for
+ * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+ * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+ * more.
+ */
+ serialNo++;
+ currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
+ * keyUpdateInterval + tokenLifetime, generateSecret());
+ serialNo++;
+ nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+ * keyUpdateInterval + tokenLifetime, generateSecret());
+ allKeys.put(currentKey.getKeyId(), currentKey);
+ allKeys.put(nextKey.getKeyId(), nextKey);
+ }
+
+ /** Export block keys, only to be used in master mode */
+ public synchronized ExportedBlockKeys exportKeys() {
+ if (!isMaster)
+ return null;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Exporting access keys");
+ return new ExportedBlockKeys(true, keyUpdateInterval, tokenLifetime,
+ currentKey, allKeys.values().toArray(new BlockKey[0]));
+ }
+
+ private synchronized void removeExpiredKeys() {
+ long now = System.currentTimeMillis();
+ for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
+ .iterator(); it.hasNext();) {
+ Map.Entry<Integer, BlockKey> e = it.next();
+ if (e.getValue().getExpiryDate() < now) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * Set block keys, only to be used in slave mode
+ */
+ public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+ throws IOException {
+ if (isMaster || exportedKeys == null)
+ return;
+ LOG.info("Setting block keys");
+ removeExpiredKeys();
+ this.currentKey = exportedKeys.getCurrentKey();
+ BlockKey[] receivedKeys = exportedKeys.getAllKeys();
+ for (int i = 0; i < receivedKeys.length; i++) {
+ if (receivedKeys[i] == null)
+ continue;
+ this.allKeys.put(receivedKeys[i].getKeyId(), receivedKeys[i]);
+ }
+ }
+
+ /**
+ * Update block keys, only to be used in master mode
+ */
+ public synchronized void updateKeys() throws IOException {
+ if (!isMaster)
+ return;
+ LOG.info("Updating block keys");
+ removeExpiredKeys();
+ // set final expiry date of retiring currentKey
+ allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
+ System.currentTimeMillis() + keyUpdateInterval + tokenLifetime,
+ currentKey.getKey()));
+ // update the estimated expiry date of new currentKey
+ currentKey = new BlockKey(nextKey.getKeyId(), System.currentTimeMillis()
+ + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
+ allKeys.put(currentKey.getKeyId(), currentKey);
+ // generate a new nextKey
+ serialNo++;
+ nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+ * keyUpdateInterval + tokenLifetime, generateSecret());
+ allKeys.put(nextKey.getKeyId(), nextKey);
+ }
+
+ /** Generate a block token for the current user */
+ public Token<BlockTokenIdentifier> generateToken(Block block,
+ EnumSet<AccessMode> modes) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String userID = (ugi == null ? null : ugi.getShortUserName());
+ return generateToken(userID, block, modes);
+ }
+
+ /** Generate a block token for a specified user */
+ public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
+ EnumSet<AccessMode> modes) throws IOException {
+ BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
+ .getBlockId(), modes);
+ return new Token<BlockTokenIdentifier>(id, this);
+ }
+
+ /**
+ * Check if access should be allowed. userID is not checked if null. This
+ * method doesn't check if token password is correct. It should be used only
+ * when token password has already been verified (e.g., in the RPC layer).
+ */
+ public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
+ AccessMode mode) throws InvalidToken {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking access for user=" + userId + ", block=" + block
+ + ", access mode=" + mode + " using " + id.toString());
+ }
+ if (userId != null && !userId.equals(id.getUserId())) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " doesn't belong to user " + userId);
+ }
+ if (id.getBlockId() != block.getBlockId()) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " doesn't apply to block " + block);
+ }
+ if (isExpired(id.getExpiryDate())) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " is expired.");
+ }
+ if (!id.getAccessModes().contains(mode)) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " doesn't have " + mode + " permission");
+ }
+ }
+
+ /** Check if access should be allowed. userID is not checked if null */
+ public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
+ Block block, AccessMode mode) throws InvalidToken {
+ BlockTokenIdentifier id = new BlockTokenIdentifier();
+ try {
+ id.readFields(new DataInputStream(new ByteArrayInputStream(token
+ .getIdentifier())));
+ } catch (IOException e) {
+ throw new InvalidToken(
+ "Unable to de-serialize block token identifier for user=" + userId
+ + ", block=" + block + ", access mode=" + mode);
+ }
+ checkAccess(id, userId, block, mode);
+ if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
+ throw new InvalidToken("Block token with " + id.toString()
+ + " doesn't have the correct token password");
+ }
+ }
+
+ private static boolean isExpired(long expiryDate) {
+ return System.currentTimeMillis() > expiryDate;
+ }
+
+ /**
+ * Check if a token is expired. For unit tests only. Returns true when the
+ * token is expired, false otherwise.
+ */
+ static boolean isTokenExpired(Token<BlockTokenIdentifier> token)
+ throws IOException {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ long expiryDate = WritableUtils.readVLong(in);
+ return isExpired(expiryDate);
+ }
+
+ /** set token lifetime. */
+ public void setTokenLifetime(long tokenLifetime) {
+ this.tokenLifetime = tokenLifetime;
+ }
+
+ /**
+ * Create an empty block token identifier
+ *
+ * @return a newly created empty block token identifier
+ */
+ @Override
+ public BlockTokenIdentifier createIdentifier() {
+ return new BlockTokenIdentifier();
+ }
+
+ /**
+ * Create a new password/secret for the given block token identifier.
+ *
+ * @param identifier
+ * the block token identifier
+ * @return token password/secret
+ */
+ @Override
+ protected byte[] createPassword(BlockTokenIdentifier identifier) {
+ BlockKey key = null;
+ synchronized (this) {
+ key = currentKey;
+ }
+ if (key == null)
+ throw new IllegalStateException("currentKey hasn't been initialized.");
+ identifier.setExpiryDate(System.currentTimeMillis() + tokenLifetime);
+ identifier.setKeyId(key.getKeyId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generating block token for " + identifier.toString());
+ }
+ return createPassword(identifier.getBytes(), key.getKey());
+ }
+
+ /**
+ * Look up the token password/secret for the given block token identifier.
+ *
+ * @param identifier
+ * the block token identifier to look up
+ * @return token password/secret as byte[]
+ * @throws InvalidToken
+ */
+ @Override
+ public byte[] retrievePassword(BlockTokenIdentifier identifier)
+ throws InvalidToken {
+ if (isExpired(identifier.getExpiryDate())) {
+ throw new InvalidToken("Block token with " + identifier.toString()
+ + " is expired.");
+ }
+ BlockKey key = null;
+ synchronized (this) {
+ key = allKeys.get(identifier.getKeyId());
+ }
+ if (key == null) {
+ throw new InvalidToken("Can't re-compute password for "
+ + identifier.toString() + ", since the required block key (keyID="
+ + identifier.getKeyId() + ") doesn't exist.");
+ }
+ return createPassword(identifier.getBytes(), key.getKey());
+ }
+}
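[Editor's note] To make the expiry arithmetic in generateKeys() and updateKeys() concrete: with keyUpdateInterval = 10 hours and tokenLifetime = 10 hours, a fresh currentKey gets an estimated expiry of now + 2*10 + 10 = 30 hours and nextKey of now + 3*10 + 10 = 40 hours; when updateKeys() retires currentKey, its final expiry tightens to now + 10 + 10 = 20 hours. The master/slave handshake itself reduces to a few calls; a hedged sketch of the NN/DN flow (block is assumed in scope, intervals shortened, exceptions elided):

    // NameNode side: master mode generates keys and can export them.
    BlockTokenSecretManager nn =
        new BlockTokenSecretManager(true, 10 * 60 * 1000L, 10 * 60 * 1000L);
    ExportedBlockKeys keys = nn.exportKeys();

    // DataNode side: slave mode only imports keys, then verifies tokens.
    BlockTokenSecretManager dn =
        new BlockTokenSecretManager(false, 10 * 60 * 1000L, 10 * 60 * 1000L);
    dn.setKeys(keys);

    // A token minted by the master now checks out on the slave.
    Token<BlockTokenIdentifier> t = nn.generateToken("alice", block,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    dn.checkAccess(t, "alice", block, BlockTokenSecretManager.AccessMode.READ);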
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS
+ */
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+ @SuppressWarnings("unchecked")
+ public Token<BlockTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+ return (Token<BlockTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }
+}
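[Editor's note] Note that the selector matches on token kind alone and ignores the service address beyond a null check, which makes sense here: a block token authorizes access to a block, not to one particular datanode, so any datanode holding the block keys can verify it. A small usage sketch (ugi is assumed to hold the token added in the DFSClient hunk above):

    BlockTokenSelector selector = new BlockTokenSelector();
    Token<BlockTokenIdentifier> blockToken =
        selector.selectToken(new Text("127.0.0.1:50020"), ugi.getTokens());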
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing block keys
+ */
+public class ExportedBlockKeys implements Writable {
+ public static final ExportedBlockKeys DUMMY_KEYS = new ExportedBlockKeys();
+ private boolean isBlockTokenEnabled;
+ private long keyUpdateInterval;
+ private long tokenLifetime;
+ private BlockKey currentKey;
+ private BlockKey[] allKeys;
+
+ public ExportedBlockKeys() {
+ this(false, 0, 0, new BlockKey(), new BlockKey[0]);
+ }
+
+ ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+ long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
+ this.isBlockTokenEnabled = isBlockTokenEnabled;
+ this.keyUpdateInterval = keyUpdateInterval;
+ this.tokenLifetime = tokenLifetime;
+ this.currentKey = currentKey == null ? new BlockKey() : currentKey;
+ this.allKeys = allKeys == null ? new BlockKey[0] : allKeys;
+ }
+
+ public boolean isBlockTokenEnabled() {
+ return isBlockTokenEnabled;
+ }
+
+ public long getKeyUpdateInterval() {
+ return keyUpdateInterval;
+ }
+
+ public long getTokenLifetime() {
+ return tokenLifetime;
+ }
+
+ public BlockKey getCurrentKey() {
+ return currentKey;
+ }
+
+ public BlockKey[] getAllKeys() {
+ return allKeys;
+ }
+
+ // ///////////////////////////////////////////////
+ // Writable
+ // ///////////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory(ExportedBlockKeys.class,
+ new WritableFactory() {
+ public Writable newInstance() {
+ return new ExportedBlockKeys();
+ }
+ });
+ }
+
+ /**
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(isBlockTokenEnabled);
+ out.writeLong(keyUpdateInterval);
+ out.writeLong(tokenLifetime);
+ currentKey.write(out);
+ out.writeInt(allKeys.length);
+ for (int i = 0; i < allKeys.length; i++) {
+ allKeys[i].write(out);
+ }
+ }
+
+ /**
+ */
+ public void readFields(DataInput in) throws IOException {
+ isBlockTokenEnabled = in.readBoolean();
+ keyUpdateInterval = in.readLong();
+ tokenLifetime = in.readLong();
+ currentKey.readFields(in);
+ this.allKeys = new BlockKey[in.readInt()];
+ for (int i = 0; i < allKeys.length; i++) {
+ allKeys[i] = new BlockKey();
+ allKeys[i].readFields(in);
+ }
+ }
+
+}
\ No newline at end of file
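[Editor's note] The static WritableFactories registration above is what allows an ExportedBlockKeys to be reconstructed reflectively on the receiving side of an RPC (for example NamenodeProtocol.getBlockKeys(), used by the Balancer below), since ObjectWritable needs a registered no-arg factory. A hedged round-trip sketch, mirroring the other Writables in this commit (nn is the master-mode manager from the earlier sketch):

    ExportedBlockKeys keys = nn.exportKeys();
    DataOutputBuffer out = new DataOutputBuffer();
    keys.write(out);
    ExportedBlockKeys copy = new ExportedBlockKeys();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);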
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java?rev=1077252&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java Fri Mar 4 03:56:33 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+/**
+ * Access token verification failed.
+ */
+public class InvalidBlockTokenException extends IOException {
+ private static final long serialVersionUID = 168L;
+
+ public InvalidBlockTokenException() {
+ super();
+ }
+
+ public InvalidBlockTokenException(String msg) {
+ super(msg);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Mar 4 03:56:33 2011
@@ -56,9 +56,9 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -76,6 +76,7 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -194,10 +195,10 @@ public class Balancer implements Tool {
private NamenodeProtocol namenode;
private ClientProtocol client;
private FileSystem fs;
- private boolean isAccessTokenEnabled;
+ private boolean isBlockTokenEnabled;
private boolean shouldRun;
private long keyUpdaterInterval;
- private AccessTokenHandler accessTokenHandler;
+ private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
private final static Random rnd = new Random();
@@ -368,11 +369,11 @@ public class Balancer implements Tool {
out.writeLong(block.getBlock().getGenerationStamp());
Text.writeString(out, source.getStorageID());
proxySource.write(out);
- BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
- if (isAccessTokenEnabled) {
- accessToken = accessTokenHandler.generateToken(null, block.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
- AccessTokenHandler.AccessMode.COPY));
+ Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockTokenSecretManager.generateToken(null, block.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
+ BlockTokenSecretManager.AccessMode.COPY));
}
accessToken.write(out);
out.flush();
@@ -859,25 +860,25 @@ public class Balancer implements Tool {
this.namenode = createNamenode(conf);
this.client = DFSClient.createNamenode(conf);
this.fs = FileSystem.get(conf);
- ExportedAccessKeys keys = namenode.getAccessKeys();
- this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
- if (isAccessTokenEnabled) {
- long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
- long accessTokenLifetime = keys.getTokenLifetime();
- LOG.info("Access token params received from NN: keyUpdateInterval="
- + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + accessTokenLifetime / (60 * 1000) + " min(s)");
- this.accessTokenHandler = new AccessTokenHandler(false,
- accessKeyUpdateInterval, accessTokenLifetime);
- this.accessTokenHandler.setKeys(keys);
+ ExportedBlockKeys keys = namenode.getBlockKeys();
+ this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+ if (isBlockTokenEnabled) {
+ long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+ long blockTokenLifetime = keys.getTokenLifetime();
+ LOG.info("Block token params received from NN: keyUpdateInterval="
+ + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ + blockTokenLifetime / (60 * 1000) + " min(s)");
+ this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+ blockKeyUpdateInterval, blockTokenLifetime);
+ this.blockTokenSecretManager.setKeys(keys);
/*
- * Balancer should sync its access keys with NN more frequently than NN
- * updates its access keys
+ * Balancer should sync its block keys with NN more frequently than NN
+ * updates its block keys
*/
- this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
- LOG.info("Balancer will update its access keys every "
+ this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
+ LOG.info("Balancer will update its block keys every "
+ keyUpdaterInterval / (60 * 1000) + " minute(s)");
- this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+ this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
this.shouldRun = true;
this.keyupdaterthread.start();
}
@@ -886,12 +887,12 @@ public class Balancer implements Tool {
/**
- * Periodically updates access keys.
+ * Periodically updates block keys.
*/
- class AccessKeyUpdater implements Runnable {
+ class BlockKeyUpdater implements Runnable {
public void run() {
while (shouldRun) {
try {
- accessTokenHandler.setKeys(namenode.getAccessKeys());
+ blockTokenSecretManager.setKeys(namenode.getBlockKeys());
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
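For reference, the Balancer-side flow above reduces to the sketch below: fetch the keys over NamenodeProtocol.getBlockKeys(), feed them into a slave-mode BlockTokenSecretManager, and stamp each block move with a REPLACE+COPY token. This is an illustration only, not code from the patch; the class and method names (BalancerTokenSketch, fromKeys, tokenForMove) are invented for exposition, while every API call in it appears in the hunks above.

// Illustrative sketch, not patch code. A slave-mode manager (first
// constructor argument false) verifies and mints tokens from keys the
// NameNode exported, but never rolls keys itself; the Balancer re-fetches
// the keys at a quarter of the NN's update interval so it cannot miss a roll.
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.security.token.Token;

class BalancerTokenSketch {
  static BlockTokenSecretManager fromKeys(ExportedBlockKeys keys)
      throws IOException {
    BlockTokenSecretManager mgr = new BlockTokenSecretManager(false,
        keys.getKeyUpdateInterval(), keys.getTokenLifetime());
    mgr.setKeys(keys); // refreshed periodically by BlockKeyUpdater
    return mgr;
  }

  static Token<BlockTokenIdentifier> tokenForMove(
      BlockTokenSecretManager mgr, boolean enabled, Block block)
      throws IOException {
    if (!enabled) {
      // The wire format always carries a token, so send the dummy one.
      return BlockTokenSecretManager.DUMMY_TOKEN;
    }
    // null owner: DataNodes do not check owner identity yet.
    return mgr.generateToken(null, block,
        EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
            BlockTokenSecretManager.AccessMode.COPY));
  }
}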
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar 4 03:56:33 2011
@@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -62,6 +63,9 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -97,10 +101,9 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -198,9 +201,9 @@ public class DataNode extends Configured
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
int writePacketSize = 0;
- boolean isAccessTokenEnabled;
- AccessTokenHandler accessTokenHandler;
- boolean isAccessTokenInitialized = false;
+ boolean isBlockTokenEnabled;
+ BlockTokenSecretManager blockTokenSecretManager;
+ boolean isBlockTokenInitialized = false;
public DataBlockScanner blockScanner = null;
public Daemon blockScannerThread = null;
@@ -410,12 +413,16 @@ public class DataNode extends Configured
ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
}
+ // BlockTokenSecretManager is created here, but it shouldn't be
+ // used until it is initialized in register().
+ this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+ 0, 0);
//init ipc server
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
- conf.getInt("dfs.datanode.handler.count", 3), false, conf);
- ipcServer.start();
+ conf.getInt("dfs.datanode.handler.count", 3), false, conf,
+ blockTokenSecretManager);
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
LOG.info("dnRegistration = " + dnRegistration);
@@ -575,25 +582,24 @@ public class DataNode extends Configured
+ ". Expecting " + storage.getStorageID());
}
- if (!isAccessTokenInitialized) {
+ if (!isBlockTokenInitialized) {
/* first time registering with NN */
- ExportedAccessKeys keys = dnRegistration.exportedKeys;
- this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
- if (isAccessTokenEnabled) {
- long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
- long accessTokenLifetime = keys.getTokenLifetime();
- LOG.info("Access token params received from NN: keyUpdateInterval="
- + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + accessTokenLifetime / (60 * 1000) + " min(s)");
- this.accessTokenHandler = new AccessTokenHandler(false,
- accessKeyUpdateInterval, accessTokenLifetime);
- }
- isAccessTokenInitialized = true;
- }
-
- if (isAccessTokenEnabled) {
- accessTokenHandler.setKeys(dnRegistration.exportedKeys);
- dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+ ExportedBlockKeys keys = dnRegistration.exportedKeys;
+ this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+ if (isBlockTokenEnabled) {
+ long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+ long blockTokenLifetime = keys.getTokenLifetime();
+ LOG.info("Block token params received from NN: keyUpdateInterval="
+ + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ + blockTokenLifetime / (60 * 1000) + " min(s)");
+ blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
+ }
+ isBlockTokenInitialized = true;
+ }
+
+ if (isBlockTokenEnabled) {
+ blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
+ dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
// random short delay - helps scatter the BR from all DNs
@@ -962,8 +968,8 @@ public class DataNode extends Configured
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
- if (isAccessTokenEnabled) {
- accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+ if (isBlockTokenEnabled) {
+ blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
default:
@@ -1222,10 +1228,10 @@ public class DataNode extends Configured
for (int i = 1; i < targets.length; i++) {
targets[i].write(out);
}
- BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
- if (isAccessTokenEnabled) {
- accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
- EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+ Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockTokenSecretManager.generateToken(null, b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
accessToken.write(out);
// send data & checksum
@@ -1261,6 +1267,7 @@ public class DataNode extends Configured
// start dataXceiveServer
dataXceiverServer.start();
+ ipcServer.start();
while (shouldRun) {
try {
@@ -1631,9 +1638,9 @@ public class DataNode extends Configured
DatanodeID.EMPTY_ARRAY);
//always return a new access token even if everything else stays the same
LocatedBlock b = new LocatedBlock(block, targets);
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ if (isBlockTokenEnabled) {
+ b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}
@@ -1666,9 +1673,9 @@ public class DataNode extends Configured
LocatedBlock b = new LocatedBlock(newblock, info); // success
// should have used client ID to generate access token, but since
// owner ID is not checked, we simply pass null for now.
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ if (isBlockTokenEnabled) {
+ b.setBlockToken(blockTokenSecretManager.generateToken(null, b.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}
@@ -1687,6 +1694,23 @@
public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
) throws IOException {
logRecoverBlock("Client", block, targets);
+ if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+ Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+ .getTokenIdentifiers();
+ if (tokenIds.size() != 1) {
+ throw new IOException("Can't continue with recoverBlock() "
+ + "authorization since " + tokenIds.size() + " BlockTokenIdentifier "
+ + "is found.");
+ }
+ for (TokenIdentifier tokenId : tokenIds) {
+ BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got: " + id.toString());
+ }
+ blockTokenSecretManager.checkAccess(id, null, block,
+ BlockTokenSecretManager.AccessMode.WRITE);
+ }
+ }
return recoverBlock(block, keepLength, targets, false);
}
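Two DataNode-side points are worth pulling out of the hunks above. First, the secret manager now has a two-phase lifecycle: it is constructed inert (slave mode, zero intervals) so that it can be handed to RPC.getServer() before the DataNode has registered, and ipcServer.start() is deferred until just before the main service loop, after the data xceiver server starts; register() then arms the manager with setTokenLifetime() and setKeys() once the NameNode ships real keys. Second, recoverBlock() authorizes its RPC caller through the BlockTokenIdentifier that the token-authenticated connection attached to the caller's UGI. The sketch below restates that second point; the class name and the mgr/block parameters are editorial scaffolding, the calls are the patch's own.

// Illustrative restatement of the new recoverBlock() guard: exactly one
// BlockTokenIdentifier must ride on the caller's UGI, and it must grant
// WRITE access to the block being recovered.
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;

class RecoverBlockGuardSketch {
  static void authorize(BlockTokenSecretManager mgr, Block block)
      throws IOException {
    Set<TokenIdentifier> ids =
        UserGroupInformation.getCurrentUser().getTokenIdentifiers();
    if (ids.size() != 1) {
      throw new IOException("expected exactly one BlockTokenIdentifier, found "
          + ids.size());
    }
    for (TokenIdentifier tokenId : ids) {
      // checkAccess throws InvalidToken (an IOException) on failure.
      mgr.checkAccess((BlockTokenIdentifier) tokenId, null, block,
          BlockTokenSecretManager.AccessMode.WRITE);
    }
  }
}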
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar 4 03:56:33 2011
@@ -32,14 +32,16 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -151,23 +153,26 @@ class DataXceiver implements Runnable, F
long startOffset = in.readLong();
long length = in.readLong();
String clientName = Text.readString(in);
- BlockAccessToken accessToken = new BlockAccessToken();
+ Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.READ)) {
+ if (datanode.isBlockTokenEnabled) {
try {
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
- out.flush();
- throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_READ_BLOCK for block " + block);
- } finally {
- IOUtils.closeStream(out);
+ datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+ BlockTokenSecretManager.AccessMode.READ);
+ } catch (InvalidToken e) {
+ try {
+ out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ out.flush();
+ throw new IOException("Access token verification failed, for client "
+ + remoteAddress + " for OP_READ_BLOCK for block " + block);
+ } finally {
+ IOUtils.closeStream(out);
+ }
}
}
// send the block
@@ -258,24 +263,27 @@ class DataXceiver implements Runnable, F
tmp.readFields(in);
targets[i] = tmp;
}
- BlockAccessToken accessToken = new BlockAccessToken();
+ Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
DataOutputStream replyOut = null; // stream to prev target
replyOut = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
- .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+ if (datanode.isBlockTokenEnabled) {
try {
- if (client.length() != 0) {
- replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
- Text.writeString(replyOut, datanode.dnRegistration.getName());
- replyOut.flush();
+ datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+ BlockTokenSecretManager.AccessMode.WRITE);
+ } catch (InvalidToken e) {
+ try {
+ if (client.length() != 0) {
+ replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ Text.writeString(replyOut, datanode.dnRegistration.getName());
+ replyOut.flush();
+ }
+ throw new IOException("Access token verification failed, for client "
+ + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
+ } finally {
+ IOUtils.closeStream(replyOut);
}
- throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
- } finally {
- IOUtils.closeStream(replyOut);
}
}
@@ -423,21 +431,24 @@ class DataXceiver implements Runnable, F
*/
void getBlockChecksum(DataInputStream in) throws IOException {
final Block block = new Block(in.readLong(), 0 , in.readLong());
- BlockAccessToken accessToken = new BlockAccessToken();
+ Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
datanode.socketWriteTimeout));
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
- .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+ if (datanode.isBlockTokenEnabled) {
try {
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
- out.flush();
- throw new IOException(
- "Access token verification failed, for client " + remoteAddress
- + " for OP_BLOCK_CHECKSUM for block " + block);
- } finally {
- IOUtils.closeStream(out);
+ datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+ BlockTokenSecretManager.AccessMode.READ);
+ } catch (InvalidToken e) {
+ try {
+ out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ out.flush();
+ throw new IOException(
+ "Access token verification failed, for client " + remoteAddress
+ + " for OP_BLOCK_CHECKSUM for block " + block);
+ } finally {
+ IOUtils.closeStream(out);
+ }
}
}
@@ -484,17 +495,20 @@ class DataXceiver implements Runnable, F
// Read in the header
long blockId = in.readLong(); // read block id
Block block = new Block(blockId, 0, in.readLong());
- BlockAccessToken accessToken = new BlockAccessToken();
+ Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.COPY)) {
- LOG.warn("Invalid access token in request from "
- + remoteAddress + " for OP_COPY_BLOCK for block " + block);
- sendResponse(s,
- (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
- datanode.socketWriteTimeout);
- return;
+ if (datanode.isBlockTokenEnabled) {
+ try {
+ datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+ BlockTokenSecretManager.AccessMode.COPY);
+ } catch (InvalidToken e) {
+ LOG.warn("Invalid access token in request from "
+ + remoteAddress + " for OP_COPY_BLOCK for block " + block);
+ sendResponse(s,
+ (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+ datanode.socketWriteTimeout);
+ return;
+ }
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -562,16 +576,19 @@ class DataXceiver implements Runnable, F
String sourceID = Text.readString(in); // read del hint
DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
proxySource.readFields(in);
- BlockAccessToken accessToken = new BlockAccessToken();
+ Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.REPLACE)) {
- LOG.warn("Invalid access token in request from "
- + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
- datanode.socketWriteTimeout);
- return;
+ if (datanode.isBlockTokenEnabled) {
+ try {
+ datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
+ BlockTokenSecretManager.AccessMode.REPLACE);
+ } catch (InvalidToken e) {
+ LOG.warn("Invalid access token in request from "
+ + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
+ sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+ datanode.socketWriteTimeout);
+ return;
+ }
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
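All five DataXceiver operations (read, write, checksum, copy, replace) now share one verification pattern: deserialize a generic Token<BlockTokenIdentifier> from the op header where a BlockAccessToken used to be read, let checkAccess() report failure by throwing InvalidToken, and translate that exception into OP_STATUS_ERROR_ACCESS_TOKEN on the reply stream. The sketch below shows the pattern once instead of five times; the helper class itself is an editorial summary, not a method the patch introduces.

// Illustrative summary of the per-op check. Token is Writable, so it
// deserializes from the same bytes the old BlockAccessToken occupied;
// the data-transfer wire format is unchanged by the refactoring.
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

class XceiverTokenSketch {
  static Token<BlockTokenIdentifier> readToken(DataInputStream in)
      throws IOException {
    Token<BlockTokenIdentifier> t = new Token<BlockTokenIdentifier>();
    t.readFields(in);
    return t;
  }

  // checkAccess() throws instead of returning false, so each op keeps its
  // error reply in a catch block rather than the old if/finally nesting.
  static void verify(BlockTokenSecretManager mgr,
      Token<BlockTokenIdentifier> token, Block block,
      BlockTokenSecretManager.AccessMode mode,
      DataOutputStream reply) throws IOException {
    try {
      mgr.checkAccess(token, null, block, mode);
    } catch (InvalidToken e) {
      reply.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
      reply.flush();
      throw new IOException("Access token verification failed for block "
          + block);
    }
  }
}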
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077252&r1=1077251&r2=1077252&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar 4 03:56:33 2011
@@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.server.na
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
@@ -30,8 +33,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
@@ -139,7 +140,7 @@ public class FSNamesystem implements FSC
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private int totalLoad = 0;
boolean isAccessTokenEnabled;
- AccessTokenHandler accessTokenHandler;
+ BlockTokenSecretManager accessTokenHandler;
private long accessKeyUpdateInterval;
private long accessTokenLifetime;
@@ -344,7 +345,7 @@ public class FSNamesystem implements FSC
conf.getInt("dfs.replication.pending.timeout.sec",
-1) * 1000L);
if (isAccessTokenEnabled) {
- accessTokenHandler = new AccessTokenHandler(true,
+ accessTokenHandler = new BlockTokenSecretManager(true,
accessKeyUpdateInterval, accessTokenLifetime);
}
this.hbthread = new Daemon(new HeartbeatMonitor());
@@ -463,12 +464,12 @@ public class FSNamesystem implements FSC
this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
this.supportAppends = conf.getBoolean("dfs.support.append", false);
this.isAccessTokenEnabled = conf.getBoolean(
- AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, false);
if (isAccessTokenEnabled) {
this.accessKeyUpdateInterval = conf.getLong(
- AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+ DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 600) * 60 * 1000L; // 10 hrs
this.accessTokenLifetime = conf.getLong(
- AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 600) * 60 * 1000L; // 10 hrs
}
LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+ " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
@@ -714,9 +715,9 @@ public class FSNamesystem implements FSC
*
- * @return current access keys
+ * @return current block keys
*/
- ExportedAccessKeys getAccessKeys() {
+ ExportedBlockKeys getBlockKeys() {
return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
- : ExportedAccessKeys.DUMMY_KEYS;
+ : ExportedBlockKeys.DUMMY_KEYS;
}
/**
@@ -913,8 +914,8 @@ public class FSNamesystem implements FSC
LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
blockCorrupt);
if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
results.add(b);
curPos += blocks[curBlk].getNumBytes();
@@ -1265,8 +1266,8 @@ public class FSNamesystem implements FSC
lb = new LocatedBlock(last, targets,
fileLength-storedBlock.getNumBytes());
if (isAccessTokenEnabled) {
- lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ lb.setBlockToken(accessTokenHandler.generateToken(lb.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
// Remove block from replication queue.
@@ -1379,8 +1380,8 @@ public class FSNamesystem implements FSC
// Create next block
LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}
@@ -2139,7 +2140,7 @@ public class FSNamesystem implements FSC
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
- nodeReg.exportedKeys = getAccessKeys();
+ nodeReg.exportedKeys = getBlockKeys();
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "