Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/05/27 01:31:57 UTC
svn commit: r948634 [2/2] - in /hadoop/hdfs/trunk: ./ ivy/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/security/ src/java/org/apache/hadoop/hdfs/security/token/block/ src/java/org/apache/ha...
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed May 26 23:31:56 2010
@@ -39,8 +39,8 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.io.IOUtils;
@@ -49,6 +49,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -126,27 +128,33 @@ class DataXceiver extends DataTransferPr
* Read a block from the disk.
*/
@Override
- protected void opReadBlock(DataInputStream in,
- long blockId, long blockGs, long startOffset, long length,
- String clientName, BlockAccessToken accessToken) throws IOException {
+ protected void opReadBlock(DataInputStream in, long blockId, long blockGs,
+ long startOffset, long length, String clientName,
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.READ)) {
+
+ if (datanode.isBlockTokenEnabled) {
try {
- ERROR_ACCESS_TOKEN.write(out);
- out.flush();
- throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_READ_BLOCK for block " + block);
- } finally {
- IOUtils.closeStream(out);
+ datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+ BlockTokenSecretManager.AccessMode.READ);
+ } catch (InvalidToken e) {
+ try {
+ ERROR_ACCESS_TOKEN.write(out);
+ out.flush();
+ LOG.warn("Block token verification failed, for client "
+ + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
+ + e.getLocalizedMessage());
+ throw e;
+ } finally {
+ IOUtils.closeStream(out);
+ }
}
}
+
// send the block
BlockSender blockSender = null;
final String clientTraceFmt =
@@ -212,7 +220,7 @@ class DataXceiver extends DataTransferPr
int pipelineSize, BlockConstructionStage stage,
long newGs, long minBytesRcvd, long maxBytesRcvd,
String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
- BlockAccessToken accessToken) throws IOException {
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -228,19 +236,24 @@ class DataXceiver extends DataTransferPr
DataOutputStream replyOut = null; // stream to prev target
replyOut = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
- .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+ if (datanode.isBlockTokenEnabled) {
try {
- if (client.length() != 0) {
- ERROR_ACCESS_TOKEN.write(replyOut);
- Text.writeString(replyOut, datanode.dnRegistration.getName());
- replyOut.flush();
+ datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+ BlockTokenSecretManager.AccessMode.WRITE);
+ } catch (InvalidToken e) {
+ try {
+ if (client.length() != 0) {
+ ERROR_ACCESS_TOKEN.write(replyOut);
+ Text.writeString(replyOut, datanode.dnRegistration.getName());
+ replyOut.flush();
+ }
+ LOG.warn("Block token verification failed, for client "
+ + remoteAddress + " for OP_WRITE_BLOCK for block " + block
+ + " : " + e.getLocalizedMessage());
+ throw e;
+ } finally {
+ IOUtils.closeStream(replyOut);
}
- throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
- } finally {
- IOUtils.closeStream(replyOut);
}
}
@@ -292,7 +305,7 @@ class DataXceiver extends DataTransferPr
DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
blockId, blockGs,
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client,
- srcDataNode, targets, accessToken);
+ srcDataNode, targets, blockToken);
if (blockReceiver != null) { // send checksum header
blockReceiver.writeChecksumHeader(mirrorOut);
@@ -395,22 +408,27 @@ class DataXceiver extends DataTransferPr
* Get block checksum (MD5 of CRC32).
*/
@Override
- protected void opBlockChecksum(DataInputStream in,
- long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+ protected void opBlockChecksum(DataInputStream in, long blockId,
+ long blockGs, Token<BlockTokenIdentifier> blockToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
datanode.socketWriteTimeout));
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
- .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+ if (datanode.isBlockTokenEnabled) {
try {
- ERROR_ACCESS_TOKEN.write(out);
- out.flush();
- throw new IOException(
- "Access token verification failed, for client " + remoteAddress
- + " for OP_BLOCK_CHECKSUM for block " + block);
- } finally {
- IOUtils.closeStream(out);
+ datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+ BlockTokenSecretManager.AccessMode.READ);
+ } catch (InvalidToken e) {
+ try {
+ ERROR_ACCESS_TOKEN.write(out);
+ out.flush();
+ LOG.warn("Block token verification failed, for client "
+ + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
+ + " : " + e.getLocalizedMessage());
+ throw e;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
}
}
@@ -454,17 +472,22 @@ class DataXceiver extends DataTransferPr
* Read a block from the disk and then sends it to a destination.
*/
@Override
- protected void opCopyBlock(DataInputStream in,
- long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+ protected void opCopyBlock(DataInputStream in, long blockId, long blockGs,
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
// Read in the header
Block block = new Block(blockId, 0, blockGs);
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.COPY)) {
- LOG.warn("Invalid access token in request from "
- + remoteAddress + " for OP_COPY_BLOCK for block " + block);
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
- return;
+ if (datanode.isBlockTokenEnabled) {
+ try {
+ datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+ BlockTokenSecretManager.AccessMode.COPY);
+ } catch (InvalidToken e) {
+ LOG.warn("Invalid access token in request from " + remoteAddress
+ + " for OP_COPY_BLOCK for block " + block + " : "
+ + e.getLocalizedMessage());
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ return;
+ }
+
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -526,17 +549,21 @@ class DataXceiver extends DataTransferPr
@Override
protected void opReplaceBlock(DataInputStream in,
long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
- BlockAccessToken accessToken) throws IOException {
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
/* read header */
final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
blockGs);
- if (datanode.isAccessTokenEnabled
- && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
- AccessTokenHandler.AccessMode.REPLACE)) {
- LOG.warn("Invalid access token in request from "
- + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
- return;
+ if (datanode.isBlockTokenEnabled) {
+ try {
+ datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+ BlockTokenSecretManager.AccessMode.REPLACE);
+ } catch (InvalidToken e) {
+ LOG.warn("Invalid access token in request from " + remoteAddress
+ + " for OP_REPLACE_BLOCK for block " + block + " : "
+ + e.getLocalizedMessage());
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ return;
+ }
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -567,7 +594,7 @@ class DataXceiver extends DataTransferPr
/* send request to the proxy */
DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
- block.getGenerationStamp(), accessToken);
+ block.getGenerationStamp(), blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
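
The pattern above repeats for every op: when block tokens are enabled, the datanode calls BlockTokenSecretManager.checkAccess(), and failure now surfaces as an InvalidToken, which each op answers with ERROR_ACCESS_TOKEN on the reply stream before rethrowing. A minimal standalone sketch of that check, using only calls visible in this patch (block id and intervals are arbitrary illustration values; the mismatched-mode call is expected to fail with InvalidToken):

import java.util.EnumSet;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

public class BlockTokenCheckSketch {
  public static void main(String[] args) throws Exception {
    // master-mode manager, as the namenode creates it
    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
        10 * 60 * 1000L /* key update */, 2 * 60 * 1000L /* lifetime */);
    Block block = new Block(42L);
    // mint a READ-only token for the block
    Token<BlockTokenIdentifier> readToken = sm.generateToken(block,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    // verifies: the token covers this block and this access mode
    sm.checkAccess(readToken, null, block,
        BlockTokenSecretManager.AccessMode.READ);
    try {
      // fails: a READ token presented for a WRITE op
      sm.checkAccess(readToken, null, block,
          BlockTokenSecretManager.AccessMode.WRITE);
    } catch (InvalidToken expected) {
      // this is the case DataXceiver now reports as ERROR_ACCESS_TOKEN
    }
  }
}
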
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Wed May 26 23:31:56 2010
@@ -39,10 +39,12 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
@@ -379,7 +381,7 @@ public class DatanodeJspHelper {
final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
- BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
+ Token<BlockTokenIdentifier> blockToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (conf.getBoolean(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
@@ -392,7 +394,7 @@ public class DatanodeJspHelper {
}
for (int i = 0; i < blks.size(); i++) {
if (blks.get(i).getBlock().getBlockId() == blockId) {
- accessToken = blks.get(i).getAccessToken();
+ blockToken = blks.get(i).getBlockToken();
break;
}
}
@@ -558,7 +560,7 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
try {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
- datanodePort), blockId, accessToken, genStamp, blockSize,
+ datanodePort), blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf);
} catch (Exception e) {
out.print(e);
@@ -627,7 +629,7 @@ public class DatanodeJspHelper {
LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
long blockSize = lastBlk.getBlock().getNumBytes();
long blockId = lastBlk.getBlock().getBlockId();
- BlockAccessToken accessToken = lastBlk.getAccessToken();
+ Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
long genStamp = lastBlk.getBlock().getGenerationStamp();
DatanodeInfo chosenNode;
try {
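
The lookup the JSP helper performs above -- find the token that goes with one particular block in a listing, defaulting to DUMMY_TOKEN when block tokens are disabled -- reduces to a few lines. A hypothetical helper for illustration (not code from this patch):

  static Token<BlockTokenIdentifier> tokenFor(List<LocatedBlock> blks,
      long blockId) {
    Token<BlockTokenIdentifier> t = BlockTokenSecretManager.DUMMY_TOKEN;
    for (LocatedBlock lb : blks) {
      if (lb.getBlock().getBlockId() == blockId) {
        t = lb.getBlockToken();  // token minted by the namenode for this block
        break;
      }
    }
    return t;
  }
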
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May 26 23:31:56 2010
@@ -23,8 +23,8 @@ import org.apache.hadoop.HadoopIllegalAr
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -164,10 +164,10 @@ public class FSNamesystem implements FSC
private FSNamesystemMetrics myFSMetrics;
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private int totalLoad = 0;
- boolean isAccessTokenEnabled;
- AccessTokenHandler accessTokenHandler;
- private long accessKeyUpdateInterval;
- private long accessTokenLifetime;
+ boolean isBlockTokenEnabled;
+ BlockTokenSecretManager blockTokenSecretManager;
+ private long blockKeyUpdateInterval;
+ private long blockTokenLifetime;
// Scan interval is not configurable.
private final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 3600000; // 1 hour
@@ -301,9 +301,9 @@ public class FSNamesystem implements FSC
this.safeMode = new SafeModeInfo(conf);
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
- if (isAccessTokenEnabled) {
- accessTokenHandler = new AccessTokenHandler(true,
- accessKeyUpdateInterval, accessTokenLifetime);
+ if (isBlockTokenEnabled) {
+ blockTokenSecretManager = new BlockTokenSecretManager(true,
+ blockKeyUpdateInterval, blockTokenLifetime);
}
}
@@ -455,20 +455,20 @@ public class FSNamesystem implements FSC
this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
- this.isAccessTokenEnabled = conf.getBoolean(
+ this.isBlockTokenEnabled = conf.getBoolean(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
- if (isAccessTokenEnabled) {
- this.accessKeyUpdateInterval = conf.getLong(
+ if (isBlockTokenEnabled) {
+ this.blockKeyUpdateInterval = conf.getLong(
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
- this.accessTokenLifetime = conf.getLong(
+ this.blockTokenLifetime = conf.getLong(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
}
- LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
- + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
- + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+ LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
+ + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
+ + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
}
@@ -632,9 +632,9 @@ public class FSNamesystem implements FSC
*
* @return current access keys
*/
- ExportedAccessKeys getAccessKeys() {
- return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
- : ExportedAccessKeys.DUMMY_KEYS;
+ ExportedBlockKeys getBlockKeys() {
+ return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
+ : ExportedBlockKeys.DUMMY_KEYS;
}
/**
@@ -802,9 +802,9 @@ public class FSNamesystem implements FSC
LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
final long offset, final boolean corrupt) throws IOException {
final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
- if (isAccessTokenEnabled) {
- lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
- EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ if (isBlockTokenEnabled) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
return lb;
}
@@ -1364,9 +1364,9 @@ public class FSNamesystem implements FSC
lb = new LocatedBlock(lastBlock, targets,
fileLength-lastBlock.getNumBytes());
- if (isAccessTokenEnabled) {
- lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ if (isBlockTokenEnabled) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
// Remove block from replication queue.
@@ -1482,9 +1482,9 @@ public class FSNamesystem implements FSC
// Create next block
LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
- if (isAccessTokenEnabled) {
- b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ if (isBlockTokenEnabled) {
+ b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}
@@ -2310,7 +2310,7 @@ public class FSNamesystem implements FSC
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
- nodeReg.exportedKeys = getAccessKeys();
+ nodeReg.exportedKeys = getBlockKeys();
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "
@@ -2523,8 +2523,8 @@ public class FSNamesystem implements FSC
cmds.add(cmd);
}
// check access key update
- if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
- cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+ if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+ cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
nodeinfo.needKeyUpdate = false;
}
if (!cmds.isEmpty()) {
@@ -2562,8 +2562,8 @@ public class FSNamesystem implements FSC
/**
* Update access keys.
*/
- void updateAccessKey() throws IOException {
- this.accessTokenHandler.updateKeys();
+ void updateBlockKey() throws IOException {
+ this.blockTokenSecretManager.updateKeys();
synchronized (heartbeats) {
for (DatanodeDescriptor nodeInfo : heartbeats) {
nodeInfo.needKeyUpdate = true;
@@ -2572,11 +2572,11 @@ public class FSNamesystem implements FSC
}
/**
- * Periodically calls heartbeatCheck() and updateAccessKey()
+ * Periodically calls heartbeatCheck() and updateBlockKey()
*/
class HeartbeatMonitor implements Runnable {
private long lastHeartbeatCheck;
- private long lastAccessKeyUpdate;
+ private long lastBlockKeyUpdate;
/**
*/
public void run() {
@@ -2587,9 +2587,9 @@ public class FSNamesystem implements FSC
heartbeatCheck();
lastHeartbeatCheck = now;
}
- if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
- updateAccessKey();
- lastAccessKeyUpdate = now;
+ if (isBlockTokenEnabled && (lastBlockKeyUpdate + blockKeyUpdateInterval < now)) {
+ updateBlockKey();
+ lastBlockKeyUpdate = now;
}
} catch (Exception e) {
FSNamesystem.LOG.error(StringUtils.stringifyException(e));
@@ -4229,9 +4229,9 @@ public class FSNamesystem implements FSC
// get a new generation stamp and an access token
block.setGenerationStamp(nextGenerationStamp());
LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
- if (isAccessTokenEnabled) {
- locatedBlock.setAccessToken(accessTokenHandler.generateToken(
- block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ if (isBlockTokenEnabled) {
+ locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
+ block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return locatedBlock;
}
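
Taken together, the FSNamesystem changes wire the feature up as follows: dfs.block.access.token.enable gates everything; when set, a master-mode BlockTokenSecretManager is built from the key-update interval and token lifetime (configured in minutes, converted to milliseconds), and every LocatedBlock returned to a client gets a freshly generated token. A condensed sketch of that flow under those assumptions (standalone illustration, not namenode code):

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;

public class NamenodeTokenFlowSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);

    boolean isBlockTokenEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
    // intervals are configured in minutes, converted to ms as above
    long keyUpdateInterval = conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L;
    long tokenLifetime = conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L;

    BlockTokenSecretManager mgr = new BlockTokenSecretManager(
        true /* namenode is master */, keyUpdateInterval, tokenLifetime);

    // mirror of createLocatedBlock(): attach a READ token to each block
    Block b = new Block(1L);
    LocatedBlock lb = new LocatedBlock(b, new DatanodeInfo[0]);
    if (isBlockTokenEnabled) {
      lb.setBlockToken(mgr.generateToken(b,
          EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
    }
  }
}
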
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed May 26 23:31:56 2010
@@ -54,7 +54,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -498,8 +498,8 @@ public class NameNode implements Namenod
}
/** {@inheritDoc} */
- public ExportedAccessKeys getAccessKeys() throws IOException {
- return namesystem.getAccessKeys();
+ public ExportedBlockKeys getBlockKeys() throws IOException {
+ return namesystem.getBlockKeys();
}
@Override // NamenodeProtocol
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed May 26 23:31:56 2010
@@ -501,7 +501,7 @@ public class NamenodeFsck {
BlockReader.newBlockReader(s, targetAddr.toString() + ":" +
block.getBlockId(),
block.getBlockId(),
- lblock.getAccessToken(),
+ lblock.getBlockToken(),
block.getGenerationStamp(),
0, -1,
conf.getInt("io.file.buffer.size", 4096));
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed May 26 23:31:56 2010
@@ -23,7 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -47,7 +47,7 @@ implements Writable, NodeRegistration {
}
public StorageInfo storageInfo;
- public ExportedAccessKeys exportedKeys;
+ public ExportedBlockKeys exportedKeys;
/**
* Default constructor.
@@ -62,7 +62,7 @@ implements Writable, NodeRegistration {
public DatanodeRegistration(String nodeName) {
super(nodeName);
this.storageInfo = new StorageInfo();
- this.exportedKeys = new ExportedAccessKeys();
+ this.exportedKeys = new ExportedBlockKeys();
}
public void setInfoPort(int infoPort) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java Wed May 26 23:31:56 2010
@@ -21,24 +21,24 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
public class KeyUpdateCommand extends DatanodeCommand {
- private ExportedAccessKeys keys;
+ private ExportedBlockKeys keys;
KeyUpdateCommand() {
- this(new ExportedAccessKeys());
+ this(new ExportedBlockKeys());
}
- public KeyUpdateCommand(ExportedAccessKeys keys) {
+ public KeyUpdateCommand(ExportedBlockKeys keys) {
super(DatanodeProtocol.DNA_ACCESSKEYUPDATE);
this.keys = keys;
}
- public ExportedAccessKeys getExportedKeys() {
+ public ExportedBlockKeys getExportedKeys() {
return this.keys;
}
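
KeyUpdateCommand is the vehicle for key rollover: the namenode exports its current keys and each datanode imports them into its own slave-mode manager, after which tokens minted on either side verify on both. A minimal sketch of that handshake, using the same calls the new TestBlockToken below exercises:

import java.util.EnumSet;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.security.token.Token;

public class KeyRolloverSketch {
  public static void main(String[] args) throws Exception {
    long keyUpdateInterval = 10 * 60 * 1000L; // 10 min
    long tokenLifetime = 2 * 60 * 1000L;      // 2 min
    BlockTokenSecretManager namenode = new BlockTokenSecretManager(
        true /* master */, keyUpdateInterval, tokenLifetime);
    BlockTokenSecretManager datanode = new BlockTokenSecretManager(
        false /* slave */, keyUpdateInterval, tokenLifetime);

    // what getBlockKeys() and KeyUpdateCommand carry over the wire
    ExportedBlockKeys keys = namenode.exportKeys();
    datanode.setKeys(keys);

    // a token minted by the master now verifies on the slave
    Block b = new Block(7L);
    Token<BlockTokenIdentifier> t = namenode.generateToken(b,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    datanode.checkAccess(t, null, b, BlockTokenSecretManager.AccessMode.READ);

    // after updateKeys() on the master, re-export so slaves keep verifying
    namenode.updateKeys();
    datanode.setKeys(namenode.exportKeys());
  }
}
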
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java Wed May 26 23:31:56 2010
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
@@ -74,12 +74,12 @@ public interface NamenodeProtocol extend
throws IOException;
/**
- * Get the current access keys
+ * Get the current block keys
*
- * @return ExportedAccessKeys containing current access keys
+ * @return ExportedBlockKeys containing current block keys
* @throws IOException
*/
- public ExportedAccessKeys getAccessKeys() throws IOException;
+ public ExportedBlockKeys getBlockKeys() throws IOException;
/**
* Get the size of the current edit log (in bytes).
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed May 26 23:31:56 2010
@@ -42,9 +42,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
/** Utilities for HDFS tests */
@@ -262,8 +263,9 @@ public class DFSTestUtil {
return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
- public static BlockAccessToken getAccessToken(FSDataOutputStream out) {
- return ((DFSOutputStream) out.getWrappedStream()).getAccessToken();
+ public static Token<BlockTokenIdentifier> getBlockToken(
+ FSDataOutputStream out) {
+ return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
}
static void setLogLevel2All(org.apache.commons.logging.Log log) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Wed May 26 23:31:56 2010
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
return BlockReader.newBlockReader(
s, targetAddr.toString()+ ":" + block.getBlockId(), block.getBlockId(),
- testBlock.getAccessToken(), block.getGenerationStamp(),
+ testBlock.getBlockToken(), block.getGenerationStamp(),
offset, lenToRead,
conf.getInt("io.file.buffer.size", 4096));
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed May 26 23:31:56 2010
@@ -49,7 +49,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
@@ -170,7 +170,7 @@ public class TestDataTransferProtocol ex
DataTransferProtocol.Sender.opWriteBlock(sendOut,
block.getBlockId(), block.getGenerationStamp(), 0,
stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
- new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+ new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
if (eofExcepted) {
ERROR.write(recvOut);
sendRecvData(description, true);
@@ -356,7 +356,7 @@ public class TestDataTransferProtocol ex
DataTransferProtocol.Sender.opWriteBlock(sendOut,
newBlockId, 0L, 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+ new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
// bad bytes per checksum
@@ -370,7 +370,7 @@ public class TestDataTransferProtocol ex
DataTransferProtocol.Sender.opWriteBlock(sendOut,
++newBlockId, 0L, 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+ new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
sendOut.writeInt(4); // size of packet
@@ -392,7 +392,7 @@ public class TestDataTransferProtocol ex
DataTransferProtocol.Sender.opWriteBlock(sendOut,
++newBlockId, 0L, 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+ new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
sendOut.writeInt(8); // size of packet
@@ -423,7 +423,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(fileLen);
ERROR.write(recvOut);
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset
@@ -435,7 +435,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(-1L);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
@@ -448,7 +448,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(fileLen);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -463,7 +463,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(0);
sendOut.writeLong(-1-random.nextInt(oneMil));
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -478,7 +478,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(0);
sendOut.writeLong(fileLen + 1);
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
@@ -491,7 +491,7 @@ public class TestDataTransferProtocol ex
sendOut.writeLong(0);
sendOut.writeLong(fileLen);
Text.writeString(sendOut, "cl");
- BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();
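
One detail these tests make visible: the dummy placeholder moved from BlockAccessToken.DUMMY_TOKEN to BlockTokenSecretManager.DUMMY_TOKEN, and it is still written as the final field of each raw request when block tokens are not in play. The tail of an OP_READ_BLOCK request, as framed in the tests above, is:

  sendOut.writeLong(0);                                // start offset
  sendOut.writeLong(fileLen);                          // number of bytes to read
  Text.writeString(sendOut, "cl");                     // client name
  BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);  // block token, last field
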
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java Wed May 26 23:31:56 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/** Utilities for security tests */
+public class SecurityTestUtil {
+
+ /**
+ * check if an access token is expired. return true when token is expired,
+ * false otherwise
+ */
+ public static boolean isBlockTokenExpired(Token<BlockTokenIdentifier> token)
+ throws IOException {
+ return BlockTokenSecretManager.isTokenExpired(token);
+ }
+
+ /**
+ * set access token lifetime.
+ */
+ public static void setBlockTokenLifetime(BlockTokenSecretManager handler,
+ long tokenLifetime) {
+ handler.setTokenLifetime(tokenLifetime);
+ }
+
+}
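
SecurityTestUtil exists so tests can reach internals of BlockTokenSecretManager without widening its API. Typical use, mirroring the new TestBlockTokenWithDFS below (handler is the namenode's BlockTokenSecretManager and stm an output stream from a running mini-cluster, both assumed here):

  SecurityTestUtil.setBlockTokenLifetime(handler, 1000L);  // 1 second
  Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
  while (!SecurityTestUtil.isBlockTokenExpired(token)) {
    try {
      Thread.sleep(10);  // poll until the token has expired
    } catch (InterruptedException ignored) {
    }
  }
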
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Wed May 26 23:31:56 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.TestWritable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Unit tests for block tokens */
+public class TestBlockToken {
+ public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
+ private static final String ADDRESS = "0.0.0.0";
+
+ static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
+ private static Configuration conf;
+ static {
+ conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ static {
+ ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+ long blockTokenLifetime = 2 * 60 * 1000; // 2 mins
+ Block block1 = new Block(0L);
+ Block block2 = new Block(10L);
+ Block block3 = new Block(-108L);
+
+ private static class getLengthAnswer implements Answer<Long> {
+ BlockTokenSecretManager sm;
+ BlockTokenIdentifier ident;
+
+ public getLengthAnswer(BlockTokenSecretManager sm,
+ BlockTokenIdentifier ident) {
+ this.sm = sm;
+ this.ident = ident;
+ }
+
+ @Override
+ public Long answer(InvocationOnMock invocation) throws IOException {
+ Object args[] = invocation.getArguments();
+ assertEquals(1, args.length);
+ Block block = (Block) args[0];
+ Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+ .getTokenIdentifiers();
+ assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size());
+ long result = 0;
+ for (TokenIdentifier tokenId : tokenIds) {
+ BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+ LOG.info("Got: " + id.toString());
+ assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
+ sm.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.WRITE);
+ result = id.getBlockId();
+ }
+ return result;
+ }
+ }
+
+ private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
+ Block block, EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
+ throws IOException {
+ Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
+ BlockTokenIdentifier id = sm.createIdentifier();
+ id.readFields(new DataInputStream(new ByteArrayInputStream(token
+ .getIdentifier())));
+ return id;
+ }
+
+ @Test
+ public void testWritable() throws Exception {
+ TestWritable.testWritable(new BlockTokenIdentifier());
+ BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+ blockKeyUpdateInterval, blockTokenLifetime);
+ TestWritable.testWritable(generateTokenId(sm, block1, EnumSet
+ .allOf(BlockTokenSecretManager.AccessMode.class)));
+ TestWritable.testWritable(generateTokenId(sm, block2, EnumSet
+ .of(BlockTokenSecretManager.AccessMode.WRITE)));
+ TestWritable.testWritable(generateTokenId(sm, block3, EnumSet
+ .noneOf(BlockTokenSecretManager.AccessMode.class)));
+ }
+
+ private void tokenGenerationAndVerification(BlockTokenSecretManager master,
+ BlockTokenSecretManager slave) throws Exception {
+ // single-mode tokens
+ for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+ .values()) {
+ // generated by master
+ Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
+ EnumSet.of(mode));
+ master.checkAccess(token1, null, block1, mode);
+ slave.checkAccess(token1, null, block1, mode);
+ // generated by slave
+ Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
+ EnumSet.of(mode));
+ master.checkAccess(token2, null, block2, mode);
+ slave.checkAccess(token2, null, block2, mode);
+ }
+ // multi-mode tokens
+ Token<BlockTokenIdentifier> mtoken = master.generateToken(block3, EnumSet
+ .allOf(BlockTokenSecretManager.AccessMode.class));
+ for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+ .values()) {
+ master.checkAccess(mtoken, null, block3, mode);
+ slave.checkAccess(mtoken, null, block3, mode);
+ }
+ }
+
+ /** test block key and token handling */
+ @Test
+ public void testBlockTokenSecretManager() throws Exception {
+ BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
+ blockKeyUpdateInterval, blockTokenLifetime);
+ BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+ blockKeyUpdateInterval, blockTokenLifetime);
+ ExportedBlockKeys keys = masterHandler.exportKeys();
+ slaveHandler.setKeys(keys);
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ // key updating
+ masterHandler.updateKeys();
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ keys = masterHandler.exportKeys();
+ slaveHandler.setKeys(keys);
+ tokenGenerationAndVerification(masterHandler, slaveHandler);
+ }
+
+ @Test
+ public void testBlockTokenRpc() throws Exception {
+ BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+ blockKeyUpdateInterval, blockTokenLifetime);
+ Token<BlockTokenIdentifier> token = sm.generateToken(block3,
+ EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
+
+ ClientDatanodeProtocol mockDN = mock(ClientDatanodeProtocol.class);
+ when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn(
+ ClientDatanodeProtocol.versionID);
+ BlockTokenIdentifier id = sm.createIdentifier();
+ id.readFields(new DataInputStream(new ByteArrayInputStream(token
+ .getIdentifier())));
+ doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength(
+ any(Block.class));
+
+ final Server server = RPC.getServer(ClientDatanodeProtocol.class, mockDN,
+ ADDRESS, 0, 5, true, conf, sm);
+
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ final UserGroupInformation ticket = UserGroupInformation
+ .createRemoteUser(block3.toString());
+ ticket.addToken(token);
+
+ ClientDatanodeProtocol proxy = null;
+ try {
+ proxy = (ClientDatanodeProtocol) RPC.getProxy(
+ ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr,
+ ticket, conf, NetUtils.getDefaultSocketFactory(conf));
+ assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
+ } finally {
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ }
+
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Wed May 26 23:31:56 2010
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -236,7 +236,7 @@ public class TestBlockReplacement extend
out.writeLong(block.getGenerationStamp());
Text.writeString(out, source.getStorageID());
sourceProxy.write(out);
- BlockAccessToken.DUMMY_TOKEN.write(out);
+ BlockTokenSecretManager.DUMMY_TOKEN.write(out);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed May 26 23:31:56 2010
@@ -249,7 +249,7 @@ public class TestDataNodeVolumeFailure e
BlockReader.newBlockReader(s, targetAddr.toString() + ":" +
block.getBlockId(),
block.getBlockId(),
- lblock.getAccessToken(),
+ lblock.getBlockToken(),
block.getGenerationStamp(),
0, -1, 4096);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=948634&r1=948633&r2=948634&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Wed May 26 23:31:56 2010
@@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,7 +121,7 @@ public class TestDiskError extends TestC
block.getBlock().getGenerationStamp(), 1,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0L, 0L, 0L, "", null, new DatanodeInfo[0],
- BlockAccessToken.DUMMY_TOKEN);
+ BlockTokenSecretManager.DUMMY_TOKEN);
// write check header
out.writeByte( 1 );
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=948634&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Wed May 26 23:31:56 2010
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.token.*;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestBlockTokenWithDFS extends TestCase {
+
+ private static final int BLOCK_SIZE = 1024;
+ private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+ private static final String FILE_TO_READ = "/fileToRead.dat";
+ private static final String FILE_TO_WRITE = "/fileToWrite.dat";
+ private static final String FILE_TO_APPEND = "/fileToAppend.dat";
+ private final byte[] rawData = new byte[FILE_SIZE];
+
+ {
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ Random r = new Random();
+ r.nextBytes(rawData);
+ }
+
+ private void createFile(FileSystem fs, Path filename) throws IOException {
+ FSDataOutputStream out = fs.create(filename);
+ out.write(rawData);
+ out.close();
+ }
+
+ // read a file using blockSeekTo()
+ private boolean checkFile1(FSDataInputStream in) {
+ byte[] toRead = new byte[FILE_SIZE];
+ int totalRead = 0;
+ int nRead = 0;
+ try {
+ while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) {
+ totalRead += nRead;
+ }
+ } catch (IOException e) {
+ return false;
+ }
+ assertEquals("Cannot read file.", toRead.length, totalRead);
+ return checkFile(toRead);
+ }
+
+ // read a file using fetchBlockByteRange()
+ private boolean checkFile2(FSDataInputStream in) {
+ byte[] toRead = new byte[FILE_SIZE];
+ try {
+ assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
+ toRead.length));
+ } catch (IOException e) {
+ return false;
+ }
+ return checkFile(toRead);
+ }
+
+ private boolean checkFile(byte[] fileToCheck) {
+ if (fileToCheck.length != rawData.length) {
+ return false;
+ }
+ for (int i = 0; i < fileToCheck.length; i++) {
+ if (fileToCheck[i] != rawData[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // creates a file and returns a descriptor for writing to it
+ private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+ short repl, long blockSize) throws IOException {
+ FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
+ .getInt("io.file.buffer.size", 4096), repl, blockSize);
+ return stm;
+ }
+
+ // try reading a block using a BlockReader directly
+ private static void tryRead(Configuration conf, LocatedBlock lblock,
+ boolean shouldSucceed) {
+ InetSocketAddress targetAddr = null;
+ Socket s = null;
+ BlockReader blockReader = null;
+ Block block = lblock.getBlock();
+ try {
+ DatanodeInfo[] nodes = lblock.getLocations();
+ targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+ s = new Socket();
+ s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+ s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+ blockReader = BlockReader.newBlockReader(s, targetAddr
+ .toString()
+ + ":" + block.getBlockId(), block.getBlockId(), lblock
+ .getBlockToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+ "io.file.buffer.size", 4096));
+
+ } catch (IOException ex) {
+ if (ex instanceof InvalidBlockTokenException) {
+ assertFalse("OP_READ_BLOCK: access token is invalid, "
+ + "when it is expected to be valid", shouldSucceed);
+ return;
+ }
+ fail("OP_READ_BLOCK failed due to reasons other than access token");
+ } finally {
+ if (s != null) {
+ try {
+ s.close();
+ } catch (IOException iex) {
+ } finally {
+ s = null;
+ }
+ }
+ }
+ if (blockReader == null) {
+ fail("OP_READ_BLOCK failed due to reasons other than access token");
+ }
+ assertTrue("OP_READ_BLOCK: access token is valid, "
+ + "when it is expected to be invalid", shouldSucceed);
+ }
+
+ // get a conf for testing
+ private static Configuration getConf(int numDataNodes) throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.setLong("dfs.block.size", BLOCK_SIZE);
+ conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
+ conf.setInt("dfs.heartbeat.interval", 1);
+ conf.setInt("dfs.replication", numDataNodes);
+ conf.setInt("ipc.client.connect.max.retries", 0);
+ conf.setBoolean("dfs.support.append", true);
+ return conf;
+ }
+
+ /*
+ * testing that the APPEND operation can handle token expiration when the
+ * pipeline needs to be re-established
+ */
+ public void testAppend() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second)
+ SecurityTestUtil.setBlockTokenLifetime(
+ cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+ Path fileToAppend = new Path(FILE_TO_APPEND);
+ FileSystem fs = cluster.getFileSystem();
+
+ // write a one-byte file
+ FSDataOutputStream stm = writeFile(fs, fileToAppend,
+ (short) numDataNodes, BLOCK_SIZE);
+ stm.write(rawData, 0, 1);
+ stm.close();
+ // open the file again for append
+ stm = fs.append(fileToAppend);
+ int mid = rawData.length - 1;
+ stm.write(rawData, 1, mid - 1);
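+ // sync() flushes the buffered data out to the datanode pipeline,
+ // so that the pipeline is exercised before the token expires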
+ stm.sync();
+
+ /*
+ * wait till the token used in stm expires
+ */
+ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+ while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ // remove a datanode to force re-establishing pipeline
+ cluster.stopDataNode(0);
+ // append the rest of the file
+ stm.write(rawData, mid, rawData.length - mid);
+ stm.close();
+ // check if append is successful
+ FSDataInputStream in = fs.open(fileToAppend);
+ assertTrue(checkFile1(in));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /*
+ * testing that the WRITE operation can handle token expiration when the
+ * pipeline needs to be re-established
+ */
+ public void testWrite() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second)
+ SecurityTestUtil.setBlockTokenLifetime(
+ cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+ Path fileToWrite = new Path(FILE_TO_WRITE);
+ FileSystem fs = cluster.getFileSystem();
+
+ FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
+ BLOCK_SIZE);
+ // write a partial block
+ int mid = rawData.length - 1;
+ stm.write(rawData, 0, mid);
+ stm.sync();
+
+ /*
+ * wait till the token used in stm expires
+ */
+ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+ while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ // remove a datanode to force re-establishing pipeline
+ cluster.stopDataNode(0);
+ // write the rest of the file
+ stm.write(rawData, mid, rawData.length - mid);
+ stm.close();
+ // check if write is successful
+ FSDataInputStream in = fs.open(fileToWrite);
+ assertTrue(checkFile1(in));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ public void testRead() throws Exception {
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 2;
+ Configuration conf = getConf(numDataNodes);
+
+ try {
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // set a short token lifetime (1 second) initially
+ SecurityTestUtil.setBlockTokenLifetime(
+ cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
+ Path fileToRead = new Path(FILE_TO_READ);
+ FileSystem fs = cluster.getFileSystem();
+ createFile(fs, fileToRead);
+
+ /*
+ * setup for testing expiration handling of cached tokens
+ */
+
+ // read using blockSeekTo(). Acquired tokens are cached in in1
+ FSDataInputStream in1 = fs.open(fileToRead);
+ assertTrue(checkFile1(in1));
+ // read using blockSeekTo(). Acquired tokens are cached in in2
+ FSDataInputStream in2 = fs.open(fileToRead);
+ assertTrue(checkFile1(in2));
+ // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+ FSDataInputStream in3 = fs.open(fileToRead);
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing READ interface on DN using a BlockReader
+ */
+
+ DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), conf);
+ List<LocatedBlock> locatedBlocks = cluster.getNameNode().getBlockLocations(
+ FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+ LocatedBlock lblock = locatedBlocks.get(0); // first block
+ Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
+ // verify token is not expired
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
+ // read with valid token, should succeed
+ tryRead(conf, lblock, true);
+
+ /*
+ * wait till myToken and all cached tokens in in1, in2 and in3 expire
+ */
+
+ while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ /*
+ * continue testing READ interface on DN using a BlockReader
+ */
+
+ // verify token is expired
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
+ // read should fail
+ tryRead(conf, lblock, false);
+ // use a valid new token
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+ .blockTokenSecretManager.generateToken(lblock.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+ // read should succeed
+ tryRead(conf, lblock, true);
+ // use a token with wrong blockID
+ Block wrongBlock = new Block(lblock.getBlock().getBlockId() + 1);
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+ .blockTokenSecretManager.generateToken(wrongBlock,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
+ // read should fail
+ tryRead(conf, lblock, false);
+ // use a token with wrong access modes
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+ .blockTokenSecretManager.generateToken(lblock.getBlock(), EnumSet.of(
+ BlockTokenSecretManager.AccessMode.WRITE,
+ BlockTokenSecretManager.AccessMode.COPY,
+ BlockTokenSecretManager.AccessMode.REPLACE)));
+ // read should fail
+ tryRead(conf, lblock, false);
+
+ // set a long token lifetime for future tokens
+ SecurityTestUtil.setBlockTokenLifetime(
+ cluster.getNameNode().getNamesystem().blockTokenSecretManager, 600 * 1000L);
+
+ /*
+ * testing that when cached tokens are expired, DFSClient will re-fetch
+ * tokens transparently for READ.
+ */
+
+ // confirm all tokens cached in in1 are expired by now
+ List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+ for (LocatedBlock blk : lblocks) {
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+
+ // confirm all tokens cached in in2 are expired by now
+ List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+ for (LocatedBlock blk : lblocks2) {
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify blockSeekTo() is able to re-fetch token transparently (testing
+ // via another interface method)
+ assertTrue(in2.seekToNewSource(0));
+ assertTrue(checkFile1(in2));
+
+ // confirm all tokens cached in in3 are expired by now
+ List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+ for (LocatedBlock blk : lblocks3) {
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify fetchBlockByteRange() is able to re-fetch token transparently
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that after datanodes are restarted on the same ports, cached
+ * tokens should still work and there is no need to fetch new tokens from
+ * the namenode. This check must run while the namenode is down (to make
+ * sure no new tokens can be fetched from it).
+ */
+
+ // restart datanodes on the same ports that they currently use
+ assertTrue(cluster.restartDataNodes(true));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
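+ // shut down the namenode so the reads below cannot fetch new tokens
+ // and must rely on the tokens already cached in the clients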
+ cluster.shutdownNameNode();
+
+ // confirm tokens cached in in1 are still valid
+ lblocks = DFSTestUtil.getAllBlocks(in1);
+ for (LocatedBlock blk : lblocks) {
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+
+ // confirm tokens cached in in2 are still valid
+ lblocks2 = DFSTestUtil.getAllBlocks(in2);
+ for (LocatedBlock blk : lblocks2) {
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+
+ // confirm tokens cached in in3 are still valid
+ lblocks3 = DFSTestUtil.getAllBlocks(in3);
+ for (LocatedBlock blk : lblocks3) {
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
+ }
+ // verify fetchBlockByteRange() still works (forced to use cached tokens)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that when the namenode is restarted, cached tokens should still
+ * work and there is no need to fetch new ones. Like the previous check,
+ * this one must also run while the namenode is down, and it relies on the
+ * setup the previous check left behind.
+ */
+
+ // restart the namenode and then shut it down for test
+ cluster.restartNameNode();
+ cluster.shutdownNameNode();
+
+ // verify blockSeekTo() still works (forced to use cached tokens)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ // verify again blockSeekTo() still works (forced to use cached tokens)
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() still works (forced to use cached tokens)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that after both the namenode and the datanodes are restarted
+ * (namenode first, followed by datanodes), DFSClient can no longer access
+ * a DN without re-fetching tokens, and that it re-fetches them
+ * transparently. This check relies on the setup the previous one left
+ * behind.
+ */
+
+ // restore the cluster and restart the datanodes for test
+ cluster.restartNameNode();
+ assertTrue(cluster.restartDataNodes(true));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+ // shutdown namenode so that DFSClient can't get new tokens from namenode
+ cluster.shutdownNameNode();
+
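+ // the datanodes were restarted after the namenode came back, so they
+ // now hold fresh secret keys; tokens cached before the restarts can
+ // no longer be verified by them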
+ // verify blockSeekTo() fails (cached tokens become invalid)
+ in1.seek(0);
+ assertFalse(checkFile1(in1));
+ // verify fetchBlockByteRange() fails (cached tokens become invalid)
+ assertFalse(checkFile2(in3));
+
+ // restart the namenode to allow DFSClient to re-fetch tokens
+ cluster.restartNameNode();
+ // verify blockSeekTo() works again (by transparently re-fetching
+ // tokens from namenode)
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() works again (by transparently
+ // re-fetching tokens from namenode)
+ assertTrue(checkFile2(in3));
+
+ /*
+ * testing that when datanodes are restarted on different ports, DFSClient
+ * is able to re-fetch tokens transparently to connect to them
+ */
+
+ // restart datanodes on newly assigned ports
+ assertTrue(cluster.restartDataNodes(false));
+ cluster.waitActive();
+ assertEquals(numDataNodes, cluster.getDataNodes().size());
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in1.seek(0);
+ assertTrue(checkFile1(in1));
+ // verify blockSeekTo() is able to re-fetch token transparently
+ in2.seekToNewSource(0);
+ assertTrue(checkFile1(in2));
+ // verify fetchBlockByteRange() is able to re-fetch token transparently
+ assertTrue(checkFile2(in3));
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /*
+ * Integration test of block access tokens, involving NN, DN, and Balancer
+ */
+ public void testEnd2End() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ new TestBalancer().integrationTest(conf);
+ }
+}